- Write the vocabulary-building code
- Write the data loader code
- Write the model definition code
- Write the training code
- Write the test code
1. Write the vocabulary-building code
Before we start training word vectors, we first need to build a vocabulary from the corpus so that the text can be converted into an index representation. In the code below, the build_mapping_vocab function returns three objects:
- idx_to_word maps an index to the corresponding word
- word_to_idx maps a word to the corresponding index
- word_freq records how many times each word appears in the corpus
Next, the build_train_corpus function converts the corpus into its index representation.
```python
import jieba
import numpy as np
from collections import Counter
import joblib
import torch

corpus_path = 'data/corpus.txt'


# Build the mapping vocabulary
def build_mapping_vocab():

    # 1. Read the corpus contents
    sentences = []
    for sentence in open(corpus_path, encoding='gbk'):
        # Strip the trailing newline
        sentence = sentence.strip()
        if len(sentence):
            sentences.append(sentence)

    # 2. Tokenize each sentence
    sentences_cut_words = []
    for sentence in sentences:
        sentences_cut_words.append(jieba.lcut(sentence))

    # 3. Build the word-frequency table ('UNK' gets a placeholder count of -1)
    word_freq = {'UNK': -1}
    for sentence_words in sentences_cut_words:
        for word, freq in Counter(sentence_words).items():
            if word in word_freq.keys():
                word_freq[word] += freq
            else:
                word_freq[word] = freq

    # 4. Build the vocabulary, with 'UNK' at index 0
    idx_to_word = [word for word in word_freq.keys() if word != 'UNK']
    idx_to_word.insert(0, 'UNK')
    word_to_idx = {word: idx for idx, word in enumerate(idx_to_word)}

    # 5. Persist the objects
    joblib.dump(idx_to_word, 'data/idx_to_word.txt')
    joblib.dump(word_to_idx, 'data/word_to_idx.txt')
    joblib.dump(word_freq, 'data/word_freq.txt')

    return idx_to_word, word_to_idx, word_freq


# Build the training corpus
def build_train_corpus():

    # 1. Load the word_to_idx mapping
    word_to_idx = joblib.load('data/word_to_idx.txt')

    # 2. Read the corpus contents
    sentences = []
    for sentence in open(corpus_path, encoding='gbk'):
        # Strip the trailing newline
        sentence = sentence.strip()
        if len(sentence):
            sentences.append(sentence)

    # 3. Tokenize each sentence
    sentences_cut_words = []
    for sentence in sentences:
        sentences_cut_words.append(jieba.lcut(sentence))

    # 4. Convert every sentence to its index representation
    corpus_words = []
    for sentence in sentences_cut_words:
        sentence_index = []
        for word in sentence:
            if word not in word_to_idx.keys():
                sentence_index.append(word_to_idx['UNK'])
                continue
            sentence_index.append(word_to_idx[word])
        corpus_words.extend(sentence_index)

    corpus_words = torch.tensor(corpus_words)
    # Persist the object
    joblib.dump(corpus_words, 'data/corpus_words.txt')


def test01():
    # Build the vocabulary
    build_mapping_vocab()
    # Convert the corpus to an index representation
    build_train_corpus()
    corpus_words = joblib.load('data/corpus_words.txt')
    print('Total number of words:', len(corpus_words))
    print('Corpus content:', corpus_words)


if __name__ == '__main__':
    test01()
```
The program output:
```
Total number of words: 969
Corpus content: [1, 2, 3, 4, 5, 6, 7, 8, 9, 1 ... 79, 379, 379, 161]
```
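As a quick sanity check that the two mappings are inverses of each other, here is a minimal sketch; it assumes build_mapping_vocab has already written the files under data/:

```python
import joblib

# Minimal sanity check; assumes build_mapping_vocab() has already
# written the mapping files under data/
idx_to_word = joblib.load('data/idx_to_word.txt')
word_to_idx = joblib.load('data/word_to_idx.txt')

word = idx_to_word[1]              # the first real word after 'UNK'
assert word_to_idx[word] == 1      # the two mappings are inverses
print(word, '->', word_to_idx[word])
```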
2. Write the data loader code
Next we write a SkipGramDataset class, which defines how to return a single training sample and what each sample contains. Used together with PyTorch's DataLoader, it loads training data in batches.
In the code below, the __getitem__ method retrieves the center word (the input word), the words to predict (the positive labels), and 32 words sampled at random according to word frequency as the negative labels.
```python
from torch.utils.data import Dataset
from torch.utils.data import DataLoader


# Batch data loader
class SkipGramDataset(Dataset):

    def __init__(self, n_gram=2, negative=32):
        super(SkipGramDataset, self).__init__()
        self.n_gram = n_gram
        self.negative = negative
        # Load the prepared data
        self.corpus_words = joblib.load('data/corpus_words.txt')
        self.word_freq = joblib.load('data/word_freq.txt')
        # Compute each word's sampling weight. 'UNK' was stored with a
        # placeholder count of -1, which torch.multinomial rejects,
        # so clamp the counts to be non-negative first
        freq = torch.tensor(list(self.word_freq.values())).clamp(min=0)
        self.word_freq = freq / freq.sum()

    def __len__(self):
        """Return the total number of samples."""
        return len(self.corpus_words)

    def __getitem__(self, idx):
        """Return one training sample for the given index."""
        # Clamp idx so the context window stays inside the corpus
        idx = min(max(idx, self.n_gram), len(self.corpus_words) - self.n_gram - 1)
        corpus_words = self.corpus_words
        # The center word
        center_word = corpus_words[idx]
        # Positive labels: the n_gram words on either side of the center word
        pos_index = list(range(idx - self.n_gram, idx)) + list(range(idx + 1, idx + 1 + self.n_gram))
        pos_label = corpus_words[pos_index]
        # Negative labels: sample self.negative words according to frequency.
        # We oversample by n_gram * 2 extra words, then use np.setdiff1d to
        # drop any positive labels and keep the first self.negative elements
        neg_label = torch.multinomial(self.word_freq, num_samples=self.negative + self.n_gram * 2)
        neg_label = np.setdiff1d(neg_label.numpy(), pos_label.numpy())[:self.negative]

        return center_word, pos_label, neg_label


def test02():
    # 1. Load the vocabulary
    idx_to_word = joblib.load('data/idx_to_word.txt')
    word_to_idx = joblib.load('data/word_to_idx.txt')
    word_freq = joblib.load('data/word_freq.txt')
    # 2. Convert the corpus to indexes
    build_train_corpus()
    # 3. Fetch one batch of samples
    skipgram_data = SkipGramDataset(n_gram=5, negative=32)
    dataloader = DataLoader(skipgram_data, batch_size=16, shuffle=True)
    for center_words, pos_labels, neg_labels in dataloader:
        print(center_words.shape)
        print(pos_labels.shape)
        print(neg_labels.shape)
        break


if __name__ == '__main__':
    # test01()
    test02()
```
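To see the oversample-then-filter trick from __getitem__ in isolation, here is a toy sketch; the frequency values, window size, and positive labels are made up for illustration:

```python
import numpy as np
import torch

# A made-up frequency distribution over a 10-word vocabulary
word_freq = torch.tensor([0.0, 0.3, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.05, 0.05])

pos_label = torch.tensor([2, 3])   # pretend these are the context words
negative, n_gram = 4, 1

# Oversample by n_gram * 2 extra words so we can afford to drop any positives...
neg = torch.multinomial(word_freq, num_samples=negative + n_gram * 2)
# ...then remove the positives and keep the first `negative` survivors
neg = np.setdiff1d(neg.numpy(), pos_label.numpy())[:negative]
print(neg)  # e.g. [1 4 5 7]; never contains 2 or 3
```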
3. Write the model definition code
The SkipGramModel contains only an nn.Embedding layer. The training strategy is: the more similar the center word is to its positive labels, the better, and the more similar the negated center word is to the negative labels (i.e., the more dissimilar the center word itself is to them), the better. We add these two similarity terms together; the larger the sum, the better. Passing the similarities through logsigmoid and negating the result maps the loss into [0, +inf), so the training objective is to drive this value toward 0.
```python
import torch.nn as nn
import torch.nn.functional as F


class SkipGramModel(nn.Module):

    def __init__(self, vocab_size, dim_size):
        """
        :param vocab_size: number of distinct words in the vocabulary
        :param dim_size: word-embedding dimension
        """
        super(SkipGramModel, self).__init__()
        self.vocab_size = vocab_size
        self.dim_size = dim_size
        # The word-embedding layer
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=dim_size)
        # Initialize the layer weights
        self.embed.weight.data.uniform_(-0.5 / self.dim_size, 0.5 / self.dim_size)

    def forward(self, center_words, pos_labels, neg_labels):
        """
        center_words_embedding: torch.Size([16, 128])
        pos_labels_embedding: torch.Size([16, 10, 128])
        neg_labels_embedding: torch.Size([16, 32, 128])
        """
        # Center-word embeddings
        center_words_embedding = self.embed(center_words)
        # Positive-label embeddings
        pos_labels_embedding = self.embed(pos_labels)
        # Negative-label embeddings
        neg_labels_embedding = self.embed(neg_labels)

        # Dot-product similarity between the center word and the labels.
        # A larger pos_similarity means the center word is closer to the positive labels;
        # a larger neg_similarity means the negated center word is closer to the negative labels
        pos_similarity = torch.bmm(pos_labels_embedding, center_words_embedding.unsqueeze(-1))
        neg_similarity = torch.bmm(neg_labels_embedding, -center_words_embedding.unsqueeze(-1))

        # The dot-product similarity lies in (-inf, inf).
        # logsigmoid squashes it into (-inf, 0): the larger the similarity,
        # the closer logsigmoid gets to 0, i.e. the smaller the loss
        pos_loss = F.logsigmoid(pos_similarity).sum(1)
        neg_loss = F.logsigmoid(neg_similarity).sum(1)

        # Negate the result so that similar pairs give a loss near 0 and
        # dissimilar pairs a large loss. Minimizing it pulls the center word
        # toward its positive labels and pushes it away from the negatives
        return -(pos_loss + neg_loss)


def test03():
    # vocab_size is the number of distinct words, so use the vocabulary
    # size rather than the corpus length
    idx_to_word = joblib.load('data/idx_to_word.txt')
    # Use a 128-dimensional word embedding
    model = SkipGramModel(vocab_size=len(idx_to_word), dim_size=128)
    # Fetch one batch of data
    skipgram_data = SkipGramDataset(n_gram=5, negative=32)
    dataloader = DataLoader(skipgram_data, batch_size=16, shuffle=True)
    center_words, pos_labels, neg_labels = next(iter(dataloader))
    # Compute the loss for one batch
    loss = model(center_words, pos_labels, neg_labels)
    print('Loss:', loss.mean().item())


if __name__ == '__main__':
    # test01()
    # test02()
    test03()
```
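A quick numeric check of the mapping described above: logsigmoid stays in (-inf, 0), and larger similarities land closer to 0.

```python
import torch
import torch.nn.functional as F

# log(sigmoid(x)) lies in (-inf, 0): the larger x, the closer to 0
for x in [-5.0, 0.0, 5.0]:
    print(x, F.logsigmoid(torch.tensor(x)).item())
# -5.0 -> about -5.0067
#  0.0 -> about -0.6931
#  5.0 -> about -0.0067
```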
4. Write the training code
The training loop below runs for a fixed number of epochs, accumulates the batch losses for logging, and saves the model parameters when training finishes.
```python
import torch.optim as optim
import time


def model_train():
    # Initialize the dataset
    train_data = SkipGramDataset(n_gram=2, negative=32)
    # Initialize the embedding model; vocab_size is the number of
    # distinct words, not the corpus length
    idx_to_word = joblib.load('data/idx_to_word.txt')
    model = SkipGramModel(vocab_size=len(idx_to_word), dim_size=128)
    # Initialize the optimizer
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    # Number of training epochs
    epochs = 5000

    for epoch_idx in range(epochs):
        # Initialize the data loader
        dataloader = DataLoader(train_data, batch_size=16)
        # Accumulated loss over the epoch
        total_loss = 0.0
        total_samples = 0
        # Time the epoch
        start = time.time()
        for center_words, pos_labels, neg_labels in dataloader:
            # Send the batch through the network and compute the loss
            loss = model(center_words, pos_labels, neg_labels).sum()
            total_loss += loss.item()
            total_samples += len(center_words)
            # Zero the gradients
            optimizer.zero_grad()
            # Backpropagate
            loss.backward()
            # Update the parameters
            optimizer.step()
        end = time.time()
        print('epoch: %d loss: %.5f time: %.2f' % (epoch_idx + 1, total_loss / total_samples, end - start))

    # Save the model
    torch.save(model.state_dict(), 'model/model.bin')


if __name__ == '__main__':
    # test01()
    # test02()
    # test03()
    model_train()
```
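After training, the saved state_dict can be loaded back to get at the learned embedding matrix. A minimal sketch, assuming the vocabulary files on disk are unchanged since training:

```python
# Minimal reload sketch; assumes data/idx_to_word.txt and model/model.bin
# were produced by the code above
idx_to_word = joblib.load('data/idx_to_word.txt')
model = SkipGramModel(vocab_size=len(idx_to_word), dim_size=128)
model.load_state_dict(torch.load('model/model.bin'))
embeddings = model.embed.weight.data  # one 128-dimensional vector per word
print(embeddings.shape)
```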