update termsim.

This commit is contained in:
shibing624 2022-03-05 03:10:45 +08:00
parent 413796bdfc
commit be6af69acc
12 changed files with 1543 additions and 68 deletions

View File

@ -3,6 +3,8 @@ message: "If you use this software, please cite it as below."
authors:
- family-names: "Xu"
given-names: "Ming"
title: "Similarities: Compute Similarity Score for humans"
orcid: "https://orcid.org/0000-0003-3402-7159"
title: "Similarities: Compute similarity score for humans"
url: "https://github.com/shibing624/similarities"
date-released: 2022-02-28
version: 0.0.3

View File

@ -49,45 +49,6 @@ Similarities is a toolkit for computing similarity scores between texts.
| GloVe | glove | Avg_word_embeddings_glove_6B_300d | 61.77 |
| BERT | bert-base-uncased | BERT-base-cls | 20.29 |
| BERT | bert-base-uncased | BERT-base-first_last_avg | 59.04 |
| BERT | bert-base-uncased | BERT-base-first_last_avg-whiten(NLI) | 63.65 |
| SBERT | sentence-transformers/bert-base-nli-mean-tokens | SBERT-base-nli-cls | 73.65 |
| SBERT | sentence-transformers/bert-base-nli-mean-tokens | SBERT-base-nli-first_last_avg | 77.96 |
| SBERT | xlm-roberta-base | paraphrase-multilingual-MiniLM-L12-v2 | 84.42 |
| CoSENT | bert-base-uncased | CoSENT-base-first_last_avg | 69.93 |
| CoSENT | sentence-transformers/bert-base-nli-mean-tokens | CoSENT-base-nli-first_last_avg | 79.68 |
- Evaluation results on Chinese matching datasets:
| Arch | Backbone | Model Name | ATEC | BQ | LCQMC | PAWSX | STS-B | Avg | QPS |
| :-- | :--- | :--- | :-: | :-: | :-: | :-: | :-: | :-: | :-: |
| CoSENT | hfl/chinese-macbert-base | CoSENT-macbert-base | 50.39 | **72.93** | **79.17** | **60.86** | **80.51** | **68.77** | 2572 |
| CoSENT | Langboat/mengzi-bert-base | CoSENT-mengzi-base | **50.52** | 72.27 | 78.69 | 12.89 | 80.15 | 58.90 | 2502 |
| CoSENT | bert-base-chinese | CoSENT-bert-base | 49.74 | 72.38 | 78.69 | 60.00 | 80.14 | 68.19 | 2653 |
| SBERT | bert-base-chinese | SBERT-bert-base | 46.36 | 70.36 | 78.72 | 46.86 | 66.41 | 61.74 | 1365 |
| SBERT | hfl/chinese-macbert-base | SBERT-macbert-base | 47.28 | 68.63 | **79.42** | 55.59 | 64.82 | 63.15 | 1948 |
| CoSENT | hfl/chinese-roberta-wwm-ext | CoSENT-roberta-ext | **50.81** | **71.45** | **79.31** | **61.56** | **81.13** | **68.85** | - |
| SBERT | hfl/chinese-roberta-wwm-ext | SBERT-roberta-ext | 48.29 | 69.99 | 79.22 | 44.10 | 72.42 | 62.80 | - |
- Chinese matching evaluation results of the models released by this project:
| Arch | Backbone | Model Name | ATEC | BQ | LCQMC | PAWSX | STS-B | Avg | QPS |
| :-- | :--- | :---- | :-: | :-: | :-: | :-: | :-: | :-: | :-: |
| Word2Vec | word2vec | w2v-light-tencent-chinese | 20.00 | 31.49 | 59.46 | 2.57 | 55.78 | 33.86 | 10283 |
| SBERT | xlm-roberta-base | paraphrase-multilingual-MiniLM-L12-v2 | 18.42 | 38.52 | 63.96 | 10.14 | 78.90 | 41.99 | 2371 |
| CoSENT | hfl/chinese-macbert-base | similarities-base-chinese | 31.93 | 42.67 | 70.16 | 17.21 | 79.30 | **48.25** | 2572 |
Notes:
- All result values are Spearman correlation coefficients.
- Each result is obtained by training only on the dataset's train split and evaluating on its test split; no external data is used.
- The model name `paraphrase-multilingual-MiniLM-L12-v2` is short for `sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2`, the multilingual version of `paraphrase-MiniLM-L12-v2`; it is fast, performs well, and supports Chinese.
- `CoSENT-macbert-base` reaches SOTA performance for its parameter scale. It is trained with the CoSENT method; the results can be reproduced by running the code under [similarities/cosent](similarities/cosent).
- `SBERT-macbert-base` is trained with the SBERT method; the results can be reproduced by running the code under [similarities/sentence_bert](similarities/sentence_bert).
- `similarities-base-chinese` is trained with the CoSENT method on the Chinese STS-B data, based on MacBERT; the model file has been uploaded to the HuggingFace model hub: [shibing624/similarities-base-chinese](https://huggingface.co/shibing624/similarities-base-chinese).
- `w2v-light-tencent-chinese` is a Word2Vec model built from the Tencent word embeddings; it is loaded and used on CPU.
- All pretrained models can be loaded via transformers, e.g. the MacBERT model: `--pretrained_model_path hfl/chinese-macbert-base`.
- The Chinese matching datasets can be downloaded from the [links below](#dataset).
- Experiments on the Chinese matching tasks show that `first_last_avg` pooling works best; at prediction time, SBert's `mean pooling` can be used instead with only a small loss in performance.
- QPS is measured on GPU: a Tesla V100 with 32GB memory.
# Demo
@ -111,11 +72,6 @@ cd similarities
python3 setup.py install
```
### Dataset
Common Chinese semantic matching datasets, covering 5 tasks: [ATEC](https://github.com/IceFlameWorm/NLP_Datasets/tree/master/ATEC), [BQ](http://icrc.hitsz.edu.cn/info/1037/1162.htm), [LCQMC](http://icrc.hitsz.edu.cn/Article/show/171.html), [PAWSX](https://arxiv.org/abs/1908.11828), and [STS-B](https://github.com/pluto-junzeng/CNSD).
You can download each dataset from its link, or download them all from [Baidu Netdisk (extraction code: qkt6)](https://pan.baidu.com/s/1d6jSiU1wHQAEMWJi7JJWCQ).
The senteval_cn directory collects all evaluation datasets, and senteval_cn.zip is an archive of that directory; downloading either one is enough.
# Usage
@ -124,7 +80,7 @@ python3 setup.py install
### 2. Compute similarity scores between sentences
Example: [semantic_text_similarity.py](./examples/semantic_text_similarity.py)
Example: [examples/semantic_text_similarity.py](./examples/semantic_text_similarity.py)
> The sentence cosine similarity `score` is in the range [-1, 1]; the larger the value, the more similar the sentences.
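A minimal sketch of the idea (this is not the content of the example script; it assumes the `text2vec` Word2Vec encoder and the `cos_sim` helper added in this commit):

```python
# Rough sketch (not the official example script): encode two sentences, then score them.
from text2vec import Word2Vec
from similarities.similarity import cos_sim

model = Word2Vec()
emb1 = model.encode("如何更换花呗绑定银行卡")
emb2 = model.encode("花呗更改绑定银行卡")
print(cos_sim(emb1, emb2))  # score in [-1, 1]; higher means more similar
```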
@ -144,7 +100,7 @@ python3 setup.py install
- Issues (suggestions): [![GitHub issues](https://img.shields.io/github/issues/shibing624/similarities.svg)](https://github.com/shibing624/similarities/issues)
- Email me, xuming: xuming624@qq.com
- WeChat me:
Add me on *WeChat ID: xuming624, note: personal name-company-NLP* to join the NLP discussion group.
Add me on *WeChat ID: xuming624, note: name-company-NLP* to join the NLP discussion group.
<img src="docs/wechat.jpeg" width="200" />
@ -154,10 +110,10 @@ python3 setup.py install
If you use similarities in your research, please cite it in the following format:
```latex
@misc{similarities,
@software{similarities,
title={similarities: A Tool for Compute Similarity Score},
author={Ming Xu},
howpublished={https://github.com/shibing624/similarities},
url={https://github.com/shibing624/similarities},
year={2022}
}
```
@ -178,5 +134,4 @@ python3 setup.py install
# Reference
- [A Simple but Tough-to-Beat Baseline for Sentence Embeddings[Sanjeev Arora and Yingyu Liang and Tengyu Ma, 2017]](https://openreview.net/forum?id=SyK00v5xx)
- [A Comparison of Four Methods for Computing Text Similarity [Yves Peirsman]](https://zhuanlan.zhihu.com/p/37104535)
- [On Text Matching and Multi-turn Retrieval](https://zhuanlan.zhihu.com/p/111769969)
- [liuhuanyong/SentenceSimilarity](https://github.com/liuhuanyong/SentenceSimilarity)

View File

@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
@description:
This package contains implementations of pairwise similarity queries.
"""
from .similarity import BertSimilarity
# bring classes directly into package namespace, to save some typing

View File

@ -6,29 +6,246 @@
from typing import List, Union, Optional
import numpy as np
from numpy import ndarray
from torch import Tensor
import scipy
from loguru import logger
import torch
import logging
import scipy.sparse
from gensim import utils, matutils
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from enum import Enum, unique
class BertSimilarity:
def __init__(self, model_name_or_path=''):
def cos_sim(v1: Union[torch.Tensor, np.ndarray], v2: Union[torch.Tensor, np.ndarray]):
"""
Computes the cosine similarity cos_sim(a[i], b[j]) for all i and j.
:return: Matrix with res[i][j] = cos_sim(a[i], b[j])
"""
if not isinstance(v1, torch.Tensor):
v1 = torch.tensor(v1)
if not isinstance(v2, torch.Tensor):
v2 = torch.tensor(v2)
if len(v1.shape) == 1:
v1 = v1.unsqueeze(0)
if len(v2.shape) == 1:
v2 = v2.unsqueeze(0)
v1_norm = torch.nn.functional.normalize(v1, p=2, dim=1)
v2_norm = torch.nn.functional.normalize(v2, p=2, dim=1)
return torch.mm(v1_norm, v2_norm.transpose(0, 1))
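# Example (sketch): cos_sim accepts 1-D or 2-D inputs and always returns a score matrix, e.g.
#   cos_sim(np.array([1.0, 0.0]), np.array([[1.0, 0.0], [0.0, 1.0]]))  # -> tensor([[1., 0.]])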
class EncoderType(Enum):
FIRST_LAST_AVG = 0
LAST_AVG = 1
CLS = 2
POOLER = 3
MEAN = 4
def __str__(self):
return self.name
@staticmethod
def from_string(s):
try:
return EncoderType[s]
except KeyError:
raise ValueError()
class Similarity:
"""
Compute cosine similarity of a dynamic query against a corpus of documents ('the index').
The index supports adding new documents dynamically.
"""
def __init__(self, model_name_or_path=None, docs=None):
"""
Calculate text similarity
:param similarity_type:
:param embedding_type:
Parameters
----------
model_name_or_path : str
Name or path of the model to load.
docs : iterable of str
Corpus of documents to index.
"""
self.model_name_or_path = model_name_or_path
self.model = None
logger.debug(f'Loading model {model_name_or_path}')
logger.debug(f"Device: {device}")
def encode(self, sentences: Union[List[str], str]) -> ndarray:
return np.array([])
self.normalize = True
self.keyedvectors = None
self.docs = []
self.norm = False
if docs is not None:
self.add_documents(docs)
def similarity_score(self, sentences1: Union[List[str], str], sentences2: Union[List[str], str]):
def __len__(self):
"""Get length of index."""
return len(self.docs)
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, corpus):
"""Extend the index with new documents.
Parameters
----------
corpus : iterable of str
Documents to add to the index.
"""
Get similarity scores between sentences1 and sentences2
:param sentences1: list, sentence1 list
:param sentences2: list, sentence2 list
for doc in corpus:
self.docs.append(doc)
if len(self.docs) % 10000 == 0:
logger.info("PROGRESS: fresh_shard size=%i", len(self.docs))
def get_vector(self, text, norm=False):
"""Get the key's vector, as a 1D numpy array.
Parameters
----------
text : str
Key for vector to return.
norm : bool, optional
If True, the resulting vector will be L2-normalized (unit Euclidean length).
Returns
-------
numpy.ndarray
Vector for the specified key.
Raises
------
KeyError
If the given key doesn't exist.
"""
pass
def similarity(
self, text1: Union[List[str], str], text2: Union[List[str], str]
) -> Union[np.ndarray, torch.Tensor]:
"""
Compute similarity between two list of texts.
:param text1: list, sentence1 list
:param text2: list, sentence2 list
:return: Matrix with res[i][j] = cos_sim(a[i], b[j])
"""
return 0.0
if not text1 or not text2:
return np.array([])
if isinstance(text1, str):
text1 = [text1] # type: ignore
if isinstance(text2, str):
text2 = [text2] # type: ignore
pass
def distance(self, text1: Union[List[str], str], text2: Union[List[str], str]):
"""Compute cosine distance between two keys.
Calculate 1 - :meth:`~gensim.models.keyedvectors.KeyedVectors.similarity`.
Parameters
----------
w1 : str
Input key.
w2 : str
Input key.
Returns
-------
float
Distance between `w1` and `w2`.
"""
return 1 - self.similarity(text1, text2)
def most_similar(self, query: Union[List[str], str], topn=10, threshold=0, exponent=2.0):
"""
Get the topn most similar texts
:param query: str, query text
:param topn: int, number of results to return
:return: list, topn most similar texts
"""
if query not in self.keyedvectors:
logger.debug('an out-of-dictionary term "%s"', query)
else:
most_similar = self.keyedvectors.most_similar(query, topn=topn)
for t2, similarity in most_similar:
if similarity > threshold:
yield (t2, similarity ** exponent)
def semantic_search(
self,
query_embeddings: Union[torch.Tensor, np.ndarray],
corpus_embeddings: Union[torch.Tensor, np.ndarray],
query_chunk_size: int = 100,
corpus_chunk_size: int = 500000,
top_k: int = 10,
score_function=cos_sim
):
"""
This function performs a cosine similarity search between a list of query embeddings and a list of corpus embeddings.
It can be used for Information Retrieval / Semantic Search for corpora up to about 1 Million entries.
:param query_embeddings: A 2 dimensional tensor with the query embeddings.
:param corpus_embeddings: A 2 dimensional tensor with the corpus embeddings.
:param query_chunk_size: Process this many queries at a time. Increasing the value increases speed but requires more memory.
:param corpus_chunk_size: Scan the corpus in chunks of this many entries. Increasing the value increases speed but requires more memory.
:param top_k: Retrieve the top k matching entries.
:param score_function: Function for computing scores. By default, cosine similarity.
:return: Returns a sorted list with decreasing cosine similarity scores. Entries are dictionaries with the keys 'corpus_id' and 'score'
"""
if isinstance(query_embeddings, (np.ndarray, np.generic)):
query_embeddings = torch.from_numpy(query_embeddings)
elif isinstance(query_embeddings, list):
query_embeddings = torch.stack(query_embeddings)
if len(query_embeddings.shape) == 1:
query_embeddings = query_embeddings.unsqueeze(0)
if isinstance(corpus_embeddings, (np.ndarray, np.generic)):
corpus_embeddings = torch.from_numpy(corpus_embeddings)
elif isinstance(corpus_embeddings, list):
corpus_embeddings = torch.stack(corpus_embeddings)
# Check that corpus and queries are on the same device
query_embeddings = query_embeddings.to(device)
corpus_embeddings = corpus_embeddings.to(device)
queries_result_list = [[] for _ in range(len(query_embeddings))]
for query_start_idx in range(0, len(query_embeddings), query_chunk_size):
# Iterate over chunks of the corpus
for corpus_start_idx in range(0, len(corpus_embeddings), corpus_chunk_size):
# Compute cosine similarity
cos_scores = score_function(query_embeddings[query_start_idx:query_start_idx + query_chunk_size],
corpus_embeddings[corpus_start_idx:corpus_start_idx + corpus_chunk_size])
# Get top-k scores
cos_scores_top_k_values, cos_scores_top_k_idx = torch.topk(cos_scores, min(top_k, len(cos_scores[0])),
dim=1, largest=True, sorted=False)
cos_scores_top_k_values = cos_scores_top_k_values.cpu().tolist()
cos_scores_top_k_idx = cos_scores_top_k_idx.cpu().tolist()
for query_itr in range(len(cos_scores)):
for sub_corpus_id, score in zip(cos_scores_top_k_idx[query_itr],
cos_scores_top_k_values[query_itr]):
corpus_id = corpus_start_idx + sub_corpus_id
query_id = query_start_idx + query_itr
queries_result_list[query_id].append({'corpus_id': corpus_id, 'score': score})
# Sort and strip to top_k results
for idx in range(len(queries_result_list)):
queries_result_list[idx] = sorted(queries_result_list[idx], key=lambda x: x['score'], reverse=True)
queries_result_list[idx] = queries_result_list[idx][0:top_k]
return queries_result_list
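# Usage sketch for semantic_search (assumes 2-D float embeddings produced by the same encoder):
#   hits = Similarity().semantic_search(query_embeddings, corpus_embeddings, top_k=3)
#   hits[0]  # -> [{'corpus_id': 12, 'score': 0.93}, ...], the best matches for the first query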

similarities/termsim.py (new file, 591 lines)
View File

@ -0,0 +1,591 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 Vit Novotny <witiko@mail.muni.cz>
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
"""
This module provides classes that deal with term similarities.
Adjust the Index to compute term similarities.
"""
import math
from loguru import logger
from typing import Dict, List, Tuple, Set, Optional, Union
import numpy as np
import torch
import jieba
import jieba.posseg
from text2vec import Word2Vec
from similarities.similarity import cos_sim, Similarity
import os
from similarities.utils.distance import cosine_distance
from simhash import Simhash
from similarities.utils.tfidf import TFIDF
pwd_path = os.path.dirname(os.path.abspath(__file__))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class WordEmbeddingSimilarity(object):
"""
Computes cosine similarities between word embeddings and retrieves most
similar terms for a given term.
Notes
-----
By fitting the word embeddings to a vocabulary that you will be using, you
can eliminate all out-of-vocabulary (OOV) words that you would otherwise
receive from the `most_similar` method. In subword models such as fastText,
this procedure will also infer word-vectors for words from your vocabulary
that previously had no word-vector.
Parameters
----------
keyedvectors : :class:`~text2vec.Word2Vec`
The word embeddings.
docs: list of str
"""
def __init__(self, keyedvectors: Word2Vec, docs: List[str] = None):
# super().__init__()
self.keyedvectors = keyedvectors
self.docs = []
self.docs_embeddings = np.array([])
if docs is not None:
self.add_documents(docs)
def __len__(self):
"""Get length of index."""
return self.docs_embeddings.shape[0]
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, docs):
"""Extend the index with new documents.
Parameters
----------
docs : iterable of list of str
"""
self.docs += docs
docs_embeddings = self.get_vector(docs)
if self.docs_embeddings.size > 0:
self.docs_embeddings = np.vstack((self.docs_embeddings, docs_embeddings))
else:
self.docs_embeddings = docs_embeddings
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
def get_vector(self, text):
return self.keyedvectors.encode(text)
def similarity(self, text1, text2, score_function=cos_sim):
text_emb1 = self.get_vector(text1)
text_emb2 = self.get_vector(text2)
return score_function(text_emb1, text_emb2)
def distance(self, text1, text2):
"""Compute cosine distance between two keys.
Calculate 1 - :meth:`~gensim.models.keyedvectors.KeyedVectors.similarity`.
Parameters
----------
w1 : str
Input key.
w2 : str
Input key.
Returns
-------
float
Distance between `w1` and `w2`.
"""
return 1 - self.similarity(text1, text2)
def semantic_search(
self,
query_embeddings: Union[torch.Tensor, np.ndarray],
corpus_embeddings: Union[torch.Tensor, np.ndarray],
query_chunk_size: int = 100,
corpus_chunk_size: int = 500000,
top_k: int = 10,
score_function=cos_sim
):
"""
This function performs a cosine similarity search between a list of query embeddings and a list of corpus embeddings.
It can be used for Information Retrieval / Semantic Search for corpora up to about 1 Million entries.
:param query_embeddings: A 2 dimensional tensor with the query embeddings.
:param corpus_embeddings: A 2 dimensional tensor with the corpus embeddings.
:param query_chunk_size: Process this many queries at a time. Increasing the value increases speed but requires more memory.
:param corpus_chunk_size: Scan the corpus in chunks of this many entries. Increasing the value increases speed but requires more memory.
:param top_k: Retrieve the top k matching entries.
:param score_function: Function for computing scores. By default, cosine similarity.
:return: Returns a sorted list with decreasing cosine similarity scores. Entries are dictionaries with the keys 'corpus_id' and 'score'
"""
if isinstance(query_embeddings, (np.ndarray, np.generic)):
query_embeddings = torch.from_numpy(query_embeddings)
elif isinstance(query_embeddings, list):
query_embeddings = torch.stack(query_embeddings)
if len(query_embeddings.shape) == 1:
query_embeddings = query_embeddings.unsqueeze(0)
if isinstance(corpus_embeddings, (np.ndarray, np.generic)):
corpus_embeddings = torch.from_numpy(corpus_embeddings)
elif isinstance(corpus_embeddings, list):
corpus_embeddings = torch.stack(corpus_embeddings)
# Check that corpus and queries are on the same device
query_embeddings = query_embeddings.to(device)
corpus_embeddings = corpus_embeddings.to(device)
queries_result_list = [[] for _ in range(len(query_embeddings))]
for query_start_idx in range(0, len(query_embeddings), query_chunk_size):
# Iterate over chunks of the corpus
for corpus_start_idx in range(0, len(corpus_embeddings), corpus_chunk_size):
# Compute cosine similarity
cos_scores = score_function(query_embeddings[query_start_idx:query_start_idx + query_chunk_size],
corpus_embeddings[corpus_start_idx:corpus_start_idx + corpus_chunk_size])
# Get top-k scores
cos_scores_top_k_values, cos_scores_top_k_idx = torch.topk(cos_scores, min(top_k, len(cos_scores[0])),
dim=1, largest=True, sorted=False)
cos_scores_top_k_values = cos_scores_top_k_values.cpu().tolist()
cos_scores_top_k_idx = cos_scores_top_k_idx.cpu().tolist()
for query_itr in range(len(cos_scores)):
for sub_corpus_id, score in zip(cos_scores_top_k_idx[query_itr],
cos_scores_top_k_values[query_itr]):
corpus_id = corpus_start_idx + sub_corpus_id
query_id = query_start_idx + query_itr
queries_result_list[query_id].append({'corpus_id': corpus_id, 'score': score})
# Sort and strip to top_k results
for idx in range(len(queries_result_list)):
queries_result_list[idx] = sorted(queries_result_list[idx], key=lambda x: x['score'], reverse=True)
queries_result_list[idx] = queries_result_list[idx][0:top_k]
return queries_result_list
def most_similar(self, query, topn=10):
result = []
query_embeddings = self.get_vector(query)
hits = self.semantic_search(query_embeddings, self.docs_embeddings, top_k=topn)
hits = hits[0] # Get the hits for the first query
print("Input question:", query)
for hit in hits[0:topn]:
result.append((self.docs[hit['corpus_id']], round(hit['score'], 4)))
print("\t{:.3f}\t{}".format(hit['score'], self.docs[hit['corpus_id']]))
print("\n\n========\n")
return result
class CilinSimilarity(object):
"""
Computes similarities between texts based on the Cilin (词林) semantic thesaurus
and retrieves the most similar texts for a given query.
"""
default_cilin_path = os.path.join(pwd_path, 'data', 'cilin.txt')
def __init__(self, cilin_path: str = default_cilin_path, docs: List[str] = None):
super().__init__()
self.cilin_dict = self.load_cilin_dict(cilin_path) # Cilin(词林) semantic dictionary
self.docs = []
if docs is not None:
self.add_documents(docs)
def __len__(self):
"""Get length of index."""
return len(self.docs)
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, docs):
"""Extend the index with new documents.
Parameters
----------
docs : iterable of list of str
"""
self.docs += docs
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
@staticmethod
def load_cilin_dict(path):
"""加载词林语义词典"""
sem_dict = {}
for line in open(path, 'r', encoding='utf-8'):
line = line.strip()
terms = line.split(' ')
sem_type = terms[0]
words = terms[1:]
for word in words:
if word not in sem_dict:
sem_dict[word] = sem_type
else:
sem_dict[word] += ';' + sem_type
for word, sem_type in sem_dict.items():
sem_dict[word] = sem_type.split(';')
return sem_dict
def _compute_word_sim(self, word1, word2):
"""
Compute the similarity between two words, taking the max over all sense pairs.
:param word1:
:param word2:
:return:
"""
sems_word1 = self.cilin_dict.get(word1, [])
sems_word2 = self.cilin_dict.get(word2, [])
score_list = [self._compute_sem(sem_word1, sem_word2) for sem_word1 in sems_word1 for sem_word2 in sems_word2]
if score_list:
return max(score_list)
else:
return 0
def _compute_sem(self, sem1, sem2):
"""
Compute word similarity based on Cilin semantic codes.
:param sem1:
:param sem2:
:return:
"""
sem1 = [sem1[0], sem1[1], sem1[2:4], sem1[4], sem1[5:7], sem1[-1]]
sem2 = [sem2[0], sem2[1], sem2[2:4], sem2[4], sem2[5:7], sem2[-1]]
score = 0
for index in range(len(sem1)):
if sem1[index] == sem2[index]:
if index in [0, 1]:
score += 3
elif index == 2:
score += 2
elif index in [3, 4]:
score += 1
return score / 10
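# Worked example (sketch): for Cilin codes 'Aa01A01=' and 'Aa01B02#', the first three
# segments ('A', 'a', '01') match, so the score is (3 + 3 + 2) / 10 = 0.8.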
def similarity(self, text1, text2):
"""
Compute sentence similarity from word-level similarities.
:param text1:
:param text2:
:return:
"""
words1 = [word.word for word in jieba.posseg.cut(text1) if word.flag[0] not in ['u', 'x', 'w']]
words2 = [word.word for word in jieba.posseg.cut(text2) if word.flag[0] not in ['u', 'x', 'w']]
score_words1 = []
score_words2 = []
for word1 in words1:
score = max(self._compute_word_sim(word1, word2) for word2 in words2)
score_words1.append(score)
for word2 in words2:
score = max(self._compute_word_sim(word2, word1) for word1 in words1)
score_words2.append(score)
similarity_score = max(sum(score_words1) / len(words1), sum(score_words2) / len(words2))
return similarity_score
def distance(self, text1, text2):
"""Compute cosine distance between two keys."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
result = []
for doc in self.docs:
score = self.similarity(query, doc)
result.append((doc, round(score, 4)))
result.sort(key=lambda x: x[1], reverse=True)
return result[:topn]
class HownetSimilarity(object):
"""
Computes similarities between texts based on the HowNet semantic dictionary
and retrieves the most similar texts for a given query.
"""
default_hownet_path = os.path.join(pwd_path, 'data', 'hownet.dat')
def __init__(self, hownet_path: str = default_hownet_path, docs: List[str] = None):
super().__init__()
self.hownet_dict = self.load_hownet_dict(hownet_path)  # HowNet semantic dictionary
self.docs = []
if docs is not None:
self.add_documents(docs)
def __len__(self):
"""Get length of index."""
return len(self.docs)
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, docs):
"""Extend the index with new documents.
Parameters
----------
docs : iterable of list of str
"""
self.docs += docs
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
@staticmethod
def load_hownet_dict(path):
"""加载Hownet语义词典"""
hownet_dict = {}
for line in open(path, 'r', encoding='utf-8'):
words = [word for word in line.strip().replace(' ', '>').replace('\t', '>').split('>') if word != '']
word = words[0]
word_def = words[2]
hownet_dict[word] = word_def.split(',')
return hownet_dict
def _compute_sem(self, sem1, sem2):
"""计算语义相似度"""
sem_inter = set(sem1).intersection(set(sem2))
sem_union = set(sem1).union(set(sem2))
return float(len(sem_inter)) / float(len(sem_union))
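# Example (sketch): sememe sets {'人', '音乐'} and {'人', '表演'} share 1 of 3 distinct
# sememes, so the Jaccard score is 1 / 3 ≈ 0.33.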
def _compute_word_sim(self, word1, word2):
"""比较两个词语之间的相似度"""
DEFS_word1 = self.hownet_dict.get(word1, [])
DEFS_word2 = self.hownet_dict.get(word2, [])
scores = [self._compute_sem(DEF_word1, DEF_word2) for DEF_word1 in DEFS_word1 for DEF_word2 in DEFS_word2]
if scores:
return max(scores)
else:
return 0
def similarity(self, text1, text2):
"""
Compute sentence similarity from word-level similarities.
:param text1:
:param text2:
:return:
"""
words1 = [word.word for word in jieba.posseg.cut(text1) if word.flag[0] not in ['u', 'x', 'w']]
words2 = [word.word for word in jieba.posseg.cut(text2) if word.flag[0] not in ['u', 'x', 'w']]
score_words1 = []
score_words2 = []
for word1 in words1:
score = max(self._compute_word_sim(word1, word2) for word2 in words2)
score_words1.append(score)
for word2 in words2:
score = max(self._compute_word_sim(word2, word1) for word1 in words1)
score_words2.append(score)
similarity_score = max(sum(score_words1) / len(words1), sum(score_words2) / len(words2))
return similarity_score
def distance(self, text1, text2):
"""Compute cosine distance between two keys."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
result = []
for doc in self.docs:
score = self.similarity(query, doc)
result.append((doc, round(score, 4)))
result.sort(key=lambda x: x[1], reverse=True)
return result[:topn]
class SimhashSimilarity(object):
"""
Computes Simhash-based similarities between texts and retrieves the most
similar texts for a given query.
"""
def __init__(self, docs: List[str] = None, hashbits=64):
super().__init__()
self.docs = []
self.hashbits = hashbits
self.docs_embeddings = np.array([])
if docs is not None:
self.add_documents(docs)
def __len__(self):
"""Get length of index."""
return len(self.docs)
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, docs):
"""Extend the index with new documents.
Parameters
----------
docs : iterable of list of str
"""
self.docs += docs
docs_embeddings = []
for doc in docs:
doc_emb = self._get_code(doc)
docs_embeddings.append(doc_emb)
if len(docs_embeddings) % 10000 == 0:
logger.debug(f"Progress, add docs size: {len(docs_embeddings)}")
if self.docs_embeddings.size > 0:
self.docs_embeddings = np.vstack((self.docs_embeddings, docs_embeddings))
else:
self.docs_embeddings = docs_embeddings
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
def _hamming_distance(self, code_s1, code_s2):
"""利用64位数计算海明距离"""
x = (code_s1 ^ code_s2) & ((1 << self.hashbits) - 1)
ans = 0
while x:
ans += 1
x &= x - 1
return ans
def _get_features(self, string):
"""
Tokenize the text and extract its features, using POS tags to filter out function words and other irrelevant tokens.
:param string:
:return:
"""
word_list = [word.word for word in jieba.posseg.cut(string) if
word.flag[0] not in ['u', 'x', 'w', 'o', 'p', 'c', 'm', 'q']]
return word_list
def _get_code(self, string):
"""对全文进行编码"""
return Simhash(self._get_features(string)).value
def similarity(self, text1, text2):
"""
Compute the Hamming-distance based similarity between two sentences.
:param text1:
:param text2:
:return:
"""
code_s1 = self._get_code(text1)
code_s2 = self._get_code(text2)
similarity_score = (100 - self._hamming_distance(code_s1, code_s2) * 100 / self.hashbits) / 100
return similarity_score
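# Example: with hashbits=64 and a Hamming distance of 16 between the two hashes,
# similarity = (100 - 16 * 100 / 64) / 100 = 0.75.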
def distance(self, text1, text2):
"""Compute cosine distance between two keys."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
result = []
query_emb = self._get_code(query)
for doc, doc_emb in zip(self.docs, self.docs_embeddings):
score = (100 - self._hamming_distance(query_emb, doc_emb) * 100 / self.hashbits) / 100
result.append((doc, round(score, 4)))
result.sort(key=lambda x: x[1], reverse=True)
return result[:topn]
class TfidfSimilarity(object):
"""
Computes TF-IDF based cosine similarities between texts and retrieves the most
similar texts for a given query.
"""
def __init__(self, docs: List[str] = None):
super().__init__()
self.docs = []
self.docs_embeddings = np.array([])
self.tfidf = TFIDF()
if docs is not None:
self.add_documents(docs)
def __len__(self):
"""Get length of index."""
return len(self.docs)
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, docs):
"""Extend the index with new documents.
Parameters
----------
docs : iterable of list of str
"""
self.docs += docs
docs_embeddings = np.array(self.tfidf.get_tfidf(docs))
if self.docs_embeddings.size > 0:
self.docs_embeddings = np.vstack((self.docs_embeddings, docs_embeddings))
else:
self.docs_embeddings = docs_embeddings
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
def similarity(self, text1, text2):
"""
Compute the cosine similarity between two sentences based on TF-IDF vectors.
:param text1:
:param text2:
:return:
"""
tfidf_features = self.tfidf.get_tfidf([text1, text2])
return cosine_distance(np.array(tfidf_features[0]), np.array(tfidf_features[1]))
def distance(self, text1, text2):
"""Compute cosine distance between two keys."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
result = []
query_emb = np.array(self.tfidf.get_tfidf([query])[0])
for doc, doc_emb in zip(self.docs, self.docs_embeddings):
score = cosine_distance(query_emb, doc_emb)
result.append((doc, round(score, 4)))
result.sort(key=lambda x: x[1], reverse=True)
return result[:topn]
if __name__ == '__main__':
wm = Word2Vec()
list_of_docs = ["This is a test1", "This is a test2", "This is a test3"]
list_of_docs2 = ["that is test4", "that is a test5", "that is a test6"]
m = WordEmbeddingSimilarity(wm, list_of_docs)
m.add_documents(list_of_docs2)
v = m.get_vector("This is a test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test1"))
text1 = '周杰伦是一个歌手'
text2 = '刘若英是个演员'
m = CilinSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_documents(zh_list)
print(m.most_similar('刘若英是演员'))
m = HownetSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_documents(zh_list)
print(m.most_similar('刘若英是演员'))
m = SimhashSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_documents(zh_list)
print(m.most_similar('刘若英是演员'))
m = TfidfSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌', '我不是演员吗']
m.add_documents(zh_list)
print(m.most_similar('刘若英是演员'))

View File

@ -3,3 +3,4 @@
@author:XuMing(xuming624@qq.com)
@description:
"""

View File

@ -0,0 +1,231 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
from difflib import SequenceMatcher
import numpy as np
zero_bit = 0.000000001
def try_divide(x, y, val=0.0):
"""
try to divide two numbers
"""
if y != 0.0:
val = float(x) / y
return val
def cosine_distance(v1, v2):
"""
Cosine score between two vectors.
Note: despite the name, this returns the cosine similarity, not 1 - similarity.
"""
up = np.dot(v1, v2)
down = np.linalg.norm(v1) * np.linalg.norm(v2)
return try_divide(up, down)
def hamming_distance(v1, v2):  # Hamming distance
n = int(v1, 2) ^ int(v2, 2)
return bin(n & 0xffffffff).count('1')
def euclidean_distance(v1, v2):  # Euclidean distance
return np.sqrt(np.sum(np.square(v1 - v2)))
def manhattan_distance(v1, v2):  # Manhattan distance
return np.sum(np.abs(v1 - v2))
def chebyshev_distance(v1, v2):  # Chebyshev distance
return np.max(np.abs(v1 - v2))
def minkowski_distance(v1, v2, p=2):  # Minkowski distance (p=2 reduces to the Euclidean distance)
return np.power(np.sum(np.power(np.abs(v1 - v2), p)), 1 / p)
def euclidean_distance_standardized(v1, v2):  # standardized Euclidean distance
v1_v2 = np.vstack([v1, v2])
sk_v1_v2 = np.var(v1_v2, axis=0, ddof=1)
return np.sqrt(((v1 - v2) ** 2 / (sk_v1_v2 + zero_bit * np.ones_like(sk_v1_v2))).sum())
def edit_distance(str1, str2):
try:
# very fast
# http://stackoverflow.com/questions/14260126/how-python-levenshtein-ratio-is-computed
import Levenshtein
d = Levenshtein.distance(str1, str2) / float(max(len(str1), len(str2)))
except:
# https://docs.python.org/2/library/difflib.html
d = 1. - SequenceMatcher(lambda x: x == " ", str1, str2).ratio()
return d
def pearson_correlation_distance(v1, v2):  # Pearson correlation coefficient
v1_v2 = np.vstack([v1, v2])
return np.corrcoef(v1_v2)[0][1]
def jaccard_similarity_coefficient_distance(v1, v2):  # Jaccard similarity coefficient
# computed directly from the formula
v1 = np.asarray(v1)
v2 = np.asarray(v2)
up = np.double(np.bitwise_and((v1 != v2), np.bitwise_or(v1 != 0, v2 != 0)).sum())
down = np.double(np.bitwise_or(v1 != 0, v2 != 0).sum() + zero_bit)
return try_divide(up, down)
def wmd_distance(model, sent1_cut_list, sent2_cut_list):  # WMD (Word Mover's Distance)
"""
Word Mover's Distance between two tokenized sentences
:param model: gensim word2vec model
:param sent1_cut_list:
:param sent2_cut_list:
:return:
"""
distance = model.wmdistance(sent1_cut_list, sent2_cut_list)
return distance
def is_str_match(str1, str2, threshold=1.0):
assert 0.0 <= threshold <= 1.0, "Wrong threshold."
if float(threshold) == 1.0:
return str1 == str2
else:
return (1. - edit_distance(str1, str2)) >= threshold
def longest_match_size(str1, str2):
sq = SequenceMatcher(lambda x: x == " ", str1, str2)
match = sq.find_longest_match(0, len(str1), 0, len(str2))
return match.size
def longest_match_ratio(str1, str2):
sq = SequenceMatcher(lambda x: x == " ", str1, str2)
match = sq.find_longest_match(0, len(str1), 0, len(str2))
return try_divide(match.size, min(len(str1), len(str2)))
def jaccard_coef(A, B):
if not isinstance(A, set):
A = set(A)
if not isinstance(B, set):
B = set(B)
return try_divide(float(len(A.intersection(B))), len(A.union(B)))
def num_of_common_sub_str(str1, str2):
"""
Find the length of the longest common substring of two strings.
Idea: build a 2-D table that records whether consecutive positions match.
"""
lstr1 = len(str1)
lstr2 = len(str2)
record = [[0 for i in range(lstr2 + 1)] for j in range(lstr1 + 1)]  # one extra row and column
max_num = 0  # length of the longest match
for i in range(lstr1):
for j in range(lstr2):
if str1[i] == str2[j]:
# extend the match when characters are equal
record[i + 1][j + 1] = record[i][j] + 1
if record[i + 1][j + 1] > max_num:
# keep the maximum match length
max_num = record[i + 1][j + 1]
return max_num
def string_hash(source):
if source == "":
return 0
else:
x = ord(source[0]) << 7
m = 1000003
mask = 2 ** 128 - 1
for c in source:
x = ((x * m) ^ ord(c)) & mask
x ^= len(source)
if x == -1:
x = -2
x = bin(x).replace('0b', '').zfill(64)[-64:]
return str(x)
def sim_hash(text):
import jieba.analyse
seg = jieba.cut(text)
key_word = jieba.analyse.extract_tags('|'.join(seg), topK=20, withWeight=True, allowPOS=())
# sort by weight first, then by word
key_list = []
for feature, weight in key_word:
weight = int(weight * 20)
temp = []
for f in string_hash(feature):
if f == '1':
temp.append(weight)
else:
temp.append(-weight)
key_list.append(temp)
content_list = np.sum(np.array(key_list), axis=0)
# no keywords could be extracted from the text
if len(key_list) == 0:
return '00'
simhash = ''
for c in content_list:
if c > 0:
simhash = simhash + '1'
else:
simhash = simhash + '0'
return simhash
def normalization(x):
"""
Min-max normalization.
:param x:
:return:
"""
return [(float(i) - min(x)) / float(max(x) - min(x) + zero_bit) for i in x]
def z_score(x, axis=0):
"""
Z-score standardization.
:param x: arrary, numpy
:param axis: int, 0
:return: arrary, numpy
"""
x = np.array(x).astype(float)
xr = np.rollaxis(x, axis=axis)
xr -= np.mean(x, axis=axis)
xr /= np.std(x, axis=axis)
return x
if __name__ == '__main__':
vec1_test = np.array([1, 38, 17, 32])
vec2_test = np.array([5, 6, 8, 9])
str1_test = "你到底是谁?"
str2_test = "没想到我是谁,是真样子"
print(euclidean_distance(vec1_test, vec2_test))
print(cosine_distance(vec1_test, vec2_test))
print(manhattan_distance(vec1_test, vec2_test))
print(euclidean_distance(vec1_test, vec2_test))
print(cosine_distance(vec1_test, vec2_test))
print('hamming_distance:', str1_test, str2_test, hamming_distance(sim_hash(str1_test), sim_hash(str2_test)))
print(edit_distance(str1_test, str2_test))
print(num_of_common_sub_str(str1_test, str2_test))
print(normalization(vec1_test))  # min-max normalization to [0, 1]
print(z_score(vec1_test))  # z-score standardization (centered around 0)

View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description: Download file.
"""
import requests
import os
import sys
from tqdm.autonotebook import tqdm
def http_get(url, path):
"""
Downloads a URL to a given path on disc
"""
if os.path.dirname(path) != '':
os.makedirs(os.path.dirname(path), exist_ok=True)
req = requests.get(url, stream=True)
if req.status_code != 200:
print("Exception when trying to download {}. Response {}".format(url, req.status_code), file=sys.stderr)
req.raise_for_status()
return
download_filepath = path + "_part"
with open(download_filepath, "wb") as file_binary:
content_length = req.headers.get('Content-Length')
total = int(content_length) if content_length is not None else None
progress = tqdm(unit="B", total=total, unit_scale=True)
for chunk in req.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
progress.update(len(chunk))
file_binary.write(chunk)
os.rename(download_filepath, path)
progress.close()
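# Usage sketch (hypothetical URL and local path):
#   http_get("https://example.com/files/stopwords.txt", "./data/stopwords.txt")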

View File

@ -0,0 +1,171 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
class NgramUtil(object):
def __init__(self):
pass
@staticmethod
def unigrams(words):
"""
Input: a list of words, e.g., ["I", "am", "Denny"]
Output: a list of unigram
"""
assert type(words) == list
return words
@staticmethod
def bigrams(words, join_string, skip=0):
"""
Input: a list of words, e.g., ["I", "am", "Denny"]
Output: a list of bigram, e.g., ["I_am", "am_Denny"]
"""
assert type(words) == list
L = len(words)
if L > 1:
lst = []
for i in range(L - 1):
for k in range(1, skip + 2):
if i + k < L:
lst.append(join_string.join([words[i], words[i + k]]))
else:
# set it as unigram
lst = NgramUtil.unigrams(words)
return lst
@staticmethod
def trigrams(words, join_string, skip=0):
"""
Input: a list of words, e.g., ["I", "am", "Denny"]
Output: a list of trigram, e.g., ["I_am_Denny"]
"""
assert type(words) == list
L = len(words)
if L > 2:
lst = []
for i in range(L - 2):
for k1 in range(1, skip + 2):
for k2 in range(1, skip + 2):
if i + k1 < L and i + k1 + k2 < L:
lst.append(join_string.join([words[i], words[i + k1], words[i + k1 + k2]]))
else:
# set it as bigram
lst = NgramUtil.bigrams(words, join_string, skip)
return lst
@staticmethod
def fourgrams(words, join_string):
"""
Input: a list of words, e.g., ["I", "am", "Denny", "boy"]
Output: a list of trigram, e.g., ["I_am_Denny_boy"]
"""
assert type(words) == list
L = len(words)
if L > 3:
lst = []
for i in range(L - 3):
lst.append(join_string.join([words[i], words[i + 1], words[i + 2], words[i + 3]]))
else:
# set it as trigram
lst = NgramUtil.trigrams(words, join_string)
return lst
@staticmethod
def uniterms(words):
return NgramUtil.unigrams(words)
@staticmethod
def biterms(words, join_string):
"""
Input: a list of words, e.g., ["I", "am", "Denny", "boy"]
Output: a list of biterm, e.g., ["I_am", "I_Denny", "I_boy", "am_Denny", "am_boy", "Denny_boy"]
"""
assert type(words) == list
L = len(words)
if L > 1:
lst = []
for i in range(L - 1):
for j in range(i + 1, L):
lst.append(join_string.join([words[i], words[j]]))
else:
# set it as uniterm
lst = NgramUtil.uniterms(words)
return lst
@staticmethod
def triterms(words, join_string):
"""
Input: a list of words, e.g., ["I", "am", "Denny", "boy"]
Output: a list of triterm, e.g., ["I_am_Denny", "I_am_boy", "I_Denny_boy", "am_Denny_boy"]
"""
assert type(words) == list
L = len(words)
if L > 2:
lst = []
for i in range(L - 2):
for j in range(i + 1, L - 1):
for k in range(j + 1, L):
lst.append(join_string.join([words[i], words[j], words[k]]))
else:
# set it as biterm
lst = NgramUtil.biterms(words, join_string)
return lst
@staticmethod
def fourterms(words, join_string):
"""
Input: a list of words, e.g., ["I", "am", "Denny", "boy", "ha"]
Output: a list of fourterm, e.g., ["I_am_Denny_boy", "I_am_Denny_ha", "I_am_boy_ha", "I_Denny_boy_ha", "am_Denny_boy_ha"]
"""
assert type(words) == list
L = len(words)
if L > 3:
lst = []
for i in range(L - 3):
for j in range(i + 1, L - 2):
for k in range(j + 1, L - 1):
for l in range(k + 1, L):
lst.append(join_string.join([words[i], words[j], words[k], words[l]]))
else:
# set it as triterm
lst = NgramUtil.triterms(words, join_string)
return lst
@staticmethod
def ngrams(words, ngram, join_string=" "):
"""
wrapper for ngram
"""
if ngram == 1:
return NgramUtil.unigrams(words)
elif ngram == 2:
return NgramUtil.bigrams(words, join_string)
elif ngram == 3:
return NgramUtil.trigrams(words, join_string)
elif ngram == 4:
return NgramUtil.fourgrams(words, join_string)
elif ngram == 12:
unigram = NgramUtil.unigrams(words)
bigram = [x for x in NgramUtil.bigrams(words, join_string) if len(x.split(join_string)) == 2]
return unigram + bigram
elif ngram == 123:
unigram = NgramUtil.unigrams(words)
bigram = [x for x in NgramUtil.bigrams(words, join_string) if len(x.split(join_string)) == 2]
trigram = [x for x in NgramUtil.trigrams(words, join_string) if len(x.split(join_string)) == 3]
return unigram + bigram + trigram
@staticmethod
def nterms(words, nterm, join_string=" "):
"""wrapper for nterm"""
if nterm == 1:
return NgramUtil.uniterms(words)
elif nterm == 2:
return NgramUtil.biterms(words, join_string)
elif nterm == 3:
return NgramUtil.triterms(words, join_string)
elif nterm == 4:
return NgramUtil.fourterms(words, join_string)
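# Usage sketch:
#   NgramUtil.ngrams(["I", "am", "Denny"], 2, "_")  # -> ['I_am', 'am_Denny']
#   NgramUtil.biterms(["I", "am", "Denny"], "_")    # -> ['I_am', 'I_Denny', 'am_Denny']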

View File

@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
# Author: dorianbrown
# Brief: https://github.com/dorianbrown/rank_bm25
import math
from multiprocessing import Pool, cpu_count
import numpy as np
"""
All of these algorithms have been taken from the paper:
Trotmam et al, Improvements to BM25 and Language Models Examined
Here we implement all the BM25 variations mentioned.
"""
class BM25:
def __init__(self, corpus, tokenizer=None):
self.corpus_size = len(corpus)
self.avgdl = 0
self.doc_freqs = []
self.idf = {}
self.doc_len = []
self.tokenizer = tokenizer
if tokenizer:
corpus = self._tokenize_corpus(corpus)
nd = self._initialize(corpus)
self._calc_idf(nd)
def _initialize(self, corpus):
nd = {} # word -> number of documents with word
num_doc = 0
for document in corpus:
self.doc_len.append(len(document))
num_doc += len(document)
frequencies = {}
for word in document:
if word not in frequencies:
frequencies[word] = 0
frequencies[word] += 1
self.doc_freqs.append(frequencies)
for word, freq in frequencies.items():
if word not in nd:
nd[word] = 0
nd[word] += 1
self.avgdl = num_doc / self.corpus_size
return nd
def _tokenize_corpus(self, corpus):
pool = Pool(cpu_count())
tokenized_corpus = pool.map(self.tokenizer, corpus)
return tokenized_corpus
def _calc_idf(self, nd):
raise NotImplementedError()
def get_scores(self, query):
raise NotImplementedError()
def get_top_n(self, query, documents, n=5):
assert self.corpus_size == len(documents), "The documents given don't match the index corpus!"
scores = self.get_scores(query)
top_n = np.argsort(scores)[::-1][:n]
return [documents[i] for i in top_n]
class BM25Okapi(BM25):
def __init__(self, corpus, tokenizer=None, k1=1.5, b=0.75, epsilon=0.25):
self.k1 = k1
self.b = b
self.epsilon = epsilon
super().__init__(corpus, tokenizer)
def _calc_idf(self, nd):
"""
Calculates frequencies of terms in documents and in corpus.
This algorithm sets a floor on the idf values to eps * average_idf
"""
# collect idf sum to calculate an average idf for epsilon value
idf_sum = 0
# collect words with negative idf to set them a special epsilon value.
# idf can be negative if word is contained in more than half of documents
negative_idfs = []
for word, freq in nd.items():
idf = math.log(self.corpus_size - freq + 0.5) - math.log(freq + 0.5)
self.idf[word] = idf
idf_sum += idf
if idf < 0:
negative_idfs.append(word)
self.average_idf = idf_sum / len(self.idf)
eps = self.epsilon * self.average_idf
for word in negative_idfs:
self.idf[word] = eps
def get_scores(self, query):
"""
The ATIRE BM25 variant uses an idf function which uses a log(idf) score. To prevent negative idf scores,
this algorithm also adds a floor to the idf value of epsilon.
See [Trotman, A., X. Jia, M. Crane, Towards an Efficient and Effective Search Engine] for more info
:param query: str
:return: array
"""
scores = np.zeros(self.corpus_size)
doc_len = np.array(self.doc_len)
for q in query:
q_freq = np.array([(doc.get(q) or 0) for doc in self.doc_freqs])
scores += (self.idf.get(q) or 0) * (q_freq * (self.k1 + 1) /
(q_freq + self.k1 * (1 - self.b + self.b * doc_len / self.avgdl)))
return scores
class BM25L(BM25):
def __init__(self, corpus, tokenizer=None, k1=1.5, b=0.75, delta=0.5):
# Algorithm specific parameters
self.k1 = k1
self.b = b
self.delta = delta
super().__init__(corpus, tokenizer)
def _calc_idf(self, nd):
for word, freq in nd.items():
idf = math.log(self.corpus_size + 1) - math.log(freq + 0.5)
self.idf[word] = idf
def get_scores(self, query):
scores = np.zeros(self.corpus_size)
doc_len = np.array(self.doc_len)
for q in query:
q_freq = np.array([(doc.get(q) or 0) for doc in self.doc_freqs])
ctd = q_freq / (1 - self.b + self.b * doc_len / self.avgdl)
scores += (self.idf.get(q) or 0) * q_freq * (self.k1 + 1) * (ctd + self.delta) / \
(self.k1 + ctd + self.delta)
return scores
class BM25Plus(BM25):
def __init__(self, corpus, tokenizer=None, k1=1.5, b=0.75, delta=1):
# Algorithm specific parameters
self.k1 = k1
self.b = b
self.delta = delta
super().__init__(corpus, tokenizer)
def _calc_idf(self, nd):
for word, freq in nd.items():
idf = math.log((self.corpus_size + 1) / freq)
self.idf[word] = idf
def get_scores(self, query):
scores = np.zeros(self.corpus_size)
doc_len = np.array(self.doc_len)
for q in query:
q_freq = np.array([(doc.get(q) or 0) for doc in self.doc_freqs])
scores += (self.idf.get(q) or 0) * (self.delta + (q_freq * (self.k1 + 1)) /
(self.k1 * (1 - self.b + self.b * doc_len / self.avgdl) + q_freq))
return scores
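# Usage sketch (whitespace-tokenized toy corpus):
#   corpus = ["Hello there good man!", "It is quite windy in London", "How is the weather today?"]
#   bm25 = BM25Okapi([doc.lower().split() for doc in corpus])
#   bm25.get_top_n("windy london".split(), corpus, n=1)  # -> ['It is quite windy in London']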

View File

@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import os
import jieba
import jieba.posseg
from jieba.analyse.tfidf import DEFAULT_IDF, _get_abs_path
pwd_path = os.path.abspath(os.path.dirname(__file__))
default_stopwords_file = os.path.join(pwd_path, '../data/stopwords.txt')
def load_stopwords(file_path):
stopwords = set()
if file_path and os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
stopwords.add(line)
return stopwords
class IDFLoader(object):
def __init__(self, idf_path=None):
self.path = ""
self.idf_freq = {}
self.median_idf = 0.0
if idf_path:
self.set_new_path(idf_path)
def set_new_path(self, new_idf_path):
if self.path != new_idf_path:
self.path = new_idf_path
content = open(new_idf_path, 'rb').read().decode('utf-8')
self.idf_freq = {}
for line in content.splitlines():
word, freq = line.strip().split(' ')
self.idf_freq[word] = float(freq)
self.median_idf = sorted(
self.idf_freq.values())[len(self.idf_freq) // 2]
def get_idf(self):
return self.idf_freq, self.median_idf
class TFIDF:
def __init__(self, idf_path=None, stopwords=None):
self.stopwords = stopwords if stopwords else load_stopwords(default_stopwords_file)
self.idf_loader = IDFLoader(idf_path or DEFAULT_IDF)
self.idf_freq, self.median_idf = self.idf_loader.get_idf()
def set_idf_path(self, idf_path):
new_abs_path = _get_abs_path(idf_path)
if not os.path.isfile(new_abs_path):
raise Exception("IDF file does not exist: " + new_abs_path)
self.idf_loader.set_new_path(new_abs_path)
self.idf_freq, self.median_idf = self.idf_loader.get_idf()
def get_tfidf(self, sentences):
"""
Extract keywords from sentence using TF-IDF algorithm.
"""
result = []
for sentence in sentences:
words = [word.word for word in jieba.posseg.cut(sentence) if word.flag[0] not in ['u', 'x', 'w']]
words = [word for word in words if word.lower() not in self.stopwords or len(word.strip()) < 2]
word_idf = {word: self.idf_freq.get(word, self.median_idf) for word in words}
freqs = []
for w in list(self.idf_freq.keys()):
freqs.append(word_idf.get(w, 0))
result.append(freqs)
return result
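# Usage sketch: get_tfidf returns one dense vector per sentence, aligned with the loaded
# IDF vocabulary, so two sentences can be compared with a cosine score, e.g.
#   vec1, vec2 = TFIDF().get_tfidf(["如何更换花呗绑定银行卡", "花呗更改绑定银行卡"])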

View File

@ -3,3 +3,29 @@
@author:XuMing(xuming624@qq.com)
@description:
"""
import os
import jieba
import logging
class JiebaTokenizer(object):
def __init__(self, dict_path='', custom_word_freq_dict=None):
self.model = jieba
self.model.default_logger.setLevel(logging.ERROR)
# initialize the main dictionary
if os.path.exists(dict_path):
self.model.set_dictionary(dict_path)
# load the user-defined dictionary
if custom_word_freq_dict:
for w, f in custom_word_freq_dict.items():
self.model.add_word(w, freq=f)
def tokenize(self, sentence, cut_all=False, HMM=True):
"""
Tokenize a sentence into a list of words.
:param sentence: the sentence to tokenize
:param cut_all: full mode, disabled by default
:param HMM: whether to use the HMM model for new-word discovery, enabled by default
:return: A list of strings.
"""
return self.model.lcut(sentence, cut_all=cut_all, HMM=HMM)
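# Usage sketch (the exact segmentation depends on the jieba dictionary):
#   tokenizer = JiebaTokenizer()
#   tokenizer.tokenize("刘若英是个演员")  # -> e.g. ['刘若英', '是', '个', '演员']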