The Transformer is an architecture proposed by Google in the 2017 paper "Attention Is All You Need". It performs well on a wide range of NLP tasks and trains faster than RNNs. The Transformer has since replaced the RNN as the SOTA (State Of The Art, i.e. the best-performing model in a given field) model for neural machine translation, and it is used in production at companies such as Microsoft, Tencent, and Alibaba.
The Transformer as a whole still follows an encoder-decoder design, but the encoder and decoder are no longer RNNs; they are networks redesigned around multi-head self-attention.
In the figure above, the left side is the encoder and the right side is the decoder. The encoder stack is made up of 6 encoder layers, and the decoder stack is likewise made up of 6 decoder layers.
As the figure shows, both the encoder and the decoder first embed their inputs and add a positional encoding to the word embeddings. The result then passes through a multi-head self-attention layer; the difference is that the encoder's multi-head self-attention needs no mask, while the decoder's multi-head self-attention must be masked.
1. Embedding
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class Embedding(nn.Module):

    def __init__(self, vocab_size, embed_dim):
        """
        :param vocab_size: vocabulary size
        :param embed_dim: word embedding dimension
        """
        super(Embedding, self).__init__()
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.embedding = nn.Embedding(self.vocab_size, self.embed_dim)

    def forward(self, inputs):
        outputs = self.embedding(inputs)
        # As in the paper, the embeddings are scaled by sqrt(embed_dim)
        return outputs * math.sqrt(self.embed_dim)


def test():
    model = Embedding(100, 200)
    print(model(torch.tensor([[1, 2, 3], [4, 5, 6]])))


if __name__ == '__main__':
    test()
2. Position Encoding
We know that changing the order of a sequence should change its representation, so the Transformer encodes position information with a separate positional encoding: as shown in the figure, the input is first embedded and a positional encoding is then added to the embedding, so that the input carries position information. The encoding is built from sine and cosine functions, as shown below.
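For reference, the sinusoidal positional encoding from the paper is:

PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))

where pos is the position in the sequence and i indexes pairs of embedding dimensions, so even dimensions use sine and odd dimensions use cosine.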
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class PositionEncoding(nn.Module):

    def __init__(self, max_len, d_model, p):
        """
        :param max_len: maximum sequence length
        :param d_model: word embedding dimension
        :param p: dropout probability
        """
        super(PositionEncoding, self).__init__()
        # Store attributes
        self.max_len = max_len
        self.d_model = d_model
        # Dropout layer for regularization
        self.dropout = nn.Dropout(p=p)
        # Initialize the positional encoding table
        pe = torch.zeros(max_len, d_model)
        # Positions: a column vector of max_len rows, starting from 0
        position = torch.arange(0, max_len).reshape(-1, 1)
        # Exponentially decreasing frequencies across the embedding dimensions
        div_term = torch.exp(torch.arange(0, self.d_model, 2) * -(math.log(10000.0) / self.d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe.unsqueeze_(0)
        self.register_buffer('pe', pe)

    # inputs shape: [batch_size, seq_len, input_size]
    def forward(self, inputs):
        # If a sentence has 5 tokens, take the first 5 positional encodings from pe
        # and add them to the input tensor, so the sequence carries position information
        outputs = inputs + self.pe[:, :inputs.size(1)].clone().detach()
        return self.dropout(outputs)


def test():
    model = PositionEncoding(max_len=4, d_model=4, p=0.2)
    inputs = torch.randn(1, 2, 4)
    print(inputs)
    inputs = model(inputs)
    print(inputs)


if __name__ == '__main__':
    test()
3. Mask Matrix
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


def subsequent_mask(size):
    # Build an upper-triangular matrix of ones above the main diagonal, then invert it,
    # so each position may only attend to itself and earlier positions
    attn_shape = (1, size, size)
    subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
    return torch.from_numpy(subsequent_mask) == 0


def test01():
    mask = subsequent_mask(5)
    print(mask)


def test02():
    print(np.triu(np.ones([5, 5])))
    print('-' * 20)
    print(np.triu(np.ones([5, 5]), k=1))
    print('-' * 20)
    print(np.triu(np.ones([5, 5]), k=2))
    print('-' * 20)
    print(np.triu(np.ones([5, 5]), k=-1))
    print('-' * 20)
    print(np.triu(np.ones([5, 5]), k=-2))


if __name__ == '__main__':
    test01()
    test02()
The output of the test01 function is as follows:
tensor([[[ True, False, False, False, False],
         [ True,  True, False, False, False],
         [ True,  True,  True, False, False],
         [ True,  True,  True,  True, False],
         [ True,  True,  True,  True,  True]]])
The output of the test02 function is as follows:
[[1. 1. 1. 1. 1.]
 [0. 1. 1. 1. 1.]
 [0. 0. 1. 1. 1.]
 [0. 0. 0. 1. 1.]
 [0. 0. 0. 0. 1.]]
--------------------
[[0. 1. 1. 1. 1.]
 [0. 0. 1. 1. 1.]
 [0. 0. 0. 1. 1.]
 [0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0.]]
--------------------
[[0. 0. 1. 1. 1.]
 [0. 0. 0. 1. 1.]
 [0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
--------------------
[[1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [0. 1. 1. 1. 1.]
 [0. 0. 1. 1. 1.]
 [0. 0. 0. 1. 1.]]
--------------------
[[1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [0. 1. 1. 1. 1.]
 [0. 0. 1. 1. 1.]]
The k argument of np.triu controls where the diagonal sits: if k is greater than 0, the diagonal moves up; if k is less than 0, it moves down. The mask produced by subsequent_mask is later applied to the attention scores, as sketched below.
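Here is a minimal sketch of how such a mask is typically applied to attention scores (it assumes the subsequent_mask function above is in scope; the tensor shapes are illustrative):

import torch
from torch.nn import functional as F

# Illustrative scores for 5 query positions attending to 5 key positions
scores = torch.randn(1, 5, 5)
mask = subsequent_mask(5)  # the function defined above; shape [1, 5, 5]

# Positions where the mask is False are filled with a large negative number,
# so softmax assigns them (almost) zero weight
scores = scores.masked_fill(mask == 0, -1e9)
weights = F.softmax(scores, dim=-1)
print(weights)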
4. MultiHeadAttention
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


# Input shape: [batch_size, seq_len, input_size]
def attention(query, key, value, mask=None, dropout=None):
    # Size of the last dimension of the query tensor (input_size)
    d_k = query.size(-1)
    # Scale the attention scores by sqrt(d_k)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    # If a mask is given, masked positions get a large negative score
    # so softmax assigns them near-zero weight
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    p_attn = F.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn


def test():
    # Suppose the input sentence is: 我 爱 你 ("I love you")
    # Each character is represented by a 3-dimensional tensor
    torch.manual_seed(0)
    # Representation of the query token
    q = torch.randn(3,).float()
    # Representations of the key tokens
    torch.manual_seed(0)
    k = torch.randn((4, 3)).float()
    # k = v
    torch.manual_seed(0)
    v = torch.randn((4, 3)).float()
    print('q:\n', q)
    print('k:\n', k)
    print('v:\n', v)
    att_tensor, attn = attention(q, k, v)
    print('attention weights:', attn)
    print('attention output:', att_tensor)


if __name__ == '__main__':
    test()
Building on the attention function above, multi-head attention splits the representation into several heads, applies attention to each head in parallel, and concatenates the results:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import copy


# Input shape: [batch_size, seq_len, input_size]
def attention(query, key, value, mask=None, dropout=None):
    # Size of the last dimension of the query tensor (input_size)
    d_k = query.size(-1)
    # Scale the attention scores by sqrt(d_k)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    # If a mask is given, masked positions get a large negative score
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    p_attn = F.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn


def test01():
    # Suppose the input sentence is: 我 爱 你 ("I love you")
    # Each character is represented by a 3-dimensional tensor
    torch.manual_seed(0)
    # Representation of the query token
    q = torch.randn(3,).float()
    # Representations of the key tokens
    torch.manual_seed(0)
    k = torch.randn((4, 3)).float()
    # k = v
    torch.manual_seed(0)
    v = torch.randn((4, 3)).float()
    print('q:\n', q)
    print('k:\n', k)
    print('v:\n', v)
    att_tensor, attn = attention(q, k, v)
    print('attention weights:', attn)
    print('attention output:', att_tensor)


def clone(module, N):
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


class MultiHeadAttention(nn.Module):

    def __init__(self, head, embedding_dim, p=0.1):
        super(MultiHeadAttention, self).__init__()
        # Dimension handled by each attention head
        self.d_k = embedding_dim // head
        # Number of attention heads
        self.head = head
        # Linear projections for query, key, value, and the final output
        self.q_linear = nn.Linear(embedding_dim, embedding_dim)
        self.k_linear = nn.Linear(embedding_dim, embedding_dim)
        self.v_linear = nn.Linear(embedding_dim, embedding_dim)
        self.linear = nn.Linear(embedding_dim, embedding_dim)
        # Dropout layer for regularization
        self.dropout = nn.Dropout(p=p)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        # Project the inputs, split them into heads, and move the head dimension
        # in front of the sequence dimension: [batch_size, head, seq_len, d_k]
        query = self.q_linear(query).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)
        key = self.k_linear(key).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)
        value = self.v_linear(value).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)
        print('q:\n', query.shape)
        if mask is not None:
            # Add a head dimension so the same mask broadcasts over all heads
            mask = mask.unsqueeze(1)
        att_tensor, attn = attention(query, key, value, mask=mask, dropout=self.dropout)
        print('att_tensor:', att_tensor.shape)
        print('attn:', attn.shape)
        # Concatenate the heads back together
        att_tensor = att_tensor.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)
        return self.linear(att_tensor)


def test02():
    torch.manual_seed(0)
    # (4, 3): 4 tokens, each represented by 3 dimensions
    q = torch.randn(4, 3).float()
    # (4, 5, 3): a batch of 4 sentences, each with 5 tokens of 3 dimensions
    torch.manual_seed(0)
    k = torch.randn((4, 5, 3)).float()
    torch.manual_seed(0)
    v = torch.randn((4, 5, 3)).float()
    print('q:\n', q)
    print('k:\n', k)
    print('v:\n', v)
    multi_head = MultiHeadAttention(3, embedding_dim=3)
    attn_tensor = multi_head(q, k, v)
    print(attn_tensor)


if __name__ == '__main__':
    test02()
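As a quick sanity check, a masked self-attention call could look like the sketch below. It assumes the MultiHeadAttention class above and the subsequent_mask function from the mask section are in the same module; the sizes are illustrative:

import torch

# A batch of 2 sentences, 5 tokens each, embedding dimension 12, 4 heads
x = torch.randn(2, 5, 12)
mask = subsequent_mask(5)   # shape [1, 5, 5]
multi_head = MultiHeadAttention(head=4, embedding_dim=12)

# Self-attention: query, key, and value are all the same tensor
out = multi_head(x, x, x, mask=mask)
print(out.shape)            # expected: torch.Size([2, 5, 12])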
5. FeedForward
import torch
import torch.nn as nn
import torch.nn.functional as F


class PositionWiseFeedForward(nn.Module):

    def __init__(self, d_model, d_ff, p=0.1):
        super(PositionWiseFeedForward, self).__init__()
        # Expand from d_model to d_ff, then project back down to d_model
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(p=p)

    def forward(self, inputs):
        inputs = self.linear1(inputs)
        inputs = F.relu(inputs)
        inputs = self.dropout(inputs)
        outputs = self.linear2(inputs)
        return outputs
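This block has no test function; a minimal usage sketch (the sizes here are illustrative) could look like this:

import torch

# d_model=8 matches the embedding dimension, d_ff=32 is the hidden width
ffn = PositionWiseFeedForward(d_model=8, d_ff=32)
x = torch.randn(2, 5, 8)   # [batch_size, seq_len, d_model]
out = ffn(x)
print(out.shape)           # expected: torch.Size([2, 5, 8])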
6. Layer Normalization
import torch
import torch.nn as nn
import torch.nn.functional as F


class Normalization(nn.Module):

    def __init__(self, input_size, eps=1e-6):
        super(Normalization, self).__init__()
        # gamma: learnable scale parameter
        self.gamma = nn.Parameter(torch.ones(input_size))
        # beta: learnable shift parameter
        self.beta = nn.Parameter(torch.zeros(input_size))
        self.eps = eps

    def forward(self, inputs):
        # Normalize over the last (feature) dimension
        mean = inputs.mean(-1, keepdim=True)
        std = inputs.std(-1, keepdim=True)
        return self.gamma * (inputs - mean) / (std + self.eps) + self.beta


def test():
    torch.manual_seed(0)
    inputs = torch.randint(0, 10, size=[3, 5, 8]).float()
    norm = Normalization(input_size=8)
    outputs = norm(inputs)
    print(outputs)


if __name__ == '__main__':
    test()
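This custom Normalization plays the same role as PyTorch's built-in nn.LayerNorm. The sketch below compares the two (the outputs will be close but not bit-identical, since .std() uses the unbiased estimator and this class adds eps to the standard deviation rather than to the variance):

import torch
import torch.nn as nn

torch.manual_seed(0)
x = torch.randn(3, 5, 8)

custom = Normalization(input_size=8)   # the class defined above
builtin = nn.LayerNorm(8, eps=1e-6)

print(custom(x)[0, 0])
print(builtin(x)[0, 0])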
7. SubLayerConnection
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class Normalization(nn.Module):

    def __init__(self, input_size, eps=1e-6):
        super(Normalization, self).__init__()
        # gamma: learnable scale parameter
        self.gamma = nn.Parameter(torch.ones(input_size))
        # beta: learnable shift parameter
        self.beta = nn.Parameter(torch.zeros(input_size))
        self.eps = eps

    def forward(self, inputs):
        mean = inputs.mean(-1, keepdim=True)
        std = inputs.std(-1, keepdim=True)
        return self.gamma * (inputs - mean) / (std + self.eps) + self.beta


class SubLayerConnection(nn.Module):

    def __init__(self, input_size, p=0.1):
        super(SubLayerConnection, self).__init__()
        self.norm = Normalization(input_size)
        self.dropout = nn.Dropout(p=p)

    def forward(self, inputs, sublayer):
        # sublayer may be the feed-forward layer or the multi-head attention layer
        outputs = self.dropout(sublayer(self.norm(inputs)))
        # Residual connection: add the sublayer output back onto the input
        return inputs + outputs


def test():
    pass


if __name__ == '__main__':
    test()
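The test function above is left empty; a minimal sketch of how SubLayerConnection wraps a sublayer (assuming the PositionWiseFeedForward class from the feed-forward section is available in the same module; the sizes are illustrative) might look like this:

import torch

sublayer_conn = SubLayerConnection(input_size=8)
ffn = PositionWiseFeedForward(d_model=8, d_ff=32)

x = torch.randn(2, 5, 8)
# The second argument is a callable: here the whole feed-forward sublayer
out = sublayer_conn(x, ffn)
print(out.shape)   # expected: torch.Size([2, 5, 8])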
8. Encoder Layer
import torch
import torch.nn as nn
import torch.nn.functional as F

# This layer relies on the SubLayerConnection class defined in the previous section
# (and, at call time, on a self-attention module and a feed-forward module being
# passed in), so those definitions must be available in this module.


class EncoderLayer(nn.Module):

    def __init__(self, input_size, self_attn, feedward, p):
        super(EncoderLayer, self).__init__()
        # Store the two sublayer objects
        self.self_attn = self_attn
        self.feedward = feedward
        # Residual connections around each sublayer
        self.sublayer1 = SubLayerConnection(input_size, p)
        self.sublayer2 = SubLayerConnection(input_size, p)
        # Word embedding dimension of the input
        self.input_size = input_size

    def forward(self, inputs, mask):
        # First sublayer: multi-head self-attention
        inputs = self.sublayer1(inputs, lambda x: self.self_attn(x, x, x, mask))
        # Second sublayer: position-wise feed-forward network
        output = self.sublayer2(inputs, self.feedward)
        return output


if __name__ == '__main__':
    pass
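To tie the pieces together, here is a sketch of how an encoder layer could be built from the components above and stacked 6 times with the clone helper. It assumes the MultiHeadAttention, PositionWiseFeedForward, SubLayerConnection, clone, and EncoderLayer definitions from the previous sections all live in the same module; the sizes are illustrative:

import torch

d_model, head, d_ff, p = 8, 4, 32, 0.1

# Build one encoder layer from the components defined earlier
self_attn = MultiHeadAttention(head=head, embedding_dim=d_model, p=p)
feed_forward = PositionWiseFeedForward(d_model=d_model, d_ff=d_ff, p=p)
encoder_layer = EncoderLayer(d_model, self_attn, feed_forward, p)

# Stack 6 copies, as in the original architecture
layers = clone(encoder_layer, 6)

x = torch.randn(2, 5, d_model)   # [batch_size, seq_len, d_model]
mask = None                      # the encoder needs no subsequent mask
for layer in layers:
    x = layer(x, mask)
print(x.shape)                   # expected: torch.Size([2, 5, 8])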