# -*- coding: utf-8 -*-
"""
利用skip-gram方法实现把词向量转化成特征向量
参考《efficient estimation of word representations in vector space》
"""
import numpy as np
from itertools import chain
import pandas as pd
class Word2vec():
def __init__(self, nHidden=300, window=4):
self.eb = 0.01 # 误差容限
self.eta = 0.01 # 学习率
self.maxiter = 10 # 最大迭代次数
self.errlist = [] # 误差列表
self.data = None # 数据集
self.label = None # 标记集
self.nHidden = nHidden # 隐含层神经元,在这儿也表示词嵌入后的空间维度
self.nOut = 0 # 输出层
self.hide_wb = None # 隐含层模型参数
self.out_wb = None # 输出层模型参数
self.nSampDim = 0 # 特征维度(词汇表大小)
self.nSampNum = 0 # 样本个数
self.dict_table = None # 字典查找表
self.window = window # 窗口大小
def softmax(self, value):
"""
softmax回归,映射成概率值
:param value:
:return:
"""
value_exp = np.exp(value)
partition = np.sum(value_exp, axis=1)
return value_exp / partition
def back_propagate(self, index):
"""
利用反向传播算法优化网络
:param index:
:return:
"""
self.data = index[0, :]
self.label = index[1, :]
for i in range(self.maxiter):
# 工作信号正向传播
# 输入层到隐含层
hide_value = self.hide_wb[self.data]
# 隐含层到输出层
out_input = hide_value * self.out_wb
out_output = self.softmax(out_input)
p_out = out_output[range(self.nSampNum), self.label]
# 误差计算
err = np.copy(out_output)
err[range(self.nSampNum), self.label] -= 1 # 花式索引
self.errlist.append(self.errorfunc(p_out))
# 判断是否收敛至最优
if self.errorfunc(p_out) <= self.eb:
break
# 误差信号反向传播
# 计算输出层梯度
dout_wb = hide_value.T * err
# 计算隐含层梯度(这儿的权重矩阵是以输入样本的索引为基础的,说来话长)
dhide_wb = self.out_wb[:, self.label].T - out_output * self.out_wb.T
# 更新输出层和隐含层权值
self.out_wb = self.out_wb + self.eta * dout_wb
for ind, del_hide in zip(self.data, dhide_wb):
self.hide_wb[ind] += self.eta * del_hide
def train(self, word_pair):
"""
首先提取出单词对对应的索引序号,一个输入序号向量和一个输出序号向量,再利用skip-gram方法训练网络模型
也可以通过调用该方法继续训练
:param word_pair: 单词元组(input, output)组成的list
:return:
"""
if isinstance(word_pair, tuple):
word_pair = [word_pair] # 假如是单个元组,先转换成list
# 查表,找出单词对应的索引,不清楚有没有更为优雅的方式
self.nSampNum = len(word_pair)
indexArr_reval = self.dict_table[chain.from_iterable(word_pair)].values
indexArr = indexArr_reval.reshape(-1, 2).T
self.back_propagate(indexArr)
def skip_gram(self, corpus):
"""
利用skip-gram算法训练词向量
:param corpus: 用词汇组成的语料库,具体形式为二维list,一行表示一句话,每个元素表示一个单词
:return:
"""
vocabulary = set(chain.from_iterable(corpus))
self.init_net(len(vocabulary))
self.dict_table = pd.Series(range(len(vocabulary)), index=list(vocabulary)) # 词汇表
# 这儿不是很有必要,直接保留索引即可
# self.data = pd.get_dummies(self.dict_table) # 数据集(用ont-hot编码后的词汇表索引列表)
del vocabulary
for i, sentence in enumerate(corpus):
wp = self.get_word_pairs(sentence)
self.train(wp)
print("Training sentence:", i, "current loss:", self.errlist[-1])
return pd.DataFrame(index=self.dict_table.index, data=np.array(self.hide_wb))
def get_word_pairs(self, sentence):
"""
把一句话转化成单词对,用于训练网络模型
:param sentence: 由单词组成的句子list
:return: 由元组组成的list, 元组内容为单词对,表示跳词模型窗口内的中心词和背景词(input, output)
"""
word_pair = []
for pos, word in enumerate(sentence):
start = max(0, pos - self.window)
end = min(len(sentence), pos + self.window + 1)
for i in range(start, end):
if i != pos:
word_pair.append((word, sentence[i]))
return word_pair
def init_net(self, n):
"""
初始化网络参数
:param n: 词汇表的大小
:return:
"""
self.nSampDim = n
self.nOut = n
self.hide_wb = np.mat(2.0 * (np.random.rand(self.nSampDim, self.nHidden) - 0.5))
self.out_wb = np.mat(2.0 * (np.random.rand(self.nHidden, self.nOut) - 0.5))
def errorfunc(self, inX):
"""
损失函数,条件概率的负对数之和
:param inX:
:return:
"""
return - (np.log(inX)).sum()
def TrendLine(self, plt, color='r', type='loss'):
"""
绘制损失曲线
:param plt:
:param color:
:param type:
:return:
"""
X = np.linspace(0, len(self.errlist), len(self.errlist))
Y = np.array(self.errlist)
plt.xlabel("iteration")
plt.ylabel(type)
plt.plot(X, Y, color)