用HMM模型进行序列标注

好久没更新博客了,最近正在上课,身边优秀的人太多了….

疯狂充实自己ing~

正好花了点时间做了个小作业写了一个序列标注,来记录一下。

用的模型是隐马尔科夫模型HMM,用的是hmmlearn开源工具包。

当然有能力可以自己写。主要就是维特比算法。一个动态规划问题。还有就是求出来初始状态概率,词性转移概率,和发射概率即可。

HMM模型的介绍这里就不展开了,就放些代码好了。

语料是上课老师给的一个,大概40多M,90000多行。如下图:

img

代码一些变量命名有的都是我随意命名的…真的比较讨厌的就是起名字。平常打游戏也是,想个ID能想好久。

还有一些函数调用比较乱,然后就是代码有些for循环速度比较慢,没有优化==。

训练测试采用8:2,没有使用交叉验证,…还得重新跑模型就懒得搞了。。

数据预处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from functools import reduce
import random

class data_pre:
    """Load a '/'-tagged corpus and extract the word and POS-tag vocabularies.

    Each corpus line is made of whitespace-separated tokens of the form
    ``word/tag``; tokens that do not split into exactly two parts are skipped.
    """

    def __init__(self, file):
        # Path to the corpus file (UTF-8, one tagged sentence per line).
        self.file = file

    def preposs(self):
        """Read the training slice of the corpus and collect vocabularies.

        Returns:
            tuple: (training lines, list of distinct tags,
                    list of distinct words, number of distinct tags)
        """
        # Context manager guarantees the handle is closed even if reading
        # raises (the original only closed it on the success path).
        with open(self.file, 'r', encoding='utf-8') as fr:
            lines = fr.readlines()
        # First 78847 lines are the ~80% training split.
        # NOTE(review): this constant is corpus-specific — confirm for new data.
        text = lines[0:78847]
        words = []
        tags = []
        for line in text:
            for token in line.strip().split():
                parts = token.split('/')
                if len(parts) == 2:
                    words.append(parts[0])
                    tags.append(parts[1])
        # Deduplicate; set iteration order is unspecified, same as the original.
        word_set_list = list(set(words))
        cha_set = list(set(tags))
        return text, cha_set, word_set_list, len(cha_set)

统计词和词性从而方便计算概率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import numpy as no

from data_pre import *


def hmm_word(file):
    """Estimate HMM parameters (as plain dicts) from the tagged corpus *file*.

    Returns:
        tuple: (tag list, word list,
                initial tag probabilities  {tag: p},
                tag transition probabilities {prev: {'prev:next': p}},
                emission probabilities {'word$tag': p})
    """
    data_per = data_pre(file)
    text, cha_set, word_set_list, cha_len = data_per.preposs()

    # --- Initial state probabilities: P(tag of the first token of a line) ---
    word_dict = dict.fromkeys(cha_set, 0)
    for line in text:
        content = line.strip().split()
        # Assumes every line's first token is 'word/tag' (as in the original;
        # a malformed first token would raise here too).
        first_tag = content[0].split('/')[1]
        word_dict[first_tag] += 1
    for tag in cha_set:
        word_dict[tag] = word_dict[tag] / len(text)

    # --- Tag transition probabilities: P(tag_{t+1} | tag_t) ---
    # Nested keys: cixing_A[prev]['prev:next'] -> probability.
    cixing_A = {}
    for line in text:
        tags = []
        for token in line.strip().split():
            parts = token.split('/')
            if len(parts) == 2:
                tags.append(parts[1])
        for x in range(len(tags) - 1):
            pair = tags[x] + ':' + tags[x + 1]
            # setdefault/get replaces the original's three-way branch.
            inner = cixing_A.setdefault(tags[x], {})
            inner[pair] = inner.get(pair, 0) + 1
    # Normalize each row of counts into probabilities.
    for prev_tag in cixing_A.keys():
        total = sum(cixing_A[prev_tag].values())
        for pair in cixing_A[prev_tag].keys():
            cixing_A[prev_tag][pair] = cixing_A[prev_tag][pair] / total

    # --- Emission probabilities: P(word | tag) ---
    # Tag frequencies (denominator).
    fashe = {}
    for line in text:
        for token in line.strip().split():
            parts = token.split('/')
            if len(parts) == 2:
                fashe[parts[1]] = fashe.get(parts[1], 0) + 1
    # Joint 'word$tag' counts (numerator).
    fashe_res = {}
    for line in text:
        for token in line.strip().split():
            parts = token.split('/')
            if len(parts) == 2:
                key = parts[0] + '$' + parts[1]
                fashe_res[key] = fashe_res.get(key, 0) + 1
    for key in fashe_res.keys():
        tag = key.split('$')[-1]
        fashe_res[key] = fashe_res[key] / fashe.get(tag)

    return cha_set, word_set_list, word_dict, cixing_A, fashe_res

计算概率和模型训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import numpy as np
from hmmlearn import hmm
from hmm_word import *
from sklearn.externals import joblib

# Assemble an HMM POS tagger with hmmlearn: the parameter dicts produced by
# hmm_word are converted to dense numpy matrices aligned with the tag/word
# vocabularies, loaded into a MultinomialHMM (whose decode() is Viterbi),
# and persisted to disk.

file = ''  # path to the tagged corpus; left blank in the original post
print('start')
states, word_set_list, word_dict, tran_pro, emiss = hmm_word(file)

# Persist the state (tag) and observation (word) vocabularies so the
# prediction script can rebuild index <-> symbol mappings.
np.save("states.npy", np.array(states))
np.save("word_set_list.npy", np.array(word_set_list))

# --- Initial state probability vector, aligned with `states` order ---
# Unseen tags get probability 0 (equivalent to the original's membership test).
list_first = [word_dict.get(tag, 0) for tag in states]

start_probability = np.array(list_first)
np.save("start_probability.npy", start_probability)
print(start_probability.shape)
print('···················初始状态转移概率计算完毕······················')

# --- Tag transition probability matrix, |states| x |states| ---
list_tran_ans = []
for src in states:
    if src not in tran_pro:
        # Tag never observed as a predecessor: all-zero row.
        row = [0] * len(states)
    else:
        # dict.get with default 0 is equivalent to the original's rebuild of
        # the successor-key list per target state, but O(1) per cell instead
        # of O(K) — the key 'src:dst' exists exactly when dst follows src.
        row = [tran_pro[src].get(src + ':' + dst, 0) for dst in states]
    list_tran_ans.append(row)

transition_probability = np.array(list_tran_ans)
np.save("transition_probability.npy", transition_probability)
print(transition_probability.shape)
print('····················词性转移概率计算完毕····················')

# --- Emission probability matrix, |states| x |words| ---
list_emiss_ans = []
trr = 1
print('·················开始计算发射概率·····················')
for tag in states:
    # Probability 0 for any (word, tag) pair never seen in training.
    list_emiss = [emiss.get(word + '$' + tag, 0) for word in word_set_list]
    list_emiss_ans.append(list_emiss)
    print('···················正在计算发射概率,共' + str(len(states)) + '轮,已处理%d轮················' % trr)
    trr += 1

emission_probability = np.array(list_emiss_ans)
np.save("emission_probability.npy", emission_probability)
print(emission_probability.shape)
print('····················发射概率计算完毕····················')

n_states = len(states)
n_observations = len(word_set_list)  # size of the observation alphabet

model = hmm.MultinomialHMM(n_components=n_states)
model.startprob_ = start_probability
model.transmat_ = transition_probability
model.emissionprob_ = emission_probability

# Save the assembled model.
# NOTE(review): sklearn.externals.joblib was removed from modern scikit-learn;
# on recent versions use `import joblib` directly.
joblib.dump(model, "filename.pkl")
print('····························模型save成功···························')
print('End!!')

# To reload and decode later:
# model = joblib.load("filename.pkl")
# logprob, state = model.decode(seen, algorithm='viterbi')

测试数据进行预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import numpy as np
from hmmlearn import hmm
from sklearn.externals import joblib

# Evaluate the saved HMM tagger on the held-out 20% of the corpus:
# rebuild the model from the saved matrices, Viterbi-decode every usable
# test line, and report token- and line-level accuracy.

start_probability = np.load("start_probability.npy")
transition_probability = np.load("transition_probability.npy")
emission_probability = np.load("emission_probability.npy")

model = hmm.MultinomialHMM(n_components=start_probability.shape[0])
model.startprob_ = start_probability
model.transmat_ = transition_probability
model.emissionprob_ = emission_probability

file = ''  # path to the tagged corpus; must match the training script

# Context manager closes the handle (the original never closed it).
with open(file, 'r', encoding='utf-8') as fr:
    ans = fr.readlines()
train = ans[0:78847]
test = ans[78847:]

states = list(np.load("states.npy"))
word_set_list = list(np.load("word_set_list.npy"))
# O(1) membership/index lookups instead of O(n) list scans per token;
# both vocabularies are duplicate-free, so results are identical.
states_set = set(states)
word_index = {word: idx for idx, word in enumerate(word_set_list)}

print('··············开始处理测试数据,共%d条············' % len(test))
label = []      # gold tag sequences, one list per kept line
test_test = []  # observation index sequences, aligned with `label`
anss = 1
for line in test:
    test_list = []
    zhunquelv = []
    content = line.strip().split()
    # Drop the whole line if any word or tag was unseen in training.
    flag = 0
    for token in content:
        parts = token.split('/')
        if len(parts) == 2:
            if parts[0] not in word_index or parts[1] not in states_set:
                flag = 1
                break
    if flag == 0:
        for token in content:
            parts = token.split('/')
            if len(parts) == 2:
                zhunquelv.append(parts[1])
                test_list.append(word_index[parts[0]])
        if len(zhunquelv) != 0:
            label.append(zhunquelv)
            test_test.append(test_list)
    if anss % 1000 == 0:
        print('···············测试记录已处理%d条·············' % anss)
    anss += 1
print('·············测试记录处理完毕·················')
print('开始预测,测试记录共' + str(len(test)) + '条,但是由于有的测试记录的词或者词性训练集里没有,于是将该行删掉。')
print('最后预测的共%d条' % len(test_test))

label_ans = 0        # total tokens scored
ci_error = 0         # mis-tagged tokens
hang_zhunquelv = 0   # lines tagged entirely correctly

for idx in range(len(test_test)):
    flag_pre = 0
    # Column vector of observation indices, the shape hmmlearn expects.
    seen = np.array([test_test[idx]]).T
    state_pre = model.predict(seen)
    predicted = [states[s] for s in state_pre]
    for i in range(len(predicted)):
        label_ans += 1
        if predicted[i] != label[idx][i]:
            ci_error += 1
            flag_pre = 1
    if flag_pre == 0:
        hang_zhunquelv += 1

    if idx % 1000 == 0:
        print('已经预测完%d行··················' % idx)

print('预测错误的次数词性个数为:%d' % ci_error)
print('总词数为:%d' % label_ans)
print('一行全部预测正确的行数为:%d' % hang_zhunquelv)
print('总行数为:%d' % (len(test_test)))
print('词标对的错误率即占比为:%f' % (ci_error / label_ans))
print('该行全部标对的准确率为%f' % (hang_zhunquelv / len(test_test)))

预测结果如下图:

如果所有句子词加起来统计错误率为1.9%

如果按句子来计算准确率的话 是 65.3%

img

-------------The End-------------