基于 RNN 循环网络实现文本生成

实现的步骤如下:

  1. 构建词典
  2. 构建数据对象
  3. 构建文本生成模型
  4. 编写训练函数
  5. 编写预测函数
import torch
import torch.nn as nn
import jieba
from torch.utils.data import DataLoader
import torch.optim as optim
import time
import torch.nn.functional as F

1. 构建词典

我们在进行文本生成任务之前,首先就要构建词典,通过词典实现将词到索引的映射,或者通过索引到词的映射。我们构建词典使用的训练数据是周杰伦的歌词,文本内容如下:

想要有直升机
想要和你飞到宇宙去
想要和你融化在一起
融化在宇宙里
我每天每天每天在想想想想著你
这样的甜蜜
让我开始相信命运
感谢地心引力
让我碰到你
漂亮的让我面红的可爱女人
温柔的让我心疼的可爱女人
透明的让我感动的可爱女人
坏坏的让我疯狂的可爱女人
坏坏的让我疯狂的可爱女人
漂亮的让我面红的可爱女人
温柔的让我心疼的可爱女人
透明的让我感动的可爱女人
坏坏的让我疯狂的可爱女人
坏坏的让我疯狂的可爱女人
......

共 5819 行歌词,接下来,我们就基于上面的语料来构建词典,我们最终会产生以下 4 个返回值:

  1. index_to_word
  2. word_to_index

示例代码如下:

def build_vocab():

    file_name = 'data/jaychou_lyrics.txt'


    # 1. 清洗文本
    clean_sentences = []
    for line in open(file_name, 'r'):

        line = line.replace('〖韩语Rap译文〗','')
        # 去除中文、英文、数字、部分标点符号外的其他字符
        line = re.sub(r'[^\u4e00-\u9fa5 a-zA-Z0-9!?,]', '', line)
        # 连续空格替换成1个
        line = re.sub(r'[ ]{2,}', '', line)
        # 去除两侧空格、换行
        line = line.strip()
        # 去除单字的行
        if len(line) <= 1:
            continue

        # 去除重复行
        if line not in clean_sentences:
            clean_sentences.append(line)

    # 2. 预料分词
    index_to_word, all_sentences = [], []

    for line in clean_sentences:
        words = jieba.lcut(line)
        all_sentences.append(words)
        for word in words:
            if word not in index_to_word:
                index_to_word.append(word)

    # 词到索引映射
    word_to_index = {word: idx for idx, word in enumerate(index_to_word)}
    # 词的数量
    word_count = len(index_to_word)
    # 句子索引表示
    corpus_idx = []
    for sentence in all_sentences:
        temp = []
        for word in sentence:
            temp.append(word_to_index[word])
        # 在每行歌词之间添加空格隔开
        temp.append(word_to_index[' '])
        corpus_idx.extend(temp)


    return index_to_word, word_to_index, word_count, corpus_idx


def test01():

    index_to_word, word_to_index, word_count, corpus_idx = build_vocab()
    print(word_count)
    print(index_to_word)
    print(word_to_index)
    print(corpus_idx)

程序的输出结果:

5682
['想要', '有', '直升机', '和', '你', '飞到' ......]
{'想要': 0, '有': 1, '直升机': 2, '和': 3, '你': 4, '飞到': 5 ......}
[0, 1, 2, 39, 0, 3, 4, 5, 6, 7, 39, 0, 3, 4, 8, 9, 10, 39, 8, 9, 6, 11, 39 ......]

2. 构建数据对象

我们可以编写 LyricsDataset 类,该类用于产生一条样本,该类需要实现 __init__、__len__、 __getitem__ 方法,示例代码如下:

class LyricsDataset:

    def __init__(self, corpus_idx, num_chars):
        # 语料数据
        self.corpus_idx = corpus_idx
        # 语料长度
        self.num_chars = num_chars
        # 词的数量
        self.word_count = len(self.corpus_idx)
        # 句子数量
        self.number =  self.word_count // self.num_chars


    def __len__(self):
        return self.number

    def __getitem__(self, idx):

        # 修正索引值到: [0, self.word_count - 1]
        start = min(max(idx, 0),  self.word_count - self.num_chars - 2)

        x = self.corpus_idx[start: start + self.num_chars]
        y = self.corpus_idx[start + 1: start + 1 + self.num_chars]

        return torch.tensor(x), torch.tensor(y)


def test02():

    _, _, _, corpus_idx = build_vocab()
    lyrics = LyricsDataset(corpus_idx, 5)
    lyrics_dataloader = DataLoader(lyrics, shuffle=False, batch_size=1)

    for x, y in lyrics_dataloader:
        print('x:', x)
        print('y:', y)
        break

程序结果如下:

x: tensor([[ 0,  1,  2, 39,  0]])
y: tensor([[ 1,  2, 39,  0,  3]])

3. 构建文本生成模型

class TextGenerator(nn.Module):

    def __init__(self, vocab_size):
        super(TextGenerator, self).__init__()
        # 初始化词嵌入层
        self.ebd = nn.Embedding(vocab_size, 128)
        # 循环网络层
        self.rnn = nn.RNN(128, 128, 1)
        # 输出层
        self.out = nn.Linear(128, vocab_size)

    def forward(self, inputs, hidden):

        # 输出维度: (1, 5, 128)
        embed = self.ebd(inputs)

        # 正则化层
        embed = F.dropout(embed, p=0.2)

        # 修改维度: (5, 1, 128)
        output, hidden = self.rnn(embed.transpose(0, 1), hidden)

        # 正则化层
        embed = F.dropout(output, p=0.2)

        # 输入维度: (5, 128)
        # 输出维度: (5, 5682)
        output = self.out(output.squeeze())

        return output, hidden

    def init_hidden(self):
        return torch.zeros(1, 1, 128)

4. 编写训练函数

def train():

    # 构建词典
    index_to_word, word_to_index, word_count, corpus_idx = build_vocab()
    # 数据集
    lyrics = LyricsDataset(corpus_idx, 32)
    # 初始化模型
    model = TextGenerator(word_count)
    # 损失函数
    criterion = nn.CrossEntropyLoss()
    # 优化方法
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    # 训练轮数
    epoch = 200
    # 迭代打印
    iter_num = 300
    # 训练日志
    train_log = 'lyrics_training.log'
    file = open(train_log, 'w')

    # 开始训练
    for epoch_idx in range(epoch):

        # 数据加载器
        lyrics_dataloader = DataLoader(lyrics, shuffle=True, batch_size=1)
        # 训练时间
        start = time.time()
        # 迭代次数
        iter_num = 0
        # 训练损失
        total_loss = 0.0

        for x, y in lyrics_dataloader:

            # 隐藏状态
            hidden = model.init_hidden()
            # 模型计算
            output, hidden = model(x, hidden)
            # 计算损失
            loss = criterion(output, y.squeeze())
            # 梯度清零
            optimizer.zero_grad()
            # 反向传播
            loss.backward()
            # 参数更新
            optimizer.step()

            iter_num += 1
            total_loss += loss.item()

        message = 'epoch %3s loss: %.5f time %.2f' % \
                  (epoch_idx + 1,
                   total_loss / iter_num,
                   time.time() - start)
        print(message)
        file.write(message + '\n')

    file.close()

    # 模型存储
    torch.save(model.state_dict(), 'model/lyrics_model_%d.bin' % epoch)

5. 编写预测函数

def predict(start_word, sentence_length):

    # 构建词典
    index_to_word, word_to_index, word_count, _ = build_vocab()
    # 构建模型
    model = TextGenerator(vocab_size=word_count)
    # 加载参数
    model.load_state_dict(torch.load('model/lyrics_model_200.bin'))

    # 隐藏状态
    hidden = model.init_hidden()
    # 词转换为索引
    word_idx = word_to_index[start_word]
    generate_sentence = [word_idx]
    for _ in range(sentence_length):
        output, hidden = model(torch.tensor([[word_idx]]), hidden)
        word_idx = torch.argmax(output)
        generate_sentence.append(word_idx)

    for idx in generate_sentence:
        print(index_to_word[idx], end='')
    print()
未经允许不得转载:一亩三分地 » 基于 RNN 循环网络实现文本生成