PaddleNLP 中封装了一些 Encoder,可以帮助我们 sequence 中的多 token 向量转换为 sequence vector,主要方法有:
- BoWEncoder
- CNNEncoder
- GRUEncode、LSTMEncoder、RNNEncoder
- TCNEncoder
1. BoWEncoder
将每个 token 的特征值相加,得到的单个向量作为 sequence vector,实例代码如下:
from paddlenlp.seq2vec import BoWEncoder import torch def test(): B, N, D = 2, 3, 4 inputs = torch.randint(5, size=[B, N, D]) print(inputs) # emb_dim 输入数据的维度 encoder = BoWEncoder(emb_dim=D) outputs = encoder(inputs) print(outputs) if __name__ == '__main__': test()
程序执行结果:
tensor([[[4, 2, 0, 3], [1, 1, 2, 3], [1, 4, 3, 2]], [[2, 0, 2, 3], [1, 0, 1, 0], [3, 2, 1, 3]]]) tensor([[6, 7, 5, 8], [6, 2, 4, 6]])
2. CNNEncoder
假设输入 3 个字,每个字有 4 个维度的向量表示,如下所示:
[[0., 4., 3., 4.], [0., 2., 1., 1.], [3., 1., 3., 2.]]
CNNEncoder 根据 ngram_filter_sizes 值确定 ngram,比如该值为 (2, 3) 表示分别以 2-gram 和 3-gram 来对输入句子进行卷积计算。
2-gram 时,[0., 4., 3., 4.] 和 [0., 2., 1., 1.] 一起融合得到一个值,[0., 2., 1., 1.] 和 [3., 1., 3., 2.] 融合得到一个值,这个是一个卷积核输出 2 个 值,也可以是多个卷积核。
3-gram 时,[0., 4., 3., 4.] 、[0., 2., 1., 1.] 和 [3., 1., 3., 2.] 融合得到一个值。
卷积核的计算结果可以增加一个 tanh 激活函数,将输出值映射到 -1 到 1 之间。
接下来对 2-gram 的结果进行最大池化操作,得到一个值。最后,把 2-gram 和 3-gram 的值拼接到一起得到 2 维输出向量。如果用户指定了输出维度,则再增加一个线性层,将 2 维的向量映射为用户指定维度。
示例代码:
from paddlenlp.seq2vec import CNNEncoder import paddle import paddle.nn as nn import numpy as np import paddle.nn.functional as F class CNNEncoder(nn.Layer): def __init__(self, emb_dim, num_filter, ngram_filter_sizes=(2, 3), conv_layer_activation=nn.Tanh(), output_dim=None, **kwargs): super().__init__() self._emb_dim = emb_dim self._num_filter = num_filter self._ngram_filter_sizes = ngram_filter_sizes self._activation = conv_layer_activation self._output_dim = output_dim self.convs = paddle.nn.LayerList([ nn.Conv2D(in_channels=1, out_channels=self._num_filter, kernel_size=(i, self._emb_dim), **kwargs) for i in self._ngram_filter_sizes ]) maxpool_output_dim = self._num_filter * len(self._ngram_filter_sizes) if self._output_dim: self.projection_layer = nn.Linear(maxpool_output_dim, self._output_dim) else: self.projection_layer = None self._output_dim = maxpool_output_dim def forward(self, inputs, mask=None): if mask is not None: inputs = inputs * mask inputs = inputs.unsqueeze(1) # 此处去除了 tanh 激活函数,源代码实现中是有的 convs_out = [conv(inputs).squeeze(3) for conv in self.convs] print('卷积核的输出结果:\n', convs_out) print('-' * 100) maxpool_out = [F.adaptive_max_pool1d(t, output_size=1).squeeze(2) for t in convs_out ] result = paddle.concat(maxpool_out, axis=1) if self.projection_layer is not None: result = self.projection_layer(result) return result def test(): B, N, D = 2, 3, 4 inputs = paddle.randint(low=0, high=5, shape=[B, N, D]).astype(paddle.float32) print('测试输入数据:\n', inputs) # emb_dim 输入数据的维度 # num_filter 表示输出通道数量,相当于多少个卷积核,默认输出维度为 num_filter*2 # ngram_filter_sizes 表示 ngram,可以是多个值,元素的数量决定了卷积核的数量 encoder = CNNEncoder(emb_dim=D, num_filter=3, ngram_filter_sizes=(2, 3)) encoder.convs[0].weight.data = paddle.randint(low=0, high=5, shape=[2, 4]) # 将卷积核参数初始化为简单的值 nn.initializer.Constant(value=1)(encoder.convs[0].weight) nn.initializer.Constant(value=2)(encoder.convs[1].weight) print('-' * 100) print('第一个卷积核参数:\n', encoder.convs[0].weight) print('-' * 100) print('第二个卷积核参数:\n', encoder.convs[1].weight) print('-' * 100) outputs = encoder(inputs) print('最终输出结果:\n', outputs) if __name__ == '__main__': test()
程序输出结果:
测试输入数据: Tensor(shape=[2, 3, 4], dtype=float32, place=Place(cpu), stop_gradient=True, [[[1., 2., 3., 1.], [4., 0., 3., 1.], [3., 2., 2., 2.]], [[0., 0., 3., 4.], [1., 4., 4., 0.], [2., 4., 4., 1.]]]) --------------------------------------------------------------------------------------- 第一个卷积核参数: Parameter containing: Tensor(shape=[3, 1, 2, 4], dtype=float32, place=Place(cpu), stop_gradient=False, [[[[1., 1., 1., 1.], [1., 1., 1., 1.]]], [[[1., 1., 1., 1.], [1., 1., 1., 1.]]], [[[1., 1., 1., 1.], [1., 1., 1., 1.]]]]) --------------------------------------------------------------------------------------- 第二个卷积核参数: Parameter containing: Tensor(shape=[3, 1, 3, 4], dtype=float32, place=Place(cpu), stop_gradient=False, [[[[2., 2., 2., 2.], [2., 2., 2., 2.], [2., 2., 2., 2.]]], [[[2., 2., 2., 2.], [2., 2., 2., 2.], [2., 2., 2., 2.]]], [[[2., 2., 2., 2.], [2., 2., 2., 2.], [2., 2., 2., 2.]]]]) --------------------------------------------------------------------------------------- 卷积核的输出结果: [Tensor(shape=[2, 3, 2], dtype=float32, place=Place(cpu), stop_gradient=False, [[[15., 17.], [15., 17.], [15., 17.]], [[16., 20.], [16., 20.], [16., 20.]]]), Tensor(shape=[2, 3, 1], dtype=float32, place=Place(cpu), stop_gradient=False, [[[48.], [48.], [48.]], [[54.], [54.], [54.]]])] --------------------------------------------------------------------------------------- 最终输出结果: Tensor(shape=[2, 6], dtype=float32, place=Place(cpu), stop_gradient=False, [[17., 17., 17., 48., 48., 48.], [20., 20., 20., 54., 54., 54.]])
3. GRUEncode
GRUEncode 其实就是使用 nn.GRU 对输入的 hidden states 计算每个 token 的表示向量。如果 pooling_type 为 None 则返回最后一个时间步的张量作为句子向量。如果 pooling_type 不为空,则其为 “sum”, “max” and “mean” 其中之一,此时就会根据所有时间步的输出,并计算 sum、mean、max 作为最终的输出向量。
示例代码:
from paddlenlp.seq2vec import GRUEncoder # from paddlenlp.seq2vec import LSTMEncoder # from paddlenlp.seq2vec import RNNEncoder # from paddlenlp.seq2vec import TCNEncoder import paddle def test(): B, N, D = 2, 3, 4 inputs = paddle.randint(low=0, high=5, shape=[B, N, D]).astype(paddle.float32) print('测试输入数据:\n', inputs) encoder = GRUEncoder(input_size=D, hidden_size=8, direction='forward', pooling_type='mean') outputs = encoder(inputs, sequence_length=paddle.to_tensor([3, 3])) print('最终输出结果:\n', outputs) if __name__ == '__main__': test()
程序输出结果:
测试输入数据: Tensor(shape=[2, 3, 4], dtype=float32, place=Place(cpu), stop_gradient=True, [[[4., 4., 0., 4.], [3., 3., 4., 1.], [3., 0., 2., 3.]], [[0., 4., 0., 4.], [2., 2., 0., 2.], [2., 4., 0., 2.]]]) 最终输出结果: Tensor(shape=[2, 8], dtype=float32, place=Place(cpu), stop_gradient=False, [[-0.22994323, -0.49282956, -0.48349500, -0.46896759, -0.31692216, -0.25488335, -0.51417428, 0.25784063], [ 0.11594519, -0.48312032, -0.25954613, -0.54152632, -0.39040449, -0.00999623, -0.72644186, -0.01175328]])
4. TCNEncoder
通过 TCN 网络对输入的多个 token 向量进行表征,使用最后一个 token 的表征作为句子的表征。它的参数有有:
- input_size 表示输入的 token 维度
- num_channels TCN 是一个多层的网络,每一层都是一个卷积层,该参数指定每层的 out_channels,即:输出的 token 维度,所以该参数是一个列表。最有一层的 out_channels 决定了最终输出的句子向量维度
- kernel_size=2 卷积核的大小
- dropout=0.2 随机丢弃率
示例代码:
from paddlenlp.seq2vec import TCNEncoder import paddle def test(): B, N, D = 2, 3, 4 inputs = paddle.randint(low=0, high=5, shape=[B, N, D]).astype(paddle.float32) print('测试输入数据:\n', inputs) # input_size: 输入数据 token 维度 # num_channels: 每一层的 out_channels 数量,即: 该层输出 token 维度 # kernel_size: 卷积核的大小 encoder = TCNEncoder(input_size=4, num_channels=[3, 4], kernel_size=2) outputs = encoder(inputs) print('最终输出结果:\n', outputs.shape) print('最终输出结果:\n', outputs) if __name__ == '__main__': test()
程序执行结果:
测试输入数据: Tensor(shape=[2, 3, 4], dtype=float32, place=Place(cpu), stop_gradient=True, [[[1., 3., 1., 3.], [0., 2., 2., 4.], [3., 4., 4., 3.]], [[0., 3., 3., 2.], [1., 2., 1., 0.], [0., 4., 0., 2.]]]) 最终输出结果: [2, 4] 最终输出结果: Tensor(shape=[2, 4], dtype=float32, place=Place(cpu), stop_gradient=False, [[0.02269164, 0.40144998, 0.54353219, 0.23521927], [0.00095350, 0. , 2.25108194, 2.18109298]])