Model construction mainly consists of implementing the CRF layer and the BiLSTM layer. The CRF layer is the more complex of the two, with two tricky parts: first, computing the loss over all possible label paths; second, backtracking the optimal path from the emission matrix.
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.nn.utils.rnn import pad_sequence
from torch.nn.utils.rnn import pack_padded_sequence
from torch.nn.utils.rnn import pad_packed_sequence
from datasets import load_from_disk
from transformers import BertTokenizer
# Define the compute device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
1. CRF Layer Implementation
For the loss computation and the Viterbi backtracking of the optimal path used in the CRF layer, see: https://mengbaoliang.cn/?p=33519
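Before the code, it is worth stating what the forward method below minimizes. The loss is the standard CRF negative log-likelihood of the gold path (a textbook formulation, restated here only for reference):

$$\text{loss}(x, y) = \log \sum_{y'} \exp\big(s(x, y')\big) - s(x, y), \qquad s(x, y) = \sum_{i=1}^{n} E_{i,\,y_i} + \sum_{i=0}^{n} T_{y_i,\,y_{i+1}}$$

where E is the emission matrix produced by the BiLSTM, T is the transition matrix self.transition_scores, and the gold path is padded with y_0 = START and y_{n+1} = END. Computing the log-sum over all paths y' is the first difficulty (the forward algorithm in _get_total_path_score); recovering the highest-scoring path is the second (the Viterbi backtracking in predict).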
class CRF(nn.Module):

    def __init__(self, label_num):
        super(CRF, self).__init__()
        # Number of real labels in the transition matrix
        self.label_num = label_num
        # Transition matrix over [TAG1, TAG2, TAG3...START, END]
        params = torch.randn(self.label_num + 2, self.label_num + 2)
        self.transition_scores = nn.Parameter(params)
        # The start and end tags take the last two indices;
        # forbid transitions into START and out of END
        START_TAG, END_TAG = self.label_num, self.label_num + 1
        self.transition_scores.data[:, START_TAG] = -1000
        self.transition_scores.data[END_TAG, :] = -1000
        # A very small value used as filler when expanding the emission matrix
        self.fill_value = -1000.0

    def _log_sum_exp(self, score):
        # Subtract the maximum from every element before exponentiating
        # to avoid numerical overflow
        max_score, _ = torch.max(score, dim=0)
        max_score_expand = max_score.expand(score.shape)
        return max_score + torch.log(torch.sum(torch.exp(score - max_score_expand), dim=0))

    def _get_real_path_score(self, emission_score, sequence_label):
        # Length of the label sequence
        seq_length = len(sequence_label)
        # Emission score of the real (gold) path
        real_emission_score = torch.sum(emission_score[list(range(seq_length)), sequence_label])
        # Prepend START and append END to the gold label sequence
        b_id = torch.tensor([self.label_num], dtype=torch.int32, device=device)
        e_id = torch.tensor([self.label_num + 1], dtype=torch.int32, device=device)
        sequence_label_expand = torch.cat([b_id, sequence_label, e_id])
        # Transition score of the real path
        pre_tag = sequence_label_expand[list(range(seq_length + 1))]
        now_tag = sequence_label_expand[list(range(1, seq_length + 2))]
        real_transition_score = torch.sum(self.transition_scores[pre_tag, now_tag])
        # Total score of the real path = emission score + transition score
        real_path_score = real_emission_score + real_transition_score
        return real_path_score

    def _expand_emission_matrix(self, emission_score):
        # Sequence length (number of tokens)
        sequence_length = emission_score.shape[0]
        # The expansion adds START and END steps; define their emission rows
        b_s = torch.tensor([[self.fill_value] * self.label_num + [0, self.fill_value]], device=device)
        e_s = torch.tensor([[self.fill_value] * self.label_num + [self.fill_value, 0]], device=device)
        # Expand the emission matrix to shape (sequence_length + 2, label_num + 2)
        expand_matrix = self.fill_value * torch.ones([sequence_length, 2], dtype=torch.float32, device=device)
        emission_score_expand = torch.cat([emission_score, expand_matrix], dim=1)
        emission_score_expand = torch.cat([b_s, emission_score_expand, e_s], dim=0)
        return emission_score_expand

    def _get_total_path_score(self, emission_score):
        # Expand the emission score matrix with START/END rows and columns
        emission_score_expand = self._expand_emission_matrix(emission_score)
        # Compute the log score over all paths with the forward algorithm
        pre = emission_score_expand[0]
        for obs in emission_score_expand[1:]:
            # Broadcast pre along columns (the previous tag varies over rows)
            pre_expand = pre.reshape(-1, 1).expand([self.label_num + 2, self.label_num + 2])
            # Broadcast obs along rows (the current tag varies over columns)
            obs_expand = obs.expand([self.label_num + 2, self.label_num + 2])
            # After broadcasting, obs, pre and self.transition_scores have the same shape
            score = obs_expand + pre_expand + self.transition_scores
            # Log-sum-exp over the previous-tag dimension
            pre = self._log_sum_exp(score)
        return self._log_sum_exp(pre)

    def forward(self, emission_scores, sequence_labels):
        total_loss = 0.0
        for emission_score, sequence_label in zip(emission_scores, sequence_labels):
            # Score of the real (gold) path
            real_path_score = self._get_real_path_score(emission_score, sequence_label)
            # Log score over all paths
            total_path_score = self._get_total_path_score(emission_score)
            # Per-sentence loss: log score over all paths minus the real path score
            finish_loss = total_path_score - real_path_score
            # Accumulate the losses of the different sentences
            total_loss += finish_loss
        return total_loss

    def predict(self, emission_score):
        """Use the Viterbi algorithm to decode the optimal path from the emission and transition matrices."""
        # Expand the emission score matrix
        emission_score_expand = self._expand_emission_matrix(emission_score)
        # ids stores the backpointers and val the best scores;
        # row 0 is zero padding added for easy concatenation
        ids = torch.zeros(1, self.label_num + 2, dtype=torch.long, device=device)
        val = torch.zeros(1, self.label_num + 2, device=device)
        pre = emission_score_expand[0]
        for obs in emission_score_expand[1:]:
            # Broadcast pre along columns (the previous tag varies over rows)
            pre_expand = pre.reshape(-1, 1).expand([self.label_num + 2, self.label_num + 2])
            # Broadcast obs along rows (the current tag varies over columns)
            obs_expand = obs.expand([self.label_num + 2, self.label_num + 2])
            # After broadcasting, obs, pre and self.transition_scores have the same shape
            score = obs_expand + pre_expand + self.transition_scores
            # For every current tag, the index and score of the best previous tag
            value, index = score.max(dim=0)
            # Append this time step's backpointers and best scores
            ids = torch.cat([ids, index.unsqueeze(0)], dim=0)
            val = torch.cat([val, value.unsqueeze(0)], dim=0)
            # Carry the best scores forward to the next step
            pre = value
        # Take the best tag at the last time step
        index = torch.argmax(val[-1])
        best_path = [index]
        # Backtrack through the backpointers, skipping the all-zero
        # padding row that was inserted at position 0
        for i in reversed(ids[1:]):
            # Follow the backpointer to the best previous tag ID
            index = i[index].item()
            best_path.append(index)
        # Reverse and strip the START and END positions
        best_path = best_path[::-1][1:-1]
        return best_path
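As a quick sanity check, the CRF layer can be exercised on its own with random emission scores. This is a minimal sketch; the sizes below (7 labels, a 5-token sentence) are arbitrary:

crf = CRF(label_num=7).to(device)
emission = torch.randn(5, 7, device=device)             # (seq_len, label_num) emission scores
labels = torch.tensor([0, 1, 2, 2, 3], device=device)   # gold label ids
loss = crf([emission], [labels])                        # forward expects per-sentence lists
best_path = crf.predict(emission)                       # Viterbi decoding, a list of label ids
# The loss is non-negative: the log-sum over all paths upper-bounds any single path score
print(loss.item(), best_path)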
2. BiLSTM Layer Implementation
The BiLSTM layer consists of three components: nn.Embedding, nn.LSTM and nn.Linear. For nn.LSTM, the bidirectional attribute must be set to True, meaning a bidirectional LSTM that extracts the semantic representation of the input tokens both left-to-right and right-to-left; its output dimension is therefore twice hidden_size.
Since the forward function computes over batches, the batch passed into it is first compressed with pack_padded_sequence. When the LSTM layer finishes, pad_packed_sequence restores the compressed data to its zero-padded form, which is then fed into the linear layer.
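A minimal sketch of this compress-and-restore round trip (toy shapes only; the real model uses the layers defined below):

lengths = torch.tensor([4, 2])                   # real lengths, sorted in descending order
padded = torch.randn(4, 2, 256)                  # (max_seq_len, batch, embed_dim), zero-padded
packed = pack_padded_sequence(padded, lengths)   # drop the padded positions
lstm = nn.LSTM(input_size=256, hidden_size=512, bidirectional=True)
outputs, _ = lstm(packed)
restored, restored_lengths = pad_packed_sequence(outputs)
print(restored.shape)                            # torch.Size([4, 2, 1024]): 2 * hidden_size
print(restored_lengths)                          # tensor([4, 2]): the original lengths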
class BiLSTM(nn.Module):

    def __init__(self, vocab_size, label_num):
        super(BiLSTM, self).__init__()
        # Maps input token ids to word embeddings
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=256)
        # Extracts a bidirectional semantic representation of the input
        self.blstm = nn.LSTM(input_size=256,
                             hidden_size=512,
                             bidirectional=True,
                             num_layers=1)
        # Maps the output vectors of self.blstm to label logits
        self.liner = nn.Linear(in_features=1024, out_features=label_num)

    def forward(self, inputs, length):
        # Convert the input token indices to word embeddings
        outputs_embed = self.embed(inputs)
        # The batch is heavily zero-padded; compress the padding away
        outputs_packd = pack_padded_sequence(outputs_embed, length)
        # The BiLSTM extracts a bidirectional representation of every token in each sentence
        outputs_blstm, (hn, cn) = self.blstm(outputs_packd)
        # outputs_paded: the padded BiLSTM output for every token
        # output_lengths: the real length of each sentence
        outputs_paded, output_lengths = pad_packed_sequence(outputs_blstm)
        outputs_paded = outputs_paded.transpose(0, 1)
        # The linear layer produces the emission matrix, e.g. shape (16, 57, 7)
        # for a batch of 16 sentences with max length 57 and 7 labels
        output_logits = self.liner(outputs_paded)
        outputs = []
        # Keep only the logits of the real (unpadded) positions of each sentence
        for output_logit, output_length in zip(output_logits, output_lengths):
            outputs.append(output_logit[:output_length])
        return outputs

    def predict(self, inputs):
        # Convert the input token indices to word embeddings
        outputs_embed = self.embed(inputs)
        # Add a batch dimension at position 1
        outputs_embed = outputs_embed.unsqueeze(1)
        # Compute the contextual representation of every token
        outputs_blstm, (hn, cn) = self.blstm(outputs_embed)
        # Remove the batch dimension at position 1
        outputs_blstm = outputs_blstm.squeeze(1)
        # Compute the emission scores of every token
        output_liner = self.liner(outputs_blstm)
        return output_liner
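A quick shape check for the predict path (hypothetical sizes: a 20-token sentence, a 100-word vocabulary, 7 labels):

bilstm = BiLSTM(vocab_size=100, label_num=7).to(device)
tokens = torch.randint(0, 100, (20,), device=device)   # token ids of a single sentence
emission = bilstm.predict(tokens)
print(emission.shape)                                  # torch.Size([20, 7]): one row of label scores per token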
3. Model Construction
The model is built from the BiLSTM and CRF layers: the BiLSTM produces the prediction logits of every token, i.e. the emission matrix, and the CRF learns the transition matrix. The class implements two methods: forward returns the loss, which is used for backpropagation to learn the parameters.
predict is used in the evaluation or inference stage to find the optimal label sequence: the BiLSTM first outputs the emission matrix, which is then combined with the transition matrix of the CRF layer, and the Viterbi algorithm yields the optimal label sequence.
class NER(nn.Module):

    def __init__(self, vocab_size, label_num):
        super(NER, self).__init__()
        self.vocab_size = vocab_size
        self.label_num = label_num
        # Bidirectional LSTM layer
        self.bilstm = BiLSTM(vocab_size=self.vocab_size, label_num=self.label_num)
        # Conditional random field layer
        self.crf = CRF(label_num=self.label_num)

    def forward(self, inputs, labels, length):
        # Compute the per-token scores of the batch, i.e. the emission matrix of every sentence
        emission_scores = self.bilstm(inputs, length)
        # Compute the total loss over the batch
        batch_loss = self.crf(emission_scores, labels)
        # Return the total loss
        return batch_loss

    def save_model(self, save_path):
        save_info = {
            'init': {'vocab_size': self.vocab_size, 'label_num': self.label_num},
            'state': self.state_dict()
        }
        torch.save(save_info, save_path)

    def predict(self, inputs):
        # Compute the per-token scores of the sentence, i.e. its emission matrix
        emission_scores = self.bilstm.predict(inputs)
        # crf.predict expects a 2-D emission matrix of shape (seq_len, label_num)
        logits = self.crf.predict(emission_scores)
        return logits
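Putting the pieces together, here is a minimal end-to-end sketch of one training step and one prediction. The toy data, the vocabulary size, the label count and the AdamW optimizer are all assumptions for illustration:

model = NER(vocab_size=100, label_num=7).to(device)
optimizer = optim.AdamW(model.parameters(), lr=1e-3)

# A toy batch of two sentences, padded to length 6: (seq_len, batch)
inputs = torch.randint(1, 100, (6, 2), device=device)
lengths = torch.tensor([6, 4])                         # real lengths, sorted in descending order
labels = [torch.tensor([0, 1, 2, 2, 3, 1], device=device),
          torch.tensor([0, 1, 2, 0], device=device)]   # gold labels, one tensor per sentence

loss = model(inputs, labels, lengths)
optimizer.zero_grad()
loss.backward()
optimizer.step()

# Decode the best label sequence of a single sentence of token ids
print(model.predict(torch.tensor([5, 8, 13], device=device)))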
