PaddleNLP 中封装了一些 Encoder,可以帮助我们 sequence 中的多 token 向量转换为 sequence vector,主要方法有:
- BoWEncoder
- CNNEncoder
- GRUEncode、LSTMEncoder、RNNEncoder
- TCNEncoder
1. BoWEncoder
将每个 token 的特征值相加,得到的单个向量作为 sequence vector,实例代码如下:
from paddlenlp.seq2vec import BoWEncoder
import torch
def test():
B, N, D = 2, 3, 4
inputs = torch.randint(5, size=[B, N, D])
print(inputs)
# emb_dim 输入数据的维度
encoder = BoWEncoder(emb_dim=D)
outputs = encoder(inputs)
print(outputs)
if __name__ == '__main__':
test()
程序执行结果:
tensor([[[4, 2, 0, 3],
[1, 1, 2, 3],
[1, 4, 3, 2]],
[[2, 0, 2, 3],
[1, 0, 1, 0],
[3, 2, 1, 3]]])
tensor([[6, 7, 5, 8],
[6, 2, 4, 6]])
2. CNNEncoder
假设输入 3 个字,每个字有 4 个维度的向量表示,如下所示:
[[0., 4., 3., 4.], [0., 2., 1., 1.], [3., 1., 3., 2.]]
CNNEncoder 根据 ngram_filter_sizes 值确定 ngram,比如该值为 (2, 3) 表示分别以 2-gram 和 3-gram 来对输入句子进行卷积计算。
2-gram 时,[0., 4., 3., 4.] 和 [0., 2., 1., 1.] 一起融合得到一个值,[0., 2., 1., 1.] 和 [3., 1., 3., 2.] 融合得到一个值,这个是一个卷积核输出 2 个 值,也可以是多个卷积核。
3-gram 时,[0., 4., 3., 4.] 、[0., 2., 1., 1.] 和 [3., 1., 3., 2.] 融合得到一个值。
卷积核的计算结果可以增加一个 tanh 激活函数,将输出值映射到 -1 到 1 之间。
接下来对 2-gram 的结果进行最大池化操作,得到一个值。最后,把 2-gram 和 3-gram 的值拼接到一起得到 2 维输出向量。如果用户指定了输出维度,则再增加一个线性层,将 2 维的向量映射为用户指定维度。
示例代码:
from paddlenlp.seq2vec import CNNEncoder
import paddle
import paddle.nn as nn
import numpy as np
import paddle.nn.functional as F
class CNNEncoder(nn.Layer):
def __init__(self,
emb_dim,
num_filter,
ngram_filter_sizes=(2, 3),
conv_layer_activation=nn.Tanh(),
output_dim=None,
**kwargs):
super().__init__()
self._emb_dim = emb_dim
self._num_filter = num_filter
self._ngram_filter_sizes = ngram_filter_sizes
self._activation = conv_layer_activation
self._output_dim = output_dim
self.convs = paddle.nn.LayerList([
nn.Conv2D(in_channels=1,
out_channels=self._num_filter,
kernel_size=(i, self._emb_dim),
**kwargs) for i in self._ngram_filter_sizes ])
maxpool_output_dim = self._num_filter * len(self._ngram_filter_sizes)
if self._output_dim:
self.projection_layer = nn.Linear(maxpool_output_dim,
self._output_dim)
else:
self.projection_layer = None
self._output_dim = maxpool_output_dim
def forward(self, inputs, mask=None):
if mask is not None:
inputs = inputs * mask
inputs = inputs.unsqueeze(1)
# 此处去除了 tanh 激活函数,源代码实现中是有的
convs_out = [conv(inputs).squeeze(3) for conv in self.convs]
print('卷积核的输出结果:\n', convs_out)
print('-' * 100)
maxpool_out = [F.adaptive_max_pool1d(t, output_size=1).squeeze(2) for t in convs_out ]
result = paddle.concat(maxpool_out, axis=1)
if self.projection_layer is not None:
result = self.projection_layer(result)
return result
def test():
B, N, D = 2, 3, 4
inputs = paddle.randint(low=0, high=5, shape=[B, N, D]).astype(paddle.float32)
print('测试输入数据:\n', inputs)
# emb_dim 输入数据的维度
# num_filter 表示输出通道数量,相当于多少个卷积核,默认输出维度为 num_filter*2
# ngram_filter_sizes 表示 ngram,可以是多个值,元素的数量决定了卷积核的数量
encoder = CNNEncoder(emb_dim=D, num_filter=3, ngram_filter_sizes=(2, 3))
encoder.convs[0].weight.data = paddle.randint(low=0, high=5, shape=[2, 4])
# 将卷积核参数初始化为简单的值
nn.initializer.Constant(value=1)(encoder.convs[0].weight)
nn.initializer.Constant(value=2)(encoder.convs[1].weight)
print('-' * 100)
print('第一个卷积核参数:\n', encoder.convs[0].weight)
print('-' * 100)
print('第二个卷积核参数:\n', encoder.convs[1].weight)
print('-' * 100)
outputs = encoder(inputs)
print('最终输出结果:\n', outputs)
if __name__ == '__main__':
test()
程序输出结果:
测试输入数据:
Tensor(shape=[2, 3, 4], dtype=float32, place=Place(cpu), stop_gradient=True,
[[[1., 2., 3., 1.],
[4., 0., 3., 1.],
[3., 2., 2., 2.]],
[[0., 0., 3., 4.],
[1., 4., 4., 0.],
[2., 4., 4., 1.]]])
---------------------------------------------------------------------------------------
第一个卷积核参数:
Parameter containing:
Tensor(shape=[3, 1, 2, 4], dtype=float32, place=Place(cpu), stop_gradient=False,
[[[[1., 1., 1., 1.],
[1., 1., 1., 1.]]],
[[[1., 1., 1., 1.],
[1., 1., 1., 1.]]],
[[[1., 1., 1., 1.],
[1., 1., 1., 1.]]]])
---------------------------------------------------------------------------------------
第二个卷积核参数:
Parameter containing:
Tensor(shape=[3, 1, 3, 4], dtype=float32, place=Place(cpu), stop_gradient=False,
[[[[2., 2., 2., 2.],
[2., 2., 2., 2.],
[2., 2., 2., 2.]]],
[[[2., 2., 2., 2.],
[2., 2., 2., 2.],
[2., 2., 2., 2.]]],
[[[2., 2., 2., 2.],
[2., 2., 2., 2.],
[2., 2., 2., 2.]]]])
---------------------------------------------------------------------------------------
卷积核的输出结果:
[Tensor(shape=[2, 3, 2], dtype=float32, place=Place(cpu), stop_gradient=False,
[[[15., 17.],
[15., 17.],
[15., 17.]],
[[16., 20.],
[16., 20.],
[16., 20.]]]), Tensor(shape=[2, 3, 1], dtype=float32, place=Place(cpu), stop_gradient=False,
[[[48.],
[48.],
[48.]],
[[54.],
[54.],
[54.]]])]
---------------------------------------------------------------------------------------
最终输出结果:
Tensor(shape=[2, 6], dtype=float32, place=Place(cpu), stop_gradient=False,
[[17., 17., 17., 48., 48., 48.],
[20., 20., 20., 54., 54., 54.]])
3. GRUEncode
GRUEncode 其实就是使用 nn.GRU 对输入的 hidden states 计算每个 token 的表示向量。如果 pooling_type 为 None 则返回最后一个时间步的张量作为句子向量。如果 pooling_type 不为空,则其为 “sum”, “max” and “mean” 其中之一,此时就会根据所有时间步的输出,并计算 sum、mean、max 作为最终的输出向量。
示例代码:
from paddlenlp.seq2vec import GRUEncoder
# from paddlenlp.seq2vec import LSTMEncoder
# from paddlenlp.seq2vec import RNNEncoder
# from paddlenlp.seq2vec import TCNEncoder
import paddle
def test():
B, N, D = 2, 3, 4
inputs = paddle.randint(low=0, high=5, shape=[B, N, D]).astype(paddle.float32)
print('测试输入数据:\n', inputs)
encoder = GRUEncoder(input_size=D,
hidden_size=8,
direction='forward',
pooling_type='mean')
outputs = encoder(inputs, sequence_length=paddle.to_tensor([3, 3]))
print('最终输出结果:\n', outputs)
if __name__ == '__main__':
test()
程序输出结果:
测试输入数据:
Tensor(shape=[2, 3, 4], dtype=float32, place=Place(cpu), stop_gradient=True,
[[[4., 4., 0., 4.],
[3., 3., 4., 1.],
[3., 0., 2., 3.]],
[[0., 4., 0., 4.],
[2., 2., 0., 2.],
[2., 4., 0., 2.]]])
最终输出结果:
Tensor(shape=[2, 8], dtype=float32, place=Place(cpu), stop_gradient=False,
[[-0.22994323, -0.49282956, -0.48349500, -0.46896759, -0.31692216,
-0.25488335, -0.51417428, 0.25784063],
[ 0.11594519, -0.48312032, -0.25954613, -0.54152632, -0.39040449,
-0.00999623, -0.72644186, -0.01175328]])
4. TCNEncoder
通过 TCN 网络对输入的多个 token 向量进行表征,使用最后一个 token 的表征作为句子的表征。它的参数有有:
- input_size 表示输入的 token 维度
- num_channels TCN 是一个多层的网络,每一层都是一个卷积层,该参数指定每层的 out_channels,即:输出的 token 维度,所以该参数是一个列表。最有一层的 out_channels 决定了最终输出的句子向量维度
- kernel_size=2 卷积核的大小
- dropout=0.2 随机丢弃率
示例代码:
from paddlenlp.seq2vec import TCNEncoder
import paddle
def test():
B, N, D = 2, 3, 4
inputs = paddle.randint(low=0, high=5, shape=[B, N, D]).astype(paddle.float32)
print('测试输入数据:\n', inputs)
# input_size: 输入数据 token 维度
# num_channels: 每一层的 out_channels 数量,即: 该层输出 token 维度
# kernel_size: 卷积核的大小
encoder = TCNEncoder(input_size=4, num_channels=[3, 4], kernel_size=2)
outputs = encoder(inputs)
print('最终输出结果:\n', outputs.shape)
print('最终输出结果:\n', outputs)
if __name__ == '__main__':
test()
程序执行结果:
测试输入数据:
Tensor(shape=[2, 3, 4], dtype=float32, place=Place(cpu), stop_gradient=True,
[[[1., 3., 1., 3.],
[0., 2., 2., 4.],
[3., 4., 4., 3.]],
[[0., 3., 3., 2.],
[1., 2., 1., 0.],
[0., 4., 0., 2.]]])
最终输出结果:
[2, 4]
最终输出结果:
Tensor(shape=[2, 4], dtype=float32, place=Place(cpu), stop_gradient=False,
[[0.02269164, 0.40144998, 0.54353219, 0.23521927],
[0.00095350, 0. , 2.25108194, 2.18109298]])

冀公网安备13050302001966号