Transformer Word Embedding + Positional Encoding

Transformer is an architecture introduced by Google in the 2017 paper "Attention Is All You Need". It performs well on many NLP tasks and trains faster than RNNs. Transformer has since replaced the RNN as the SOTA (State Of The Art, i.e. the best-performing model in a field) model for neural machine translation, and it is used in production at companies such as Microsoft, Tencent, and Alibaba.

The overall Transformer architecture still follows the Encoder-Decoder pattern, except that the encoder and decoder are no longer RNNs; they are networks redesigned around multi-head self-attention.

In the figure above, the left side is the Encoder and the right side is the Decoder. The encoder stack on the left consists of 6 Encoder layers, and the decoder stack likewise consists of 6 Decoder layers.

As the figure shows, both the encoder and the decoder first embed the input tokens and add a positional encoding to the embeddings. The result then passes through multi-head self-attention layers; the difference is that the encoder's multi-head self-attention needs no mask, while the decoder's multi-head self-attention requires one.

1. Embedding

import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class Embedding(nn.Module):

    def __init__(self, vocab_size, embed_dim):
        """
        :param vocab_size: size of the vocabulary
        :param embed_dim: word embedding dimension
        """
        super(Embedding, self).__init__()
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.embedding = nn.Embedding(self.vocab_size, self.embed_dim)

    def forward(self, inputs):
        outputs = self.embedding(inputs)
        # Scale the embeddings by sqrt(embed_dim), as in the original Transformer paper
        return outputs * math.sqrt(self.embed_dim)


def test():
    model = Embedding(100, 200)
    print(model(torch.tensor([[1, 2, 3], [4, 5, 6]])))


if __name__ == '__main__':
    test()

2. Position Encoding

We know that the order of a sequence matters: the same tokens in a different order have a different meaning. The Transformer injects this information through a separate positional encoding: as shown in the figure, the input is first embedded and the positional encoding is then added to the embedding, so the input representation carries position information. The encoding itself is built from sine and cosine functions.
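
For reference, the sinusoidal encoding from the original paper, which the code below implements, is

$$PE_{(pos,\,2i)} = \sin\!\left(\frac{pos}{10000^{2i/d_{model}}}\right), \qquad PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$

where $pos$ is the position in the sequence, $i$ indexes the embedding dimensions, and $d_{model}$ is the embedding size.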

import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class PositionEncoding(nn.Module):

    def __init__(self, max_len, d_model, p):
        """
        :param max_len: 序列最大长度
        """
        super(PositionEncoding, self).__init__()
        # Store configuration
        self.max_len = max_len
        self.d_model = d_model
        # Dropout layer
        self.dropout = nn.Dropout(p=p)
        # Positional encoding table: [max_len, d_model]
        pe = torch.zeros(max_len, d_model)
        # Positions 0 .. max_len-1 as a column vector: [max_len, 1]
        position = torch.arange(0, max_len).reshape(-1, 1)

        # Frequency terms 1 / 10000^(2i/d_model); note the negative sign in the exponent
        div_term = torch.exp(torch.arange(0, self.d_model, 2) * (-math.log(10000.0) / self.d_model))

        # Even dimensions use sine, odd dimensions use cosine
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Add a batch dimension: [1, max_len, d_model]
        pe.unsqueeze_(0)

        # Register as a buffer: saved with the model, but not a trainable parameter
        self.register_buffer('pe', pe)

    # inputs shape: [batch_size, seq_len, d_model]
    def forward(self, inputs):

        # If a sentence has 5 tokens, take the first 5 rows of pe and add them to
        # the input tensor so the sequence representation carries position information
        outputs = inputs + self.pe[:, :inputs.size(1)].clone().detach()
        return self.dropout(outputs)


def test():
    model = PositionEncoding(max_len=4, d_model=4, p=0.2)
    inputs = torch.randn(1, 2, 4)
    print(inputs)
    inputs = model(inputs)
    print(inputs)


if __name__ == '__main__':
    test()
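
As described earlier, the encoder and decoder inputs are produced by chaining word embedding and positional encoding. Below is a minimal sketch of that pipeline, assuming the Embedding and PositionEncoding classes above are in the same scope; the concrete sizes are arbitrary.

import torch

# Hypothetical sizes for illustration only
embed = Embedding(vocab_size=100, embed_dim=8)
pos_enc = PositionEncoding(max_len=50, d_model=8, p=0.1)

# A batch of 2 sentences with 5 token ids each
token_ids = torch.randint(0, 100, size=(2, 5))

# Scaled embeddings with positional information added: [2, 5, 8]
outputs = pos_enc(embed(token_ids))
print(outputs.shape)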

3. Mask Matrix

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


def subsequent_mask(size):
    # Upper triangle strictly above the main diagonal marks the future positions
    attn_shape = (1, size, size)
    subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
    # Invert: True means the position may be attended to (itself and earlier tokens)
    return torch.from_numpy(subsequent_mask) == 0


def test01():
    mask = subsequent_mask(5)
    print(mask)


def test02():
    print(np.triu(np.ones([5, 5])))
    print('-' * 20)
    print(np.triu(np.ones([5, 5]), k=1))
    print('-' * 20)
    print(np.triu(np.ones([5, 5]), k=2))
    print('-' * 20)
    print(np.triu(np.ones([5, 5]), k=-1))
    print('-' * 20)
    print(np.triu(np.ones([5, 5]), k=-2))


if __name__ == '__main__':
    test01()
    test02()

The output of the test01 function is as follows:

tensor([[[ True, False, False, False, False],
         [ True,  True, False, False, False],
         [ True,  True,  True, False, False],
         [ True,  True,  True,  True, False],
         [ True,  True,  True,  True,  True]]])

The output of the test02 function is as follows:

[[1. 1. 1. 1. 1.]
 [0. 1. 1. 1. 1.]
 [0. 0. 1. 1. 1.]
 [0. 0. 0. 1. 1.]
 [0. 0. 0. 0. 1.]]
--------------------
[[0. 1. 1. 1. 1.]
 [0. 0. 1. 1. 1.]
 [0. 0. 0. 1. 1.]
 [0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0.]]
--------------------
[[0. 0. 1. 1. 1.]
 [0. 0. 0. 1. 1.]
 [0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]]
--------------------
[[1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [0. 1. 1. 1. 1.]
 [0. 0. 1. 1. 1.]
 [0. 0. 0. 1. 1.]]
--------------------
[[1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1.]
 [0. 1. 1. 1. 1.]
 [0. 0. 1. 1. 1.]]

The k argument of np.triu controls where the diagonal sits: if k is greater than 0, the diagonal moves up; if k is less than 0, it moves down. The mask built with k=1 therefore hides every position strictly above the main diagonal, i.e. the future tokens, which is exactly what the decoder's self-attention needs; a sketch of how this mask enters the attention computation follows below.
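
The sketch below (not part of the original listing) shows how such a mask is applied to the scaled attention scores before the softmax; the attention function in the next section does exactly this with masked_fill.

import math
import torch
import torch.nn.functional as F

# Assumes the subsequent_mask function above is in scope
seq_len, d_k = 5, 4
torch.manual_seed(0)
query = torch.randn(1, seq_len, d_k)
key = torch.randn(1, seq_len, d_k)

# Scaled dot-product scores: [1, seq_len, seq_len]
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

# Masked (future) positions receive a very large negative score,
# so softmax assigns them practically zero weight
mask = subsequent_mask(seq_len)
scores = scores.masked_fill(mask == 0, -1e9)
weights = F.softmax(scores, dim=-1)
print(weights)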

4. MultiHeadAttention

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


# Input shape: [batch_size, seq_len, input_size]
def attention(query, key, value, mask=None, dropout=None):

    # Dimension of the query vectors
    d_k = query.size(-1)
    # Scaled dot-product attention scores
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    # Apply the mask, if provided: masked positions get a large negative score
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)

    p_attn = F.softmax(scores, dim=-1)

    if dropout is not None:
        p_attn = dropout(p_attn)

    return torch.matmul(p_attn, value), p_attn


def test():

    # Example: a short sentence "我 爱 你", with every token
    # represented by a 3-dimensional vector

    torch.manual_seed(0)
    # representation of the query token
    q = torch.randn(3,).float()

    # representations of the key tokens
    torch.manual_seed(0)
    k = torch.randn((4, 3)).float()

    # the values are identical to the keys (k = v)
    torch.manual_seed(0)
    v = torch.randn((4, 3)).float()

    print('q:\n', q)
    print('k:\n', k)
    print('v:\n', v)

    att_tensor, attn = attention(q, k, v)
    print('attention weights:', attn)
    print('attention output:', att_tensor)


if __name__ == '__main__':
    test()

On top of the scaled dot-product attention function above, the complete multi-head attention module is implemented as follows:

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import copy


# Input shape: [batch_size, seq_len, input_size]
def attention(query, key, value, mask=None, dropout=None):

    # Dimension of the query vectors
    d_k = query.size(-1)
    # Scaled dot-product attention scores
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    # Apply the mask, if provided: masked positions get a large negative score
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)

    p_attn = F.softmax(scores, dim=-1)

    if dropout is not None:
        p_attn = dropout(p_attn)

    return torch.matmul(p_attn, value), p_attn


def test01():

    # Example: a short sentence "我 爱 你", with every token
    # represented by a 3-dimensional vector

    torch.manual_seed(0)
    # representation of the query token
    q = torch.randn(3,).float()

    # representations of the key tokens
    torch.manual_seed(0)
    k = torch.randn((4, 3)).float()

    # the values are identical to the keys (k = v)
    torch.manual_seed(0)
    v = torch.randn((4, 3)).float()

    print('q:\n', q)
    print('k:\n', k)
    print('v:\n', v)

    att_tensor, attn = attention(q, k, v)
    print('attention weights:', attn)
    print('attention output:', att_tensor)


def clone(module, N):
    # Produce N independent deep copies of a module as a ModuleList
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


class MultiHeadAttention(nn.Module):

    def __init__(self, head, embedding_dim, p=0.1):
        super(MultiHeadAttention, self).__init__()
        # Dimension handled by each attention head
        self.d_k = embedding_dim // head
        # Number of attention heads
        self.head = head
        # Linear projections for query, key, value and the output
        self.q_linear = nn.Linear(embedding_dim, embedding_dim)
        self.k_linear = nn.Linear(embedding_dim, embedding_dim)
        self.v_linear = nn.Linear(embedding_dim, embedding_dim)
        self.linear = nn.Linear(embedding_dim, embedding_dim)
        # Dropout layer
        self.dropout = nn.Dropout(p=p)

    def forward(self, query, key, value, mask=None):

        # Input shape: [batch_size, seq_len, embedding_dim]
        batch_size = query.size(0)

        # Project the inputs and split them into heads:
        # [batch_size, seq_len, embedding_dim] -> [batch_size, head, seq_len, d_k]
        query = self.q_linear(query).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)
        key   = self.k_linear(key).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)
        value = self.v_linear(value).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)

        print('q:\n', query.shape)

        # Broadcast the same mask over batch and heads
        if mask is not None:
            mask = mask.unsqueeze(0)

        att_tensor, attn = attention(query, key, value, mask=mask, dropout=self.dropout)

        print('att_tensor:', att_tensor.shape)
        print('attn:', attn.shape)

        # Merge the heads back together: [batch_size, seq_len, head * d_k]
        att_tensor = att_tensor.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)

        return self.linear(att_tensor)


def test02():


    torch.manual_seed(0)
    # (4, 3): interpreted as a batch of 4 queries, one token each, 3 dimensions per token
    q = torch.randn(4, 3).float()

    # (4, 5, 3): a batch of 4 sentences, each 5 tokens long, 3 dimensions per token
    torch.manual_seed(0)
    k = torch.randn((4, 5, 3)).float()

    # the values share the same shape as the keys
    torch.manual_seed(0)
    v = torch.randn((4, 5, 3)).float()

    print('q:\n', q)
    print('k:\n', k)
    print('v:\n', v)

    multi_head = MultiHeadAttention(3, embedding_dim=3)
    attn_tensor = multi_head(q, k, v)
    print(attn_tensor)


if __name__ == '__main__':
    test02()

5. FeedForward

import torch
import torch.nn as nn
import torch.nn.functional as F

class PositionWiseFeedForward(nn.Module):

    def __init__(self, d_model, d_ff, p=0.1):
        super(PositionWiseFeedForward, self).__init__()

        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(p=p)

    def forward(self, inputs):

        inputs = self.linear1(inputs)
        inputs = F.relu(inputs)
        inputs = self.dropout(inputs)
        outputs = self.linear2(inputs)

        return outputs
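
A quick usage sketch (not from the original article; the sizes are arbitrary) showing that the block maps [batch_size, seq_len, d_model] back to the same shape:

import torch

# Assumes the PositionWiseFeedForward class above is in scope
ffn = PositionWiseFeedForward(d_model=8, d_ff=32, p=0.1)
inputs = torch.randn(2, 5, 8)   # [batch_size, seq_len, d_model]
outputs = ffn(inputs)
print(outputs.shape)            # torch.Size([2, 5, 8])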

6. Layer Normalization

The Transformer normalizes each token's feature vector independently, i.e. layer normalization rather than batch normalization; the class below implements this with learnable gamma and beta parameters.

import torch
import torch.nn as nn
import torch.nn.functional as F

class Normalization(nn.Module):

    def __init__(self, input_size, eps=1e-6):
        super(Normalization, self).__init__()

        # gamma: learnable scale parameter
        self.gamma = nn.Parameter(torch.ones(input_size))
        # beta: learnable shift parameter
        self.beta = nn.Parameter(torch.zeros(input_size))
        self.eps = eps

    def forward(self, inputs):

        # Mean and standard deviation over the feature (last) dimension
        mean = inputs.mean(-1, keepdim=True)
        std = inputs.std(-1, keepdim=True)

        return self.gamma * (inputs - mean) / (std + self.eps) + self.beta


def test():
    torch.manual_seed(0)
    inputs = torch.randint(0, 10, size=[3, 5, 8]).float()

    norm = Normalization(input_size=8)
    outputs = norm(inputs)
    print(outputs)


if __name__ == '__main__':
    test()

7. SubLayerConnection

import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class Normalization(nn.Module):

    def __init__(self, input_size, eps=1e-6):
        super(Normalization, self).__init__()

        # gamma: learnable scale parameter
        self.gamma = nn.Parameter(torch.ones(input_size))
        # beta: learnable shift parameter
        self.beta = nn.Parameter(torch.zeros(input_size))
        self.eps = eps

    def forward(self, inputs):

        mean = inputs.mean(-1, keepdim=True)
        std = inputs.std(-1, keepdim=True)

        return self.gamma * (inputs - mean) / (std + self.eps) + self.beta


class SubLayerConnection(nn.Module):

    def __init__(self, input_size, p=0.1):
        super(SubLayerConnection, self).__init__()

        self.norm = Normalization(input_size)
        self.dropout = nn.Dropout(p=p)

    def forward(self, inputs, sublayer):
        # sublayer may be the multi-head attention block or the feed-forward block;
        # pre-norm residual connection: x + Dropout(Sublayer(Norm(x)))
        outputs = self.dropout(sublayer(self.norm(inputs)))
        return inputs + outputs


def test():
    pass

if __name__ == '__main__':
    test()

8. Encoder Layer

import torch
import torch.nn as nn
import torch.nn.functional as F


class EncoderLayer(nn.Module):

    def __init__(self, input_size, self_attn, feedward, p):
        super(EncoderLayer, self).__init__()

        # The multi-head self-attention and feed-forward sublayer objects
        self.self_attn = self_attn
        self.feedward = feedward

        # Residual connections around the two sublayers
        # (SubLayerConnection is the class defined in the previous section)
        self.sublayer1 = SubLayerConnection(input_size, p)
        self.sublayer2 = SubLayerConnection(input_size, p)

        # Input embedding dimension
        self.input_size = input_size

    def forward(self, inputs, mask):

        # First sublayer: multi-head self-attention with residual connection
        inputs = self.sublayer1(inputs, lambda x: self.self_attn(x, x, x, mask))
        # Second sublayer: position-wise feed-forward network with residual connection
        output = self.sublayer2(inputs, self.feedward)
        return output


if __name__ == '__main__':
    pass
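
To tie the pieces together, here is a minimal sketch (an assumption, not part of the original article) that builds one EncoderLayer from the MultiHeadAttention and PositionWiseFeedForward classes of the earlier sections, with no mask and arbitrary sizes:

import torch

# Assumes MultiHeadAttention, PositionWiseFeedForward, SubLayerConnection
# and EncoderLayer from the sections above are all in scope
d_model = 8
self_attn = MultiHeadAttention(head=2, embedding_dim=d_model)
feed_forward = PositionWiseFeedForward(d_model=d_model, d_ff=32)
encoder_layer = EncoderLayer(input_size=d_model, self_attn=self_attn,
                             feedward=feed_forward, p=0.1)

# A batch of 2 sentences, 5 tokens each, already embedded with positional encoding
inputs = torch.randn(2, 5, d_model)
outputs = encoder_layer(inputs, mask=None)
print(outputs.shape)   # torch.Size([2, 5, 8])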