Model construction involves implementing the CRF layer and the BiLSTM layer. The CRF layer is the more complex of the two, with two tricky parts: first, computing the loss over all possible label paths; second, backtracking the optimal path from the emission matrix (combined with the learned transition matrix).
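In formula form, for a sentence x of length n with gold labels y*, the per-sentence loss computed below is the standard CRF negative log-likelihood (this restates what the forward method computes as total_path_score - real_path_score):

\[
s(x, y) = \sum_{i=1}^{n} E_{i,\,y_i} + \sum_{i=0}^{n} T_{y_i,\,y_{i+1}}, \qquad
\mathcal{L}(x, y^{*}) = \log \sum_{y} e^{s(x, y)} - s(x, y^{*})
\]

where E is the emission matrix produced by the BiLSTM, T is the transition matrix learned by the CRF, and y_0, y_{n+1} are the added START and END tags.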
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.nn.utils.rnn import pad_sequence
from torch.nn.utils.rnn import pack_padded_sequence
from torch.nn.utils.rnn import pad_packed_sequence
from datasets import load_from_disk
from transformers import BertTokenizer

# Select the compute device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
1. CRF Layer Implementation
The CRF layer's loss computation and Viterbi backtracking of the optimal path are covered in more detail at: http://mengbaoliang.cn/?p=33519
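One detail worth calling out before the code: the _log_sum_exp helper below uses the standard max-shift trick so that exponentiating large scores does not overflow. A minimal sketch (the score values are made up) showing it agrees with torch.logsumexp:

score = torch.tensor([[1000.0, 999.0],
                      [998.0, 1000.0]])

# Naive torch.log(torch.sum(torch.exp(score), dim=0)) overflows: exp(1000) == inf.
# Subtracting the column max first keeps the exponents small and the result exact.
max_score, _ = score.max(dim=0)
stable = max_score + torch.log(torch.sum(torch.exp(score - max_score), dim=0))

print(stable)                         # tensor([1000.1269, 1000.3133])
print(torch.logsumexp(score, dim=0))  # identical result, built into PyTorch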
class CRF(nn.Module):

    def __init__(self, label_num):
        super(CRF, self).__init__()
        # Number of labels in the transition matrix
        self.label_num = label_num
        # [TAG1, TAG2, TAG3 ... START, END]
        params = torch.randn(self.label_num + 2, self.label_num + 2)
        self.transition_scores = nn.Parameter(params)
        # START and END tags: no transition may move into START or out of END
        START_TAG, END_TAG = self.label_num, self.label_num + 1
        self.transition_scores.data[:, START_TAG] = -1000
        self.transition_scores.data[END_TAG, :] = -1000
        # A very small value used as padding when expanding the emission matrix
        self.fill_value = -1000.0

    def _log_sum_exp(self, score):
        # Subtract the max from every element before exponentiating to avoid overflow
        max_score, _ = torch.max(score, dim=0)
        max_score_expand = max_score.expand(score.shape)
        return max_score + torch.log(torch.sum(torch.exp(score - max_score_expand), dim=0))

    def _get_real_path_score(self, emission_score, sequence_label):
        # Number of labels in the sequence
        seq_length = len(sequence_label)
        # Emission score of the gold path
        real_emission_score = torch.sum(emission_score[list(range(seq_length)), sequence_label])
        # Prepend START and append END to the gold label sequence
        b_id = torch.tensor([self.label_num], dtype=torch.long, device=device)
        e_id = torch.tensor([self.label_num + 1], dtype=torch.long, device=device)
        sequence_label_expand = torch.cat([b_id, sequence_label, e_id])
        # Transition score of the gold path
        pre_tag = sequence_label_expand[list(range(seq_length + 1))]
        now_tag = sequence_label_expand[list(range(1, seq_length + 2))]
        real_transition_score = torch.sum(self.transition_scores[pre_tag, now_tag])
        # Total score of the gold path
        real_path_score = real_emission_score + real_transition_score
        return real_path_score

    def _expand_emission_matrix(self, emission_score):
        # Sequence length
        sequence_length = emission_score.shape[0]
        # Rows for the START and END steps added by the expansion
        b_s = torch.tensor([[self.fill_value] * self.label_num + [0, self.fill_value]], device=device)
        e_s = torch.tensor([[self.fill_value] * self.label_num + [self.fill_value, 0]], device=device)
        # Expand the emission matrix to (sequence_length + 2, label_num + 2)
        expand_matrix = self.fill_value * torch.ones([sequence_length, 2], dtype=torch.float32, device=device)
        emission_score_expand = torch.cat([emission_score, expand_matrix], dim=1)
        emission_score_expand = torch.cat([b_s, emission_score_expand, e_s], dim=0)
        return emission_score_expand

    def _get_total_path_score(self, emission_score):
        # Expand the emission matrix
        emission_score_expand = self._expand_emission_matrix(emission_score)
        # Forward algorithm: accumulate the score over all paths
        pre = emission_score_expand[0]
        for obs in emission_score_expand[1:]:
            # Expand pre along a new column dimension
            pre_expand = pre.reshape(-1, 1).expand([self.label_num + 2, self.label_num + 2])
            # Expand obs along the row dimension
            obs_expand = obs.expand([self.label_num + 2, self.label_num + 2])
            # After expansion, obs, pre and self.transition_scores share the same shape
            score = obs_expand + pre_expand + self.transition_scores
            # Log-sum-exp over the previous tags
            pre = self._log_sum_exp(score)
        return self._log_sum_exp(pre)

    def forward(self, emission_scores, sequence_labels):
        total_loss = 0.0
        for emission_score, sequence_label in zip(emission_scores, sequence_labels):
            # Score of the gold path
            real_path_score = self._get_real_path_score(emission_score, sequence_label)
            # Score over all paths
            total_path_score = self._get_total_path_score(emission_score)
            # Negative log-likelihood for this sentence
            finish_loss = total_path_score - real_path_score
            # Accumulate the loss over sentences
            total_loss += finish_loss
        return total_loss

    def predict(self, emission_score):
        """Viterbi decoding: combine the emission and transition matrices to find the best path"""
        # Expand the emission matrix
        emission_score_expand = self._expand_emission_matrix(emission_score)
        # Backpointer indices and running scores per time step
        ids = torch.zeros(1, self.label_num + 2, dtype=torch.long, device=device)
        val = torch.zeros(1, self.label_num + 2, device=device)
        pre = emission_score_expand[0]
        for obs in emission_score_expand[1:]:
            # Expand pre along a new column dimension
            pre_expand = pre.reshape(-1, 1).expand([self.label_num + 2, self.label_num + 2])
            # Expand obs along the row dimension
            obs_expand = obs.expand([self.label_num + 2, self.label_num + 2])
            # After expansion, obs, pre and self.transition_scores share the same shape
            score = obs_expand + pre_expand + self.transition_scores
            # For each current tag, keep the best previous tag and its score
            value, index = score.max(dim=0)
            # Append this time step's backpointers and scores
            ids = torch.cat([ids, index.unsqueeze(0)], dim=0)
            val = torch.cat([val, value.unsqueeze(0)], dim=0)
            # Carry the best scores forward to the next step
            pre = value
        # Take the best tag at the final step
        index = torch.argmax(val[-1])
        best_path = [index]
        # Backtrack through the stored backpointers; row 0 of ids is the zero
        # padding added to make concatenation easy, so it is skipped
        for i in reversed(ids[1:]):
            # Follow the backpointer to the previous tag ID
            index = i[index].item()
            best_path.append(index)
        # Reverse and strip the START and END positions
        best_path = best_path[::-1][1:-1]
        return best_path
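A minimal sketch of exercising the CRF layer on its own (the shapes are illustrative: 7 real labels and one 5-token sentence; random emission scores stand in for the BiLSTM output):

crf = CRF(label_num=7).to(device)

emission = torch.randn(5, 7, device=device)   # (seq_len, label_num)
labels = torch.tensor([0, 1, 2, 0, 3], dtype=torch.long, device=device)

# forward expects lists of per-sentence tensors and returns the summed loss
loss = crf([emission], [labels])
print(loss)

# Viterbi decoding for a single sentence returns a list of label IDs
print(crf.predict(emission))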
2. BiLSTM Layer Implementation
The BiLSTM layer consists of three components: nn.Embedding, nn.LSTM, and nn.Linear. The nn.LSTM must be created with bidirectional=True, meaning a bidirectional LSTM that builds each input token's semantic representation from both left-to-right and right-to-left passes; its output dimension is therefore 2 * hidden_size.
Because the forward function operates on batches, the batch passed into it is first compressed with pack_padded_sequence; once the LSTM finishes, pad_packed_sequence restores the compressed output to its zero-padded form. Finally, the result is fed through the linear layer. A minimal sketch of this round trip is shown below.
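Toy shapes here (batch of 2 sentences with lengths 3 and 2, embedding dim 4, hidden size 5, random values) demonstrate the pack/pad round trip and the doubled output dimension; note that pack_padded_sequence by default requires lengths sorted in descending order:

embed = torch.randn(3, 2, 4)                  # (max_seq_len, batch, embed_dim)
lengths = [3, 2]                              # real lengths, sorted descending
lstm = nn.LSTM(input_size=4, hidden_size=5, bidirectional=True)

packed = pack_padded_sequence(embed, lengths)        # skip the padded time steps
out_packed, _ = lstm(packed)
out, out_lengths = pad_packed_sequence(out_packed)   # restore zero padding

print(out.shape)     # torch.Size([3, 2, 10]) -- 2 * hidden_size for a BiLSTM
print(out_lengths)   # tensor([3, 2])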
class BiLSTM(nn.Module):

    def __init__(self, vocab_size, label_num):
        super(BiLSTM, self).__init__()
        # Maps input token IDs to word vectors
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=256)
        # Extracts a bidirectional semantic representation of the input
        self.blstm = nn.LSTM(input_size=256, hidden_size=512, bidirectional=True, num_layers=1)
        # Maps the self.blstm output vectors to label logits
        self.liner = nn.Linear(in_features=1024, out_features=label_num)

    def forward(self, inputs, length):
        # Convert input token IDs to word vectors
        outputs_embed = self.embed(inputs)
        # The batch contains a lot of zero padding; compress it away
        outputs_packd = pack_padded_sequence(outputs_embed, length)
        # The BiLSTM extracts a bidirectional representation of every token
        outputs_blstm, (hn, cn) = self.blstm(outputs_packd)
        # outputs_paded holds the re-padded BiLSTM output for every token
        # output_lengths holds the real length of every sentence
        outputs_paded, output_lengths = pad_packed_sequence(outputs_blstm)
        outputs_paded = outputs_paded.transpose(0, 1)
        # Linear layer produces the emission matrix, shape e.g. (16, 57, 7)
        output_logits = self.liner(outputs_paded)
        # Keep only the real (un-padded) positions of each sentence
        outputs = []
        for output_logit, outputs_length in zip(output_logits, output_lengths):
            outputs.append(output_logit[:outputs_length])
        return outputs

    def predict(self, inputs):
        # Convert input token IDs to word vectors
        outputs_embed = self.embed(inputs)
        # Add a batch dimension at position 1
        outputs_embed = outputs_embed.unsqueeze(1)
        # Build the semantic representation of every token
        outputs_blstm, (hn, cn) = self.blstm(outputs_embed)
        # Remove the batch dimension at position 1
        outputs_blstm = outputs_blstm.squeeze(1)
        # Compute the emission scores of every token
        output_liner = self.liner(outputs_blstm)
        return output_liner
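A minimal sketch of running this layer on a toy batch (vocab_size=100 and label_num=7 are made-up values; token IDs are random):

bilstm = BiLSTM(vocab_size=100, label_num=7)

inputs = torch.randint(0, 100, (6, 2))   # (max_seq_len, batch), zero-padded
lengths = [6, 4]                         # real lengths, sorted descending

emissions = bilstm(inputs, lengths)      # list of per-sentence emission matrices
print(emissions[0].shape)                # torch.Size([6, 7])
print(emissions[1].shape)                # torch.Size([4, 7])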
3. Model Construction
The model combines the BiLSTM and CRF layers: the BiLSTM produces each token's prediction logits, i.e. the emission matrix, while the CRF learns the transition matrix. The class implements two methods: forward returns the loss, which is backpropagated to learn the parameters.
The predict method is used at evaluation or inference time to decode the optimal label sequence: the BiLSTM first outputs the emission matrix, which is then combined with the CRF layer's transition matrix, and the Viterbi algorithm yields the optimal label sequence. A short usage sketch follows the class.
class NER(nn.Module):

    def __init__(self, vocab_size, label_num):
        super(NER, self).__init__()
        self.vocab_size = vocab_size
        self.label_num = label_num
        # Bidirectional LSTM layer
        self.bilstm = BiLSTM(vocab_size=self.vocab_size, label_num=self.label_num)
        # Conditional random field layer
        self.crf = CRF(label_num=self.label_num)

    def forward(self, inputs, labels, length):
        # Score every token of the batch, i.e. compute each sentence's emission matrix
        emission_scores = self.bilstm(inputs, length)
        # Total loss over the batch
        batch_loss = self.crf(emission_scores, labels)
        return batch_loss

    def save_model(self, save_path):
        save_info = {
            'init': {'vocab_size': self.vocab_size, 'label_num': self.label_num},
            'state': self.state_dict()
        }
        torch.save(save_info, save_path)

    def predict(self, inputs):
        # Score every token, i.e. compute the sentence's emission matrix
        emission_scores = self.bilstm.predict(inputs)
        # crf.predict expects a 2-D emission matrix of shape (seq_len, label_num)
        logits = self.crf.predict(emission_scores)
        return logits
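A minimal end-to-end sketch tying the pieces together (toy values: vocab_size=100, label_num=7, lr chosen arbitrarily; real inputs would come from the tokenized dataset):

model = NER(vocab_size=100, label_num=7).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# One toy training step on a batch of 2 zero-padded sentences
inputs = torch.randint(0, 100, (6, 2), device=device)    # (max_seq_len, batch)
labels = [torch.tensor([0, 1, 2, 0, 1, 2], device=device),
          torch.tensor([0, 1, 2, 0], device=device)]     # per-sentence gold tags
lengths = [6, 4]                                         # sorted descending

loss = model(inputs, labels, lengths)
optimizer.zero_grad()
loss.backward()
optimizer.step()

# Inference on a single un-padded sentence
sentence = torch.randint(0, 100, (5,), device=device)
print(model.predict(sentence))   # decoded label IDs, e.g. [0, 1, 2, 0, 1]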