基于 kmeans 抽取文本摘要

基于 word2vec + kmeans 实现文本摘要的思路如下:

  1. 文本处理:对文本内容进行一些必须的预处理;
  2. 分割句子:将文档分割成多个子句;
  3. 句子编码:使用 word2vec 或者 bert 对句子进行编码;
  4. 文本聚类:对所有的句子聚类为 N 个簇,我们可以简单理解为这篇文章有 N 个重要思想;
  5. 摘要抽取:根据一定的策略从每个簇中抽取句子组成摘要。
import glob
from string import punctuation
import codecs
import fasttext
import torch
from MyKmeans import KMeans
from sklearn.metrics import silhouette_score
from sklearn.metrics import calinski_harabasz_score
import numpy as np
import time
import jieba
jieba.setLogLevel(0)
from transformers import BertModel
from transformers import BertTokenizer

mapping.txt:

# .	。
,	,
!	!
?	?
:	:
;	;
'	‘
"	“
(	(
)	)
[	【
]	】
{	{
}	}
-	-
_	_
&	&
@	@
#	#
$	$
%	%
+	+
=	=
<	<
>	>
/	/
\	\
|	|

1. 句子分割

将文档中的英文标点符号转换为中文标点符号。然后以句号、感叹号、问号分割为多个子句。

# 加载标点符号
punctuation = {}
for line in codecs.open('others/mapping.txt', encoding='utf-8'):
    line = line.strip()
    if line[0] == '#':
        continue
    line = line.split()
    punctuation[line[0]] = line[1]


# 加载词向量模型
embed_model = fasttext.load_model('others/cc.zh.300.bin')


def convert(ch):
    """英文标点符号转换为中文"""
    if ch not in punctuation:
        return ch
    return punctuation[ch]


def text_normalize(text):
    """文本预处理"""

    words = []
    # 文本去除多余空格
    text = ''.join(text.split())
    for word in text:
        # 去除句子中的空格
        if word == '\n':
            continue
        # 转换标点符号
        words.append(convert(word))

    return ''.join(words)


def text_splitter(text):
    """拆分句子"""

    sentences = []
    seg_list = list(text)
    sentence = []
    for word in seg_list:
        word = word.strip()
        if word in ["。", "!", "?"]:
            # 排除只有一个标点符号的句子
            if len(sentence) > 1:
                sentences.append(''.join(sentence) + word)
            sentence = []
            continue
        sentence.append(word)
    return sentences

2. 句子编码

分别对每一个句子进行编码。例如:使用 tf-idf、word2vec、bert 都可以实现文本编码。build_embedding 用的是 word2vec,build_embedding2 中使用的 bert 的 [CLS] 表示句向量。

def build_embedding(sentences):

    embeddings = []
    for sentence in sentences:
        sentence = jieba.lcut(sentence)
        sentence = ' '.join(sentence)
        embedding = embed_model.get_sentence_vector(sentence)
        embeddings.append(embedding)

    # 句子向量标准化
    return embeddings

def build_embedding2(sentences):

    with torch.no_grad():
        estimator = BertModel.from_pretrained('others/bert-base-chinese')
        tokenizer = BertTokenizer.from_pretrained('others/bert-base-chinese')
        inputs = tokenizer(sentences, padding=True, return_tensors='pt')
        outputs = estimator(**inputs)
    # 获得每个句子的 [CLS] 句向量
    embeddings = outputs.last_hidden_state[:, 0, :].numpy()

    print('shape:', embeddings.shape)

    return embeddings

3. 文本聚类

这一步就是做文本聚类,根据聚类的结果抽取部分句子作为文本摘要。每一步都有一些可选操作,例如:

  1. 文本聚类:可以使用基于欧式距离的 KMeans,也可以使用基于余弦距离的 KMeans;聚类时簇的选择,可以根据指定的抽取句子多少来决定簇的多少,也可以多聚类几次经过评估找到表现最好的簇;每一个质心数量也会随机聚类多次,找出聚类效果较好的。
  2. 抽取摘要:可以从每一个簇中抽取距离质心最近或者随机抽取某个的句子组成摘要;也可以只从句子数量超过指定值的簇中抽取句子组成摘要。
def select_best_score(embeddings, n_clusters):                                        
                                                                                      
    distances = []                                                                    
    prescores = []                                                                    
    for _ in range(100):                                                              
        # 初始化聚类算法对象                                                                   
        estimator = KMeans(n_clusters=n_clusters, n_init='auto', random_state=None)   
        # 计算每个样本距离质心的距离                                                               
        distance = estimator.fit_transform(embeddings)                                
        labels = np.argmin(distance, axis=1)                                          
        prescore = silhouette_score(embeddings, labels)                               
        distances.append(distance)                                                    
        prescores.append(prescore)                                                    
                                                                                      
    bestindex = np.argmax(prescores)                                                  
    bestscore = prescores[bestindex]                                                  
    distance = distances[bestindex]                                                   
                                                                                      
    return bestscore, distance                                                        
                                                                                      
                                                                                      
def text_selector1(embeddings, number):                                               
    """直接根据句子数量来设置簇的数量"""                                                             
                                                                                      
    if number > len(embeddings):                                                      
        number = len(embeddings)                                                      
                                                                                      
    _, distance = select_best_score(embeddings, number)                               
    sids = []                                                                         
    for index in range(number):                                                       
        sid = np.argmin(distance[:, index])                                           
        sids.append(sid)                                                              
                                                                                      
    return sids                                                                       
                                                                                      
                                                                                      
def text_selector2(embeddings, number):                                               
    """尝试不同簇的数量"""                                                                    
                                                                                      
    n_clusters = range(2, len(embeddings))                                            
    bestscores, distances = [], []                                                    
    for n_cluster in n_clusters:                                                      
        bestscore, distance = select_best_score(embeddings, n_cluster)                
        bestscores.append(bestscore)                                                  
        distances.append(distance)                                                    
                                                                                      
    # SC越大,则说明聚类效果更好,获得SC最大的索引                                                        
    best_index = np.argmax(bestscores)                                                
    # 筛选出距离不同质心最近的句子                                                                  
    distance = distances[best_index]                                                  
                                                                                      
    sids = []                                                                         
    for index in range(distance.shape[1]):                                            
        sid = np.argmin(distance[:, index])                                           
        sids.append(sid)                                                              
                                                                                      
    # 对句子按照原文顺序重新排序                                                                   
    sids = np.sort(sids)                                                              
    if number == -1:                                                                  
        return sids                                                                   
    else:                                                                             
        return sids[:number]                                                          
                                                                                      
                                                                                      
def text_selector3(embeddings, number):                                               
                                                                                      
    from Bio.Cluster import kcluster                                                  
    result = kcluster(embeddings, nclusters=2, method='a', dist='u', npass=100)       
    print(result)                                                                     
    print(silhouette_score(embeddings, result[0]))                                    
                                                                                      
                                                                                      
    result = kcluster(embeddings, nclusters=3, method='a', dist='u', npass=100)       
    print(result)                                                                     
    print(silhouette_score(embeddings, result[0]))                                    
                                                                                      
                                                                                      
    result = kcluster(embeddings, nclusters=4, method='a', dist='u', npass=100)       
    print(result)                                                                     
    print(silhouette_score(embeddings, result[0]))                                    

4. 抽取摘要

if __name__ == '__main__':

    number = 2
    for filename in glob.glob('samples/*.txt'):
        text = codecs.open(filename, encoding='utf-8').read()
        text = text_normalize(text)
        text = text_splitter(text)
        if len(text) <= 5:
            print('摘要内容:', ''.join(text[:number]))
            break
        embd = build_embedding2(text)
        sids = text_selector2(embd, number)
        summ = build_summary(text, sids)
        print('句子数量:', len(sids), '摘要内容:', summ)
        print('-' * 100)

三篇文章分别如下:

近年来,全球范围内的气候变化已成为全球关注的焦点之一。科学家们警告我们,气候变化对地球环境、社会和经济造成了严重的威胁。在这种情况下,新能源革命的推动力变得尤为重要。
新能源,特别是可再生能源,如太阳能和风能,已成为降低碳排放和减缓气候变化的重要工具。各国政府、企业和研究机构都在积极寻求新的可再生能源解决方案,以满足能源需求,并减少对化石燃料的依赖。
虽然在过去的几十年里,可再生能源已经取得了显著的进展,但在能源领域仍然面临一些挑战。技术的进步和投资的增加对于扩大可再生能源的规模至关重要。此外,政策制定者需要采取措施,以鼓励可再生能源的采用,并减少对化石燃料的补贴。
新能源革命的成功将不仅有助于应对气候变化,还将创造就业机会、提高能源安全性,并为未来的世代提供可持续的能源来源。在全球范围内共同努力,我们可以实现能源转型,为地球的可持续未来奠定基础。
随着数字技术的不断发展,数字货币已经成为金融领域的一个热门话题。比特币等加密货币的崛起引发了对金融体系的根本性变革。
数字货币的出现改变了传统的货币观念,它们不依赖于中央银行发行,而是通过区块链技术进行管理和交换。这种去中心化的特点带来了许多潜在的好处,包括降低交易成本、提高金融包容性,以及促进跨境交易。
然而,数字货币也伴随着一些挑战和风险,包括监管问题、市场波动性以及安全性担忧。政府和国际组织正在积极研究数字货币的监管框架,以确保金融体系的稳定和安全。
尽管存在争议,数字货币已经在金融界引发了深刻的变革,并可能在未来几年内继续发展。它们可能成为未来金融体系的一部分,重新定义货币、支付和金融服务
太空探索一直是人类的梦想和追求。近年来,随着技术的进步和国际合作的增加,人类太空探索迈出了重要的一步,展望了未来的前景。
太空探索的一个重要里程碑是国际空间站(ISS)的建设和运营,这个多国合作的项目为科学研究、国际合作和长期太空任务提供了平台。此外,商业航天公司的兴起也为太空探索带来了新的动力,包括私人太空旅行、卫星部署和资源开发等领域的商业机会。
人类登陆月球和火星的计划也正在进行中,这些使命将挑战我们的科学和工程能力,并改变我们对太空的认知。太空探索还提出了一系列伦理和环境问题,例如太空垃圾和外层空间资源的管理。
尽管太空探索充满挑战,但它也激发了人们的好奇心和创新精神,为未来的太空探险打开了新的可能性。太空探索不仅是科学和技术的领域,还是人类的探索精神的体现,它将继续影响我们的未来。

对应的抽取摘要结果如下:

shape: (10, 768)
句子数量: 2 摘要内容: 技术的进步和投资的增加对于扩大可再生能源的规模至关重要。此外,政策制定者需要采取措施,以鼓励可再生能源的采用,并减少对化石燃料的补贴。
----------------------------------------------------------------------------------------------------
shape: (7, 768)
句子数量: 2 摘要内容: 数字货币的出现改变了传统的货币观念,它们不依赖于中央银行发行,而是通过区块链技术进行管理和交换。尽管存在争议,数字货币已经在金融界引发了深刻的变革,并可能在未来几年内继续发展。
----------------------------------------------------------------------------------------------------
shape: (8, 768)
句子数量: 2 摘要内容: 太空探索还提出了一系列伦理和环境问题,例如太空垃圾和外层空间资源的管理。尽管太空探索充满挑战,但它也激发了人们的好奇心和创新精神,为未来的太空探险打开了新的可能性。
----------------------------------------------------------------------------------------------------
未经允许不得转载:一亩三分地 » 基于 kmeans 抽取文本摘要
评论 (0)

5 + 2 =