import numpy as np


def build_mapper(train_data):
""" build token mapper and tag mapper """
tag2id = {}
id2tag = {}
word2id = {}
id2word = {}
word_count = 0
tag_count = 0
with open(train_data) as fr:
        for line in fr:
            line = line.strip()
            if not line:
                continue
            # split on the last "/" so that words containing "/" keep their tag
            word, tag = line.rsplit("/", 1)
if word not in word2id:
word2id[word] = word_count
id2word[word_count] = word
word_count += 1
if tag not in tag2id:
tag2id[tag] = tag_count
id2tag[tag_count] = tag
tag_count += 1
n_word = len(word2id)
n_tag = len(tag2id)
print("word size:{}".format(n_word))
print("tag size:{}".format(n_tag))
return word2id, id2word, tag2id, id2tag, n_word, n_tag
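
# A hedged sketch of the training-data format this parser assumes (inferred
# from the split logic above, not from the actual resource file): one
# "word/tag" token per line, with "./." closing each sentence, e.g.
#
#   The/DT
#   dog/NN
#   barks/VBZ
#   ./.
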
def build_params(word2id, tag2id, n_tag, n_word, train_data):
"""
build params of HMM: theta = (pi, A, B)
- pi is a vector marks the probe of each tag to be the first tag
of a sentence, size: [1, n_tag]
- A is a matrix of condition probe of words given tags,size: [n_tag, n_word]
- B is a matrix of transition probe between tags, size: [n_tag, n_tag]"""
    pi = np.zeros(n_tag)           # starting-tag probabilities
    a = np.zeros((n_tag, n_word))  # emission probabilities p(word|tag)
    b = np.zeros((n_tag, n_tag))   # transition probabilities p(tag_j|tag_i)
at_start_of_sentence = True
last_tag_id = None
with open(train_data) as fr:
        for line in fr:
            line = line.strip()
            if not line:
                continue
            word, tag = line.rsplit("/", 1)  # same parsing rule as in build_mapper
word_id = word2id.get(word)
tag_id = tag2id.get(tag)
if at_start_of_sentence:
pi[tag_id] += 1 # starting prob
a[tag_id, word_id] += 1 # cond. prob
at_start_of_sentence = False
else:
a[tag_id, word_id] += 1 # cond. prob
                b[last_tag_id, tag_id] += 1  # trans. prob
if word == ".":
at_start_of_sentence = True
last_tag_id = None
else:
last_tag_id = tag_id
    # done counting: normalize counts into probabilities; guard against
    # all-zero rows (a tag that only ever ends a sentence has no outgoing
    # transitions, so plain division would produce NaNs)
    pi /= pi.sum()
    for i in range(n_tag):
        if a[i].sum() > 0:
            a[i] /= a[i].sum()
        if b[i].sum() > 0:
            b[i] /= b[i].sum()
return pi, a, b
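
# The plain MLE normalization above gives zero probability to any (tag, word)
# or (tag, tag) pair unseen in training; the log() floor below merely caps the
# resulting penalty. A hedged sketch of add-alpha (Laplace) smoothing as an
# alternative row normalization; this helper is illustrative and not used by
# the pipeline in this file:
def normalize_rows_smoothed(counts, alpha=1.0):
    """Row-normalize a 2-D count matrix with add-alpha smoothing."""
    smoothed = counts + alpha
    return smoothed / smoothed.sum(axis=1, keepdims=True)
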
def log(num):
    """np.log with a floor: a zero probability maps to log(1e-8) instead of -inf."""
    if num == 0:
        return np.log(1e-8)
    return np.log(num)
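
# Why score in log-space: the product of many small probabilities underflows
# float64, while the sum of their logs stays representable. For example,
# np.prod(np.full(80, 1e-5)) evaluates to 0.0 (1e-400 underflows float64),
# whereas np.sum(np.log(np.full(80, 1e-5))) is about -921.0.
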
def viterbi(sentence, pi, a, b, word2id, id2tag):
"""
decode with viterbi
:param sentence: sentence to decode
:param pi: init probe of tag
:param a: cond probe of words given tags
:param b: trans probe between tags
:return:
"""
    x = [word2id[word] for word in sentence.split()]  # word ids (KeyError for OOV words)
    t = len(x)
    n_tag = len(pi)  # derive from pi rather than relying on the global tag2id
    dp = np.zeros((t, n_tag))
    path_record = np.zeros((t, n_tag), dtype=int)  # best previous tag for each dp cell
for j in range(n_tag):
dp[0][j] = log(pi[j]) + log(a[j, x[0]])
    # fill the dp table
    for i in range(1, t):           # for each word
        for j in range(n_tag):      # for each candidate tag
            dp[i][j] = -1e6
            emit = log(a[j, x[i]])  # emission term does not depend on k
            for k in range(n_tag):  # for each previous tag
                score = dp[i-1][k] + log(b[k][j]) + emit
                if score > dp[i, j]:
                    dp[i, j] = score
                    path_record[i, j] = k
    # backtrace the best tag sequence from the recorded paths
    best_tag_sequence = [0] * t
    best_tag_sequence[t-1] = int(np.argmax(dp[t-1]))
for i in range(t-2, -1, -1):
best_tag_sequence[i] = path_record[i + 1, best_tag_sequence[i+1]]
tag_sequence = [id2tag[idx] for idx in best_tag_sequence]
return tag_sequence
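
# viterbi() raises KeyError for any word missing from the training vocabulary.
# A hedged sketch of one common workaround: reserve an <UNK> fallback id when
# encoding. The "<UNK>" convention and this helper are illustrative
# assumptions, not part of the original pipeline:
def encode_with_unk(sentence, word2id, unk_id=0):
    """Map words to ids, substituting unk_id for out-of-vocabulary words."""
    return [word2id.get(word, unk_id) for word in sentence.split()]
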
train_data_path = "./resource/traindata.txt"
test_sentence = "The big question is whether the president will have the strength ."
# 1. build mapper of words and tags
word2id, id2word, tag2id, id2tag, n_word, n_tag = build_mapper(train_data_path)
# 2. estimate the HMM parameters theta = (pi, A, B)
pi, a, b = build_params(word2id, tag2id, n_tag, n_word, train_data_path)
# 3. decode the optimal tag path with Viterbi
tag_sequence = viterbi(test_sentence, pi, a, b, word2id, id2tag)
print(tag_sequence) # ['DT', 'JJ', 'NN', 'VBZ', 'IN', 'DT', 'NN', 'MD', 'VB', 'DT', 'NN', '.']