update fast sim.

shibing624 2022-03-07 01:14:37 +08:00
parent 16e1d283c1
commit 49462a1e36
18 changed files with 1173 additions and 671 deletions

README.md

@@ -17,7 +17,6 @@ similarities: a toolkit for similarity computation and semantic matching search.
**Guide**
- [Feature](#Feature)
- [Evaluate](#Evaluate)
- [Install](#install)
- [Usage](#usage)
- [Contact](#Contact)
@@ -31,20 +30,9 @@ similarities: a toolkit for similarity computation and semantic matching search.
- Cosine Similarity: cosine of the angle between two sentence vectors
- Dot Product: inner product of the two vectors after normalization
- Word Mover's Distance: uses word vectors to measure the minimum distance the words of one text must travel in semantic space to reach the words of the other text
- [RankBM25](similarities/bm25.py): BM25 variant that scores the similarity between a query and documents to produce a ranking of the docs
- [SemanticSearch](https://github.com/shibing624/similarities/blob/master/similarities/sbert.py#L80): vector similarity retrieval using Cosine Similarity + top-k, about an order of magnitude faster than brute-force pairwise computation
- [RankBM25](similarities/literalsim.py): BM25 variant that scores the similarity between a query and documents to produce a ranking of the docs
- [SemanticSearch](https://github.com/shibing624/similarities/blob/main/similarities/similarity.py#L99): vector similarity retrieval using Cosine Similarity + top-k, about an order of magnitude faster than brute-force pairwise computation
# Evaluate
### Text Matching
- Evaluation results on the English matching dataset:
| Arch | Backbone | Model Name | English-STS-B |
| :-- | :--- | :--- | :-: |
| GloVe | glove | Avg_word_embeddings_glove_6B_300d | 61.77 |
| BERT | bert-base-uncased | BERT-base-cls | 20.29 |
| BERT | bert-base-uncased | BERT-base-first_last_avg | 59.04 |
# Demo
@@ -70,22 +58,134 @@ python3 setup.py install
# Usage
### 1. Compute the similarity score between sentences
### 1. Compute the similarity score between two sentences
Example: [examples/base_demo.py](./examples/base_demo.py)
```python
from similarities import Similarity
m = Similarity("shibing624/text2vec-base-chinese")
r = m.similarity('如何更换花呗绑定银行卡', '花呗更改绑定银行卡')
print(f"{r:.4f}")
```
output:
```shell
0.8551
```
> The sentence cosine similarity `score` ranges over [-1, 1]; the larger the value, the more similar the two sentences.
### 2. Compute similarity between a sentence and a set of documents
### 2. Similar-text search in a document set
Typically used to find the texts most similar to a query within a candidate document set, e.g. question matching in QA and similar-text retrieval.
Chinese example: [examples/base_demo.py](./examples/base_demo.py)
> `Score` ranges over [-1, 1]; the larger the value, the more similar the query is to the corpus text.
```python
from similarities import Similarity

if __name__ == '__main__':
    model = Similarity("shibing624/text2vec-base-chinese")
    # 1. Compute cosine similarity between two sentences.
    sentences = ['如何更换花呗绑定银行卡',
                 '花呗更改绑定银行卡']
    corpus = [
        '花呗更改绑定银行卡',
        '我什么时候开通了花呗',
        '俄罗斯警告乌克兰反对欧盟协议',
        '暴风雨掩埋了东北部新泽西16英寸的降雪',
        '中央情报局局长访问以色列叙利亚会谈',
        '人在巴基斯坦基地的炸弹袭击中丧生',
    ]
    similarity_score = model.similarity(sentences[0], sentences[1])
    print(f"{sentences[0]} vs {sentences[1]}, score: {float(similarity_score):.4f}")
    # 2. Compute similarity between two lists.
    similarity_scores = model.similarity(sentences, corpus)
    print(similarity_scores.numpy())
    for i in range(len(sentences)):
        for j in range(len(corpus)):
            print(f"{sentences[i]} vs {corpus[j]}, score: {similarity_scores.numpy()[i][j]:.4f}")
    # 3. Semantic search.
    m = Similarity("shibing624/text2vec-base-chinese", corpus=corpus)
    q = '如何更换花呗绑定银行卡'
    print(m.most_similar(q, topn=5))
    print("query:", q)
    for i in m.most_similar(q, topn=5):
        print('\t', i)
```
output:
```shell
如何更换花呗绑定银行卡 vs 花呗更改绑定银行卡, score: 0.8551
...
如何更换花呗绑定银行卡 vs 花呗更改绑定银行卡, score: 0.8551
如何更换花呗绑定银行卡 vs 我什么时候开通了花呗, score: 0.7212
如何更换花呗绑定银行卡 vs 俄罗斯警告乌克兰反对欧盟协议, score: 0.1450
如何更换花呗绑定银行卡 vs 暴风雨掩埋了东北部新泽西16英寸的降雪, score: 0.2167
如何更换花呗绑定银行卡 vs 中央情报局局长访问以色列叙利亚会谈, score: 0.2517
如何更换花呗绑定银行卡 vs 人在巴基斯坦基地的炸弹袭击中丧生, score: 0.0809
花呗更改绑定银行卡 vs 花呗更改绑定银行卡, score: 1.0000
花呗更改绑定银行卡 vs 我什么时候开通了花呗, score: 0.6807
花呗更改绑定银行卡 vs 俄罗斯警告乌克兰反对欧盟协议, score: 0.1714
花呗更改绑定银行卡 vs 暴风雨掩埋了东北部新泽西16英寸的降雪, score: 0.2162
花呗更改绑定银行卡 vs 中央情报局局长访问以色列叙利亚会谈, score: 0.2728
花呗更改绑定银行卡 vs 人在巴基斯坦基地的炸弹袭击中丧生, score: 0.1279
query: 如何更换花呗绑定银行卡
(0, '花呗更改绑定银行卡', 0.8551459908485413)
(1, '我什么时候开通了花呗', 0.721195638179779)
(4, '中央情报局局长访问以色列叙利亚会谈', 0.2517135739326477)
(3, '暴风雨掩埋了东北部新泽西16英寸的降雪', 0.21666759252548218)
(2, '俄罗斯警告乌克兰反对欧盟协议', 0.1450251191854477)
```
> `Score` ranges over [-1, 1]; the larger the value, the more similar the query is to the corpus text.
English example: [examples/base_english_demo.py](./examples/base_english_demo.py)
### 3. Fast approximate matching search
Supports approximate matching search with Annoy and Hnswlib, typically used for matching and search over million-scale corpora; a minimal sketch follows the example link below.
Example: [examples/fast_sim_demo.py](./examples/fast_sim_demo.py)
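A minimal sketch of the Hnswlib backend, using the `similarities/fastsim.py` API added in this commit (the model and 384-dim embedding size follow the demos in this repo; swap in `AnnoySimilarity` for the Annoy backend):
```python
from text2vec import SentenceModel
from similarities.fastsim import HnswlibSimilarity

sm = SentenceModel()  # default text2vec sentence encoder (384-dim vectors)
corpus = ['花呗更改绑定银行卡', '我什么时候开通了花呗', '俄罗斯警告乌克兰反对欧盟协议']
m = HnswlibSimilarity(sm, embedding_size=384, corpus=corpus)  # builds the HNSW index
print(m.most_similar('如何更换花呗绑定银行卡', topn=2))  # [(corpus_id, doc, score), ...]
m.save_index('hnswlib.index')  # the index can be persisted and reloaded
m.load_index('hnswlib.index')
```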
### 4. Literal text similarity computation
Supports similarity computation and matching search with Cilin (同义词词林), Hownet (知网), word embeddings (WordEmbedding), Tfidf, Simhash, BM25, and similar algorithms, commonly used for cold-start text matching (a BM25 ranking sketch follows the output below).
Example: [examples/literal_sim_demo.py](./examples/literal_sim_demo.py)
```python
from similarities.literalsim import SimhashSimilarity, TfidfSimilarity, BM25Similarity, WordEmbeddingSimilarity, \
CilinSimilarity, HownetSimilarity
text1 = "如何更换花呗绑定银行卡"
text2 = "花呗更改绑定银行卡"
m = TfidfSimilarity()
print(text1, text2, ' sim score: ', m.similarity(text1, text2))
print('distance:', m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌', '我不是演员吗']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
```
output:
```shell
如何更换花呗绑定银行卡 花呗更改绑定银行卡 sim score: 0.8203384355246909
distance: 0.17966156447530912
[(0, '刘若英是个演员', 0.9847577834309504), (3, '我不是演员吗', 0.7056381915655814), (1, '他唱歌很好听', 0.5), (2, 'women喜欢这首歌', 0.5)]
```
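Note: `BM25Similarity` only ranks a corpus against a query; its pairwise `similarity`/`distance` methods raise `NotImplementedError` (see `similarities/literalsim.py` below). A minimal ranking sketch:
```python
from similarities.literalsim import BM25Similarity

m = BM25Similarity()
m.add_corpus(['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌', '我不是演员吗'])
print(m.most_similar('刘若英是演员', topn=2))  # [(corpus_id, doc, bm25_score), ...]
```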
# Contact
@@ -133,3 +233,5 @@ version = {0.0.4}
# Reference
- [A Simple but Tough-to-Beat Baseline for Sentence Embeddings[Sanjeev Arora and Yingyu Liang and Tengyu Ma, 2017]](https://openreview.net/forum?id=SyK00v5xx)
- [liuhuanyong/SentenceSimilarity](https://github.com/liuhuanyong/SentenceSimilarity)
- [shibing624/text2vec](https://github.com/shibing624/text2vec)

examples/base_demo.py

@@ -3,30 +3,44 @@
@author:XuMing(xuming624@qq.com)
@description:
This basic example loads a pre-trained model from the web and uses it to
generate sentence embeddings for a given list of sentences.
compute cosine similarity for a given list of sentences.
"""
import sys
sys.path.append('..')
from similarities.literalsim import WordEmbeddingSimilarity
from text2vec import Word2Vec
from similarities import Similarity
from loguru import logger
logger.remove()
logger.add(sys.stderr, level="INFO")
if __name__ == '__main__':
wv_model = Word2Vec()
model = WordEmbeddingSimilarity(wv_model)
# Embed a list of sentences
model = Similarity("shibing624/text2vec-base-chinese")
# 1.Compute cosine similarity between two sentences.
sentences = ['如何更换花呗绑定银行卡',
'花呗更改绑定银行卡']
sentences2 = ['如何更换 银行卡',
'西方开花北方结果']
sentence_embeddings = model.get_vector(sentences)
print(type(sentence_embeddings), sentence_embeddings.shape)
corpus = [
'花呗更改绑定银行卡',
'我什么时候开通了花呗',
'俄罗斯警告乌克兰反对欧盟协议',
'暴风雨掩埋了东北部新泽西16英寸的降雪',
'中央情报局局长访问以色列叙利亚会谈',
'人在巴基斯坦基地的炸弹袭击中丧生',
]
similarity_score = model.similarity(sentences[0], sentences[1])
print(similarity_score.numpy())
print(f"{sentences[0]} vs {sentences[1]}, score: {float(similarity_score):.4f}")
similarity_score = model.similarity(sentences, sentences2)
print(similarity_score.numpy())
# 2.Compute similarity between two list
similarity_scores = model.similarity(sentences, corpus)
print(similarity_scores.numpy())
for i in range(len(sentences)):
for j in range(len(corpus)):
print(f"{sentences[i]} vs {corpus[j]}, score: {similarity_scores.numpy()[i][j]:.4f}")
# 3.Semantic Search
m = Similarity(sentence_model="shibing624/text2vec-base-chinese", corpus=corpus)
q = '如何更换花呗绑定银行卡'
print(m.most_similar(q, topn=5))
print("query:", q)
for i in m.most_similar(q, topn=5):
print('\t', i)

examples/base_english_demo.py Normal file

@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description: Text semantic similarity computation
"""
import sys
sys.path.append('..')
from similarities import Similarity
# Two lists of sentences
sentences1 = ['The cat sits outside',
'A man is playing guitar',
'The new movie is awesome']
sentences2 = ['The dog plays in the garden',
'A woman watches TV',
'The new movie is so great']
m = Similarity("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
# this uses a multilingual text matching model
scores = m.similarity(sentences1, sentences2)
print('1:use Similarity compute cos scores\n')
for i in range(len(sentences1)):
for j in range(len(sentences2)):
print("{} \t\t {} \t\t Score: {:.4f}".format(sentences1[i], sentences2[j], scores[i][j]))
print()
print('-' * 42)
print('2:search\n')
# 2.Semantic Search
corpus = [
'The cat sits outside',
'A man is playing guitar',
'I love pasta',
'The new movie is awesome',
'The cat plays in the garden',
'A woman watches TV',
'The new movie is so great',
'Do you like pizza?'
]
m.add_corpus(corpus=corpus)
q = 'The cat sits outside'
print(m.most_similar(q, topn=5))
print("query:", q)
for i in m.most_similar(q, topn=5):
print('\t', i)

examples/fast_sim_demo.py Normal file

@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import os
import sys
sys.path.append('..')
from text2vec import SentenceModel
from similarities.fastsim import AnnoySimilarity
from similarities.fastsim import HnswlibSimilarity
sm = SentenceModel()
def hnswlib():
list_of_docs = ["This is a test1", "This is a test2", "This is a test3", '刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
list_of_docs2 = ["that is test4", "that is a test5", "that is a test6", '刘若英个演员', '唱歌很好听', 'men喜欢这首歌']
m = HnswlibSimilarity(sm, embedding_size=384, corpus=list_of_docs * 10)
print(m)
v = m.get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
m.add_corpus(list_of_docs2)
m.build_index()
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
m.save_index('test.model')
m.load_index('test.model')
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
os.remove('test.model')
def annoy():
list_of_docs = ["This is a test1", "This is a test2", "This is a test3", '刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
list_of_docs2 = ["that is test4", "that is a test5", "that is a test6", '刘若英个演员', '唱歌很好听', 'men喜欢这首歌']
m = AnnoySimilarity(sm, embedding_size=384, corpus=list_of_docs * 10)
print(m)
v = m.get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
m.add_corpus(list_of_docs2)
m.build_index()
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
m.save_index('test.model')
m.load_index('test.model')
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
os.remove('test.model')
if __name__ == '__main__':
hnswlib()
annoy()


@@ -5,7 +5,7 @@
"""
from text2vec import Word2Vec
import gradio as gr
from similarities.termsim import WordEmbeddingSimilarity
from similarities import WordEmbeddingSimilarity
wv_model = Word2Vec()
sim_model = WordEmbeddingSimilarity(wv_model)

examples/literal_sim_demo.py Normal file

@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import os
import sys
from text2vec import Word2Vec
sys.path.append('..')
from similarities.literalsim import SimhashSimilarity, TfidfSimilarity, BM25Similarity, WordEmbeddingSimilarity, \
CilinSimilarity, HownetSimilarity
def main():
text1 = '刘若英是个演员'
text2 = '他唱歌很好听'
m = SimhashSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
print(m.most_similar('刘若英是演员'))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
text1 = "如何更换花呗绑定银行卡"
text2 = "花呗更改绑定银行卡"
m = TfidfSimilarity()
print(text1, text2, ' sim score: ', m.similarity(text1, text2))
print('distance:', m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌', '我不是演员吗']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
m = BM25Similarity()
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌', '我不是演员吗']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
wm = Word2Vec()
list_of_corpus = ["This is a test1", "This is a test2", "This is a test3"]
list_of_corpus2 = ["that is test4", "that is a test5", "that is a test6"]
m = WordEmbeddingSimilarity(wm, list_of_corpus)
m.add_corpus(list_of_corpus2)
v = m.get_vector("This is a test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test1"))
text1 = '周杰伦是一个歌手'
text2 = '刘若英是个演员'
m = CilinSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
m = HownetSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
if __name__ == '__main__':
main()

requirements.txt

@@ -1,7 +1,8 @@
jieba>=0.39
loguru
transformers>=4.6.0
tqdm
scikit-learn
gensim>=4.0.0
pandas
text2vec
hnswlib
#annoy

setup.py

@@ -44,10 +44,10 @@ setup(
"jieba>=0.39",
"loguru",
"transformers>=4.6.0",
"tqdm",
"scikit-learn",
"gensim>=4.0.0",
"pandas",
"text2vec",
"hnswlib",
],
packages=find_packages(),
)

similarities/__init__.py

@@ -7,3 +7,23 @@ This package contains implementations of pairwise similarity queries.
"""
# bring classes directly into package namespace, to save some typing
from similarities.version import __version__
from similarities.similarity import Similarity
from similarities.similarity import (
cos_sim,
dot_score,
semantic_search,
community_detection,
pairwise_dot_score,
pairwise_cos_sim
)
from similarities.fastsim import AnnoySimilarity, HnswlibSimilarity
from similarities.literalsim import (
SimhashSimilarity,
TfidfSimilarity,
BM25Similarity,
WordEmbeddingSimilarity,
CilinSimilarity,
HownetSimilarity
)
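# Usage note (sketch): with the re-exports above, classes can be imported from
# the package root rather than their defining submodules, e.g.:
#   from similarities import Similarity, BM25Similarity, HnswlibSimilarity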

similarities/fastsim.py Normal file

@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import os
from typing import List
from loguru import logger
from similarities.similarity import Similarity
class AnnoySimilarity(Similarity):
"""
Computes cosine similarities between sentence embeddings and retrieves the
most similar docs for a given query, using Annoy.
"""
def __init__(self, sentence_model, corpus: List[str] = None,
embedding_size: int = 384, n_trees: int = 256):
super().__init__(sentence_model, corpus)
self.index = None
if corpus is not None and self.corpus_embeddings.size > 0:
self.build_index(embedding_size, n_trees)
def build_index(self, embedding_size: int = 384, n_trees: int = 256):
"""Build Annoy index after add new documents."""
# Create Annoy Index
try:
from annoy import AnnoyIndex
except ImportError:
raise ImportError("Annoy is not installed. Please install it first, e.g. with `pip install annoy`.")
self.index = AnnoyIndex(embedding_size, 'angular')
# Creating the annoy index
logger.info(f"Init annoy index, embedding_size: {embedding_size}")
for i in range(len(self.corpus_embeddings)):
self.index.add_item(i, self.corpus_embeddings[i])
logger.info(f"Create Annoy index with {n_trees} trees. This can take some time.")
self.index.build(n_trees)
def save_index(self, index_path: str):
"""Save the annoy index to disk."""
if self.index and index_path:
logger.info(f"Saving index to: {index_path}")
self.index.save(index_path)
else:
logger.warning("No index path given. Index not saved.")
def load_index(self, index_path: str):
"""Load Annoy Index from disc."""
if index_path and os.path.exists(index_path):
logger.info(f"Loading index from: {index_path}")
self.index.load(index_path)
else:
logger.warning("No index path given. Index not loaded.")
def most_similar(self, query: str, topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""
result = []
query_embeddings = self.get_vector(query)
if not self.index:
logger.warning(f"No index found. Please add corpus and build index first, e.g. with `build_index()`."
f"Now returning slow search result.")
return super().most_similar(query, topn)
corpus_ids, scores = self.index.get_nns_by_vector(query_embeddings, topn, include_distances=True)
for id, score in zip(corpus_ids, scores):
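# Annoy's 'angular' metric returns d = sqrt(2 * (1 - cos_sim)); invert it
# to recover cosine similarity: cos_sim = 1 - d**2 / 2.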
score = 1 - ((score ** 2) / 2)
result.append((id, self.corpus[id], score))
return result
class HnswlibSimilarity(Similarity):
"""
Computes cosine similarities between sentence embeddings and retrieves the
most similar docs for a given query, using Hnswlib.
"""
def __init__(self, sentence_model, corpus: List[str] = None,
embedding_size: int = 384, ef_construction: int = 400, M: int = 64, ef: int = 50):
super().__init__(sentence_model, corpus)
self.index = None
if corpus is not None and self.corpus_embeddings.size > 0:
self.build_index(embedding_size, ef_construction, M, ef)
def build_index(self, embedding_size: int = 384, ef_construction: int = 400, M: int = 64, ef: int = 50):
"""Build Hnswlib index after add new documents."""
# Create hnswlib Index
try:
import hnswlib
except ImportError:
raise ImportError("Hnswlib is not installed. Please install it first, e.g. with `pip install hnswlib`.")
# The index uses hnswlib's cosine space, so nearest-neighbor distances come
# back as 1 - cosine similarity.
self.index = hnswlib.Index(space='cosine', dim=embedding_size)
# Init the HNSWLIB index
logger.info(f"Start creating HNSWLIB index, max_elements: {len(self.corpus)}")
self.index.init_index(max_elements=len(self.corpus_embeddings), ef_construction=ef_construction, M=M)
# Add all corpus embeddings to the index
self.index.add_items(self.corpus_embeddings, list(range(len(self.corpus_embeddings))))
# Controlling the recall by setting ef:
self.index.set_ef(ef) # ef should always be > top_k_hits
def save_index(self, index_path: str):
"""Save the annoy index to disk."""
if self.index and index_path:
logger.info(f"Saving index to: {index_path}")
self.index.save_index(index_path)
else:
logger.warning("No index path given. Index not saved.")
def load_index(self, index_path: str):
"""Load Annoy Index from disc."""
if index_path and os.path.exists(index_path):
logger.info(f"Loading index from: {index_path}")
self.index.load_index(index_path)
else:
logger.warning("No index path given. Index not loaded.")
def most_similar(self, query: str, topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""
result = []
query_embeddings = self.get_vector(query)
if not self.index:
logger.warning(f"No index found. Please add corpus and build index first, e.g. with `build_index()`."
f"Now returning slow search result.")
return super().most_similar(query, topn)
# We use hnswlib knn_query method to find the top_k_hits
corpus_ids, distances = self.index.knn_query(query_embeddings, k=topn)
# We extract corpus ids and scores for the first query
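# hnswlib's 'cosine' space returns distance = 1 - cosine similarity, so
# convert back to a similarity score.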
hits = [{'corpus_id': id, 'score': 1 - score} for id, score in zip(corpus_ids[0], distances[0])]
hits = sorted(hits, key=lambda x: x['score'], reverse=True)
for hit in hits:
result.append((hit['corpus_id'], self.corpus[hit['corpus_id']], hit['score']))
return result

similarities/literalsim.py

@@ -1,145 +1,357 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 Vit Novotny <witiko@mail.muni.cz>
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
"""
@author:XuMing(xuming624@qq.com), Vit Novotny <witiko@mail.muni.cz>, lhy<lhy_in_blcu@126.com>
@description:
Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
This module provides classes that deal with sentence similarities from mean term vector.
Adjust the gensim similarities Index to compute sentence similarities.
"""
This module provides classes that deal with term similarities.
Adjust the Index to compute term similarities.
"""
import math
from loguru import logger
from typing import Dict, List, Tuple, Set, Optional, Union
import numpy as np
import torch
import jieba
import jieba.posseg
from text2vec import Word2Vec
from similarities.similarity import cos_sim, Similarity, semantic_search
import os
from typing import List, Union
import jieba
import jieba.analyse
import jieba.posseg
import numpy as np
from text2vec import Word2Vec
from loguru import logger
from similarities.utils.distance import cosine_distance
from simhash import Simhash
from similarities.utils.distance import sim_hash, hamming_distance
from similarities.utils.rank_bm25 import BM25Okapi
from similarities.utils.tfidf import TFIDF
pwd_path = os.path.abspath(os.path.dirname(__file__))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class WordEmbeddingSimilarity(object):
class SimhashSimilarity:
"""
Computes cosine similarities between word embeddings and retrieves most
similar terms for a given term.
Compute SimHash similarity between two sentences and retrieve the most
similar sentences from a corpus for a given query.
"""
def __init__(self, keyedvectors: Word2Vec, docs: List[str] = None):
"""
Init WordEmbeddingSimilarity.
:param keyedvectors: ~text2vec.Word2Vec
:param docs: list of str
"""
# super().__init__()
self.keyedvectors = keyedvectors
self.docs = []
self.docs_embeddings = np.array([])
if docs is not None:
self.add_documents(docs)
def __init__(self, corpus: List[str] = None):
self.corpus = []
self.corpus_embeddings = np.array([])
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of docs."""
return self.docs_embeddings.shape[0]
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
return "%s" % (self.__class__.__name__)
base = f"Similarity: {self.__class__.__name__}, matching_model: Simhash"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_documents(self, docs):
"""Extend the index with new documents.
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
docs : iterable of list of str
corpus : list of str
"""
self.docs += docs
docs_embeddings = self.get_vector(docs)
if self.docs_embeddings.size > 0:
self.docs_embeddings = np.vstack((self.docs_embeddings, docs_embeddings))
self.corpus += corpus
corpus_embeddings = []
for sentence in corpus:
corpus_embeddings.append(self.simhash(sentence))
if len(corpus_embeddings) % 1000 == 0:
logger.debug(f"Progress, add corpus size: {len(corpus_embeddings)}")
if self.corpus_embeddings.size > 0:
self.corpus_embeddings = np.vstack((self.corpus_embeddings, corpus_embeddings))
else:
self.docs_embeddings = docs_embeddings
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
self.corpus_embeddings = np.array(corpus_embeddings)
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def simhash(self, text: str):
"""
Compute SimHash for a given text.
:param text: str
:return: hash code
"""
return sim_hash(text)
def _sim_score(self, v1, v2):
"""Compute hamming similarity between two embeddings."""
return (100 - hamming_distance(v1, v2) * 100 / 64) / 100
def similarity(self, text1: str, text2: str):
"""
Compute hamming similarity between two texts.
:param text1:
:param text2:
:return:
"""
v1 = self.simhash(text1)
v2 = self.simhash(text2)
similarity_score = self._sim_score(v1, v2)
return similarity_score
def distance(self, text1: str, text2: str):
"""Compute cosine distance between two keys."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query: str, topn: int = 10):
"""
Find the topn most similar texts to the query against the corpus.
:param query: str
:param topn: int
:return: list of tuples (corpus_id, text, score)
"""
result = []
query_emb = self.simhash(query)
for (corpus_id, doc), doc_emb in zip(enumerate(self.corpus), self.corpus_embeddings):
score = self._sim_score(query_emb, doc_emb)
result.append((corpus_id, doc, score))
result.sort(key=lambda x: x[2], reverse=True)
return result[:topn]
class TfidfSimilarity:
"""
Compute TFIDF similarity between two sentences and retrieve the most
similar sentences from a corpus for a given query.
"""
def __init__(self, corpus: List[str] = None):
super().__init__()
self.corpus = []
self.corpus_embeddings = np.array([])
self.tfidf = TFIDF()
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
base = f"Similarity: {self.__class__.__name__}, matching_model: Tfidf"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
corpus : list of str
"""
self.corpus += corpus
corpus_embeddings = []
for sentence in corpus:
corpus_embeddings.append(self.tfidf.get_tfidf(sentence))
if len(corpus_embeddings) % 1000 == 0:
logger.debug(f"Progress, add corpus size: {len(corpus_embeddings)}")
if self.corpus_embeddings.size > 0:
self.corpus_embeddings = np.vstack((self.corpus_embeddings, corpus_embeddings))
else:
self.corpus_embeddings = np.array(corpus_embeddings)
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def similarity(self, text1: str, text2: str):
"""
Compute cosine similarity score between two sentences.
:param text1:
:param text2:
:return:
"""
feature1, feature2 = self.tfidf.get_tfidf(text1), self.tfidf.get_tfidf(text2)
return cosine_distance(np.array(feature1), np.array(feature2))
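# NB: cosine_distance() from similarities.utils.distance actually returns the
# cosine *similarity* score (its docstring says "return cos score"), so
# similarity() above is correct and distance() below is 1 - similarity.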
def distance(self, text1: str, text2: str):
"""Compute cosine distance between two keys."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query: str, topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""
result = []
query_emb = self.tfidf.get_tfidf(query)
for (corpus_id, doc), doc_emb in zip(enumerate(self.corpus), self.corpus_embeddings):
score = cosine_distance(query_emb, doc_emb, normalize=True)
result.append((corpus_id, doc, score))
result.sort(key=lambda x: x[2], reverse=True)
return result[:topn]
class BM25Similarity:
"""
Compute BM25Okapi similarity between two sentences and retrieve the most
similar sentences from a corpus for a given query.
"""
def __init__(self, corpus: List[str] = None):
super().__init__()
self.corpus = []
self.bm25 = None
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
base = f"Similarity: {self.__class__.__name__}, matching_model: BM25"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
corpus : list of str
"""
self.corpus += corpus
corpus_seg = [jieba.lcut(d) for d in self.corpus]
self.bm25 = BM25Okapi(corpus_seg)
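# NB: the BM25Okapi index is rebuilt over the full corpus on every
# add_corpus() call, so incremental additions re-tokenize everything.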
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def similarity(self, text1, text2):
"""
Compute similarity score between two sentences.
:param text1:
:param text2:
:return:
"""
raise NotImplementedError()
def distance(self, text1, text2):
"""Compute distance between two sentences."""
raise NotImplementedError()
def most_similar(self, query, topn=10):
tokens = jieba.lcut(query)
if not self.bm25:
raise ValueError("BM25 model is not initialized. Please add a corpus first, e.g. `add_corpus(corpus)`")
scores = self.bm25.get_scores(tokens)
result = [(corpus_id, self.corpus[corpus_id], score) for corpus_id, score in enumerate(scores)]
result.sort(key=lambda x: x[2], reverse=True)
return result[:topn]
class WordEmbeddingSimilarity:
"""
Compute Word2Vec similarity between two sentences and retrieve the most
similar sentences from a corpus for a given query.
"""
def __init__(self, keyedvectors, corpus: List[str] = None):
"""
Init WordEmbeddingSimilarity.
:param keyedvectors: ~text2vec.Word2Vec
:param corpus: list of str
"""
if isinstance(keyedvectors, Word2Vec):
self.keyedvectors = keyedvectors
elif isinstance(keyedvectors, str):
self.keyedvectors = Word2Vec(keyedvectors)
else:
raise ValueError("keyedvectors must be ~text2vec.Word2Vec or Word2Vec model name")
self.corpus = []
self.corpus_embeddings = np.array([])
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
base = f"Similarity: {self.__class__.__name__}, matching_model: Word2Vec"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
corpus : list of str
"""
self.corpus += corpus
corpus_embeddings = self.get_vector(corpus)
if self.corpus_embeddings.size > 0:
self.corpus_embeddings = np.vstack((self.corpus_embeddings, corpus_embeddings))
else:
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def get_vector(self, text):
return self.keyedvectors.encode(text)
def similarity(self, text1, text2, score_function=cos_sim):
text_emb1 = self.get_vector(text1)
text_emb2 = self.get_vector(text2)
return score_function(text_emb1, text_emb2)
def similarity(self, text1: str, text2: str):
"""Compute cosine similarity between two texts."""
v1 = self.get_vector(text1)
v2 = self.get_vector(text2)
return cosine_distance(v1, v2)
def distance(self, text1, text2):
"""Compute cosine distance between two keys.
Calculate 1 - :meth:`~gensim.models.keyedvectors.KeyedVectors.similarity`.
Parameters
----------
w1 : str
Input key.
w2 : str
Input key.
Returns
-------
float
Distance between `w1` and `w2`.
"""
def distance(self, text1: str, text2: str):
"""Compute cosine distance between two texts."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
def most_similar(self, query: str, topn: int = 10):
"""
Find the topn most similar texts to the query against the corpus.
:param query: str
:param topn: int
:return:
"""
result = []
query_embeddings = self.get_vector(query)
hits = semantic_search(query_embeddings, self.docs_embeddings, top_k=topn)
hits = hits[0] # Get the hits for the first query
print("Input question:", query)
for hit in hits[0:topn]:
result.append((self.docs[hit['corpus_id']], round(hit['score'], 4)))
print("\t{:.3f}\t{}".format(hit['score'], self.docs[hit['corpus_id']]))
print("\n\n========\n")
return result
query_emb = self.get_vector(query)
for (corpus_id, doc), doc_emb in zip(enumerate(self.corpus), self.corpus_embeddings):
score = cosine_distance(query_emb, doc_emb, normalize=True)
result.append((corpus_id, doc, score))
result.sort(key=lambda x: x[2], reverse=True)
return result[:topn]
class CilinSimilarity(object):
class CilinSimilarity:
"""
Computes cilin similarities between word embeddings and retrieves most
similar terms for a given term.
Compute Cilin similarity between two sentences and retrieve the most
similar sentences from a corpus for a given query.
"""
default_cilin_path = os.path.join(pwd_path, 'data', 'cilin.txt')
default_cilin_path = os.path.join(pwd_path, 'data/cilin.txt')
def __init__(self, cilin_path: str = default_cilin_path, docs: List[str] = None):
def __init__(self, cilin_path: str = default_cilin_path, corpus: List[str] = None):
super().__init__()
self.cilin_dict = self.load_cilin_dict(cilin_path) # Cilin(词林) semantic dictionary
self.docs = []
if docs is not None:
self.add_documents(docs)
self.corpus = []
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of index."""
return len(self.docs)
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
return "%s" % (self.__class__.__name__)
base = f"Similarity: {self.__class__.__name__}, matching_model: Cilin"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_documents(self, docs):
"""Extend the index with new documents.
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
docs : iterable of list of str
corpus : list of str
"""
self.docs += docs
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
self.corpus += corpus
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
@staticmethod
def load_cilin_dict(path):
@@ -160,7 +372,7 @@ class CilinSimilarity(object):
sem_dict[word] = sem_type.split(';')
return sem_dict
def _compute_word_sim(self, word1, word2):
def _word_sim(self, word1, word2):
"""
Compute word-level similarity, taking the max over all sense-pair scores.
:param word1:
@@ -169,13 +381,13 @@
"""
sems_word1 = self.cilin_dict.get(word1, [])
sems_word2 = self.cilin_dict.get(word2, [])
score_list = [self._compute_sem(sem_word1, sem_word2) for sem_word1 in sems_word1 for sem_word2 in sems_word2]
score_list = [self._semantic_sim(sem_word1, sem_word2) for sem_word1 in sems_word1 for sem_word2 in sems_word2]
if score_list:
return max(score_list)
else:
return 0
def _compute_sem(self, sem1, sem2):
def _semantic_sim(self, sem1, sem2):
"""
Compute similarity between two Cilin sense codes.
:param sem1:
@@ -195,9 +407,9 @@
score += 1
return score / 10
def similarity(self, text1, text2):
def similarity(self, text1: str, text2: str):
"""
Compute sentence similarity from word-level similarities.
Compute Cilin similarity between two texts.
:param text1:
:param text2:
:return:
@@ -207,58 +419,62 @@
score_words1 = []
score_words2 = []
for word1 in words1:
score = max(self._compute_word_sim(word1, word2) for word2 in words2)
score = max(self._word_sim(word1, word2) for word2 in words2)
score_words1.append(score)
for word2 in words2:
score = max(self._compute_word_sim(word2, word1) for word1 in words1)
score = max(self._word_sim(word2, word1) for word1 in words1)
score_words2.append(score)
similarity_score = max(sum(score_words1) / len(words1), sum(score_words2) / len(words2))
return similarity_score
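# e.g. if the per-word maxima are [0.9, 0.8] for text1 (avg 0.85) and
# [0.85, 0.8] for text2 (avg 0.825), the sentence score is max = 0.85.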
def distance(self, text1, text2):
"""Compute cosine distance between two keys."""
def distance(self, text1: str, text2: str):
"""Compute cosine distance between two texts."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
def most_similar(self, query: str, topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""
result = []
for doc in self.docs:
for corpus_id, doc in enumerate(self.corpus):
score = self.similarity(query, doc)
result.append((doc, round(score, 4)))
result.sort(key=lambda x: x[1], reverse=True)
result.append((corpus_id, doc, score))
result.sort(key=lambda x: x[2], reverse=True)
return result[:topn]
class HownetSimilarity(object):
class HownetSimilarity:
"""
Computes hownet similarities between word embeddings and retrieves most
similar terms for a given term.
Compute Hownet similarity between two sentences and retrieve the most
similar sentences from a corpus for a given query.
"""
default_hownet_path = os.path.join(pwd_path, 'data', 'hownet.txt')
default_hownet_path = os.path.join(pwd_path, 'data/hownet.txt')
def __init__(self, cilin_path: str = default_hownet_path, docs: List[str] = None):
super().__init__()
self.hownet_dict = self.load_hownet_dict(cilin_path) # semantic dictionary
self.docs = []
if docs is not None:
self.add_documents(docs)
def __init__(self, hownet_path: str = default_hownet_path, corpus: List[str] = None):
self.hownet_dict = self.load_hownet_dict(hownet_path) # semantic dictionary
self.corpus = []
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of index."""
return len(self.docs)
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
return "%s" % (self.__class__.__name__)
base = f"Similarity: {self.__class__.__name__}, matching_model: Hownet"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_documents(self, docs):
"""Extend the index with new documents.
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
docs : iterable of list of str
corpus : list of str
"""
self.docs += docs
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
self.corpus += corpus
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
@staticmethod
def load_hownet_dict(path):
@@ -271,25 +487,25 @@ class HownetSimilarity(object):
hownet_dict[word] = word_def.split(',')
return hownet_dict
def _compute_sem(self, sem1, sem2):
def _semantic_sim(self, sem1, sem2):
"""计算语义相似度"""
sem_inter = set(sem1).intersection(set(sem2))
sem_union = set(sem1).union(set(sem2))
return float(len(sem_inter)) / float(len(sem_union))
def _compute_word_sim(self, word1, word2):
def _word_sim(self, word1, word2):
"""比较两个词语之间的相似度"""
DEFS_word1 = self.hownet_dict.get(word1, [])
DEFS_word2 = self.hownet_dict.get(word2, [])
scores = [self._compute_sem(DEF_word1, DEF_word2) for DEF_word1 in DEFS_word1 for DEF_word2 in DEFS_word2]
sems_word1 = self.hownet_dict.get(word1, [])
sems_words = self.hownet_dict.get(word2, [])
scores = [self._semantic_sim(sem_word1, sem_word2) for sem_word1 in sems_word1 for sem_word2 in sems_words]
if scores:
return max(scores)
else:
return 0
def similarity(self, text1, text2):
def similarity(self, text1: str, text2: str):
"""
Compute sentence similarity from word-level similarities.
Compute Hownet similarity between two texts.
:param text1:
:param text2:
:return:
@@ -299,217 +515,24 @@
score_words1 = []
score_words2 = []
for word1 in words1:
score = max(self._compute_word_sim(word1, word2) for word2 in words2)
score = max(self._word_sim(word1, word2) for word2 in words2)
score_words1.append(score)
for word2 in words2:
score = max(self._compute_word_sim(word2, word1) for word1 in words1)
score = max(self._word_sim(word2, word1) for word1 in words1)
score_words2.append(score)
similarity_score = max(sum(score_words1) / len(words1), sum(score_words2) / len(words2))
return similarity_score
def distance(self, text1, text2):
def distance(self, text1: str, text2: str):
"""Compute cosine distance between two keys."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
def most_similar(self, query: str, topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""
result = []
for doc in self.docs:
for corpus_id, doc in enumerate(self.corpus):
score = self.similarity(query, doc)
result.append((doc, round(score, 4)))
result.sort(key=lambda x: x[1], reverse=True)
result.append((corpus_id, doc, score))
result.sort(key=lambda x: x[2], reverse=True)
return result[:topn]
class SimhashSimilarity(object):
"""
Computes Simhash similarities between word embeddings and retrieves most
similar terms for a given term.
"""
def __init__(self, docs: List[str] = None, hashbits=64):
super().__init__()
self.docs = []
self.hashbits = hashbits
self.docs_embeddings = np.array([])
if docs is not None:
self.add_documents(docs)
def __len__(self):
"""Get length of index."""
return len(self.docs)
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, docs):
"""Extend the index with new documents.
Parameters
----------
docs : iterable of list of str
"""
self.docs += docs
docs_embeddings = []
for doc in docs:
doc_emb = self._get_code(doc)
docs_embeddings.append(doc_emb)
if len(docs_embeddings) % 10000 == 0:
logger.debug(f"Progress, add docs size: {len(docs_embeddings)}")
if self.docs_embeddings.size > 0:
self.docs_embeddings = np.vstack((self.docs_embeddings, docs_embeddings))
else:
self.docs_embeddings = np.array(docs_embeddings)
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
def _hamming_distance(self, code_s1, code_s2):
"""利用64位数计算海明距离"""
x = (code_s1 ^ code_s2) & ((1 << self.hashbits) - 1)
ans = 0
while x:
ans += 1
x &= x - 1
return ans
def _get_features(self, string):
"""
Tokenize the whole text and extract features, using POS tags to filter out
function words and other irrelevant tokens.
:param string:
:return:
"""
word_list = [word.word for word in jieba.posseg.cut(string) if
word.flag[0] not in ['u', 'x', 'w', 'o', 'p', 'c', 'm', 'q']]
return word_list
def _get_code(self, string):
"""对全文进行编码"""
return Simhash(self._get_features(string)).value
def similarity(self, text1, text2):
"""
Compute Hamming-distance-based similarity between two sentences.
:param text1:
:param text2:
:return:
"""
code_s1 = self._get_code(text1)
code_s2 = self._get_code(text2)
similarity_score = (100 - self._hamming_distance(code_s1, code_s2) * 100 / self.hashbits) / 100
return similarity_score
def distance(self, text1, text2):
"""Compute cosine distance between two keys."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
result = []
query_emb = self._get_code(query)
for doc, doc_emb in zip(self.docs, self.docs_embeddings):
score = (100 - self._hamming_distance(query_emb, doc_emb) * 100 / self.hashbits) / 100
result.append((doc, round(score, 4)))
result.sort(key=lambda x: x[1], reverse=True)
return result[:topn]
class TfidfSimilarity(object):
"""
Computes Tfidf similarities between word embeddings and retrieves most
similar texts for a given text.
"""
def __init__(self, docs: List[str] = None):
super().__init__()
self.docs = []
self.docs_embeddings = np.array([])
self.tfidf = TFIDF()
if docs is not None:
self.add_documents(docs)
def __len__(self):
"""Get length of index."""
return len(self.docs)
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, docs):
"""Extend the index with new documents.
Parameters
----------
docs : iterable of list of str
"""
self.docs += docs
docs_embeddings = np.array(self.tfidf.get_tfidf(docs))
if self.docs_embeddings.size > 0:
self.docs_embeddings = np.vstack((self.docs_embeddings, docs_embeddings))
else:
self.docs_embeddings = docs_embeddings
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
def similarity(self, text1, text2):
"""
Compute cosine similarity between two sentences from their TF-IDF vectors.
:param text1:
:param text2:
:return:
"""
tfidf_features = self.tfidf.get_tfidf([text1, text2])
return cosine_distance(np.array(tfidf_features[0]), np.array(tfidf_features[1]))
def distance(self, text1, text2):
"""Compute cosine distance between two keys."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
result = []
query_emb = np.array(self.tfidf.get_tfidf([query]))
for doc, doc_emb in zip(self.docs, self.docs_embeddings):
score = cosine_distance(query_emb, doc_emb)
result.append((doc, round(score, 4)))
result.sort(key=lambda x: x[1], reverse=True)
return result[:topn]
if __name__ == '__main__':
wm = Word2Vec()
list_of_docs = ["This is a test1", "This is a test2", "This is a test3"]
list_of_docs2 = ["that is test4", "that is a test5", "that is a test6"]
m = WordEmbeddingSimilarity(wm, list_of_docs)
m.add_documents(list_of_docs2)
v = m.get_vector("This is a test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test1"))
text1 = '周杰伦是一个歌手'
text2 = '刘若英是个演员'
m = CilinSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_documents(zh_list)
print(m.most_similar('刘若英是演员'))
m = HownetSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_documents(zh_list)
print(m.most_similar('刘若英是演员'))
m = SimhashSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_documents(zh_list)
print(m.most_similar('刘若英是演员'))
m = TfidfSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌', '我不是演员吗']
m.add_documents(zh_list)
print(m.most_similar('刘若英是演员'))

similarities/sbert.py

@@ -1,162 +0,0 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import os
from typing import List
import numpy as np
import torch
from loguru import logger
from text2vec import SentenceModel
from similarities.similarity import cos_sim, semantic_search, dot_score
pwd_path = os.path.abspath(os.path.dirname(__file__))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class BertSimilarity(object):
"""
Computes cosine similarities between word embeddings and retrieves most
similar terms for a given term.
"""
def __init__(self, sentencemodel: SentenceModel, docs: List[str] = None):
# super().__init__()
self.sentencemodel = sentencemodel
self.docs = []
self.docs_embeddings = np.array([])
if docs is not None:
self.add_documents(docs)
def __len__(self):
"""Get length of docs."""
return self.docs_embeddings.shape[0]
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, docs):
"""
Extend the docs_embeddings with new documents.
Parameters
----------
docs : list of str
"""
self.docs += docs
docs_embeddings = self.get_vector(docs)
if self.docs_embeddings.size > 0:
self.docs_embeddings = np.vstack((self.docs_embeddings, docs_embeddings))
else:
self.docs_embeddings = docs_embeddings
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
def get_vector(self, text):
return self.sentencemodel.encode(text)
def similarity(self, text1, text2, score_function=cos_sim):
text_emb1 = self.get_vector(text1)
text_emb2 = self.get_vector(text2)
return score_function(text_emb1, text_emb2)
def distance(self, text1, text2):
"""Compute cosine distance between two keys.
"""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
result = []
query_embeddings = self.get_vector(query)
hits = semantic_search(query_embeddings, self.docs_embeddings, top_k=topn)
hits = hits[0] # Get the hits for the first query
print("Input question:", query)
for hit in hits[0:topn]:
result.append((self.docs[hit['corpus_id']], round(hit['score'], 4)))
print("\t{:.3f}\t{}".format(hit['score'], self.docs[hit['corpus_id']]))
print("\n\n========\n")
return result
class AnnoySimilarity(object):
"""
Computes cosine similarities between word embeddings and retrieves most
similar terms for a given term.
"""
def __init__(self, sentencemodel: SentenceModel, docs: List[str] = None):
# super().__init__()
self.sentencemodel = sentencemodel
self.docs = []
self.docs_embeddings = np.array([])
if docs is not None:
self.add_documents(docs)
def __len__(self):
"""Get length of docs."""
return self.docs_embeddings.shape[0]
def __str__(self):
return "%s" % (self.__class__.__name__)
def add_documents(self, docs):
"""
Extend the docs_embeddings with new documents.
Parameters
----------
docs : list of str
"""
self.docs += docs
docs_embeddings = self.get_vector(docs)
if self.docs_embeddings.size > 0:
self.docs_embeddings = np.vstack((self.docs_embeddings, docs_embeddings))
else:
self.docs_embeddings = docs_embeddings
logger.info(f"Add docs size: {len(docs)}, total size: {len(self.docs)}")
def get_vector(self, text):
return self.sentencemodel.encode(text)
def similarity(self, text1, text2, score_function=cos_sim):
text_emb1 = self.get_vector(text1)
text_emb2 = self.get_vector(text2)
return score_function(text_emb1, text_emb2)
def distance(self, text1, text2):
"""Compute cosine distance between two keys.
"""
return 1 - self.similarity(text1, text2)
def most_similar(self, query, topn=10):
result = []
query_embeddings = self.get_vector(query)
hits = semantic_search(query_embeddings, self.docs_embeddings, top_k=topn)
hits = hits[0] # Get the hits for the first query
print("Input question:", query)
for hit in hits[0:topn]:
result.append((self.docs[hit['corpus_id']], round(hit['score'], 4)))
print("\t{:.3f}\t{}".format(hit['score'], self.docs[hit['corpus_id']]))
print("\n\n========\n")
return result
if __name__ == '__main__':
sm = SentenceModel()
list_of_docs = ["This is a test1", "This is a test2", "This is a test3", '刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
list_of_docs2 = ["that is test4", "that is a test5", "that is a test6", '刘若英个演员', '唱歌很好听', 'men喜欢这首歌']
m = BertSimilarity(sm, list_of_docs)
m.add_documents(list_of_docs2)
v = m.get_vector("This is a test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test1"))
print(m.most_similar("这是个演员"))

similarities/similarity.py

@@ -5,13 +5,13 @@
"""
import queue
from enum import Enum
from typing import List, Union
import numpy as np
import torch
import torch.nn.functional
from loguru import logger
from text2vec import SentenceModel
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
@@ -107,11 +107,14 @@ def semantic_search(
:param query_embeddings: A 2 dimensional tensor with the query embeddings.
:param corpus_embeddings: A 2 dimensional tensor with the corpus embeddings.
:param query_chunk_size: Process 100 queries simultaneously. Increasing that value increases the speed, but requires more memory.
:param corpus_chunk_size: Scans the corpus 100k entries at a time. Increasing that value increases the speed, but requires more memory.
:param query_chunk_size: Process 100 queries simultaneously. Increasing that value increases the speed, but
requires more memory.
:param corpus_chunk_size: Scans the corpus 100k entries at a time. Increasing that value increases the speed,
but requires more memory.
:param top_k: Retrieve top k matching entries.
:param score_function: Function for computing scores. By default, cosine similarity.
:return: Returns a sorted list with decreasing cosine similarity scores. Entries are dictionaries with the keys 'corpus_id' and 'score'
:return: Returns a sorted list with decreasing cosine similarity scores. Entries are dictionaries with the
keys 'corpus_id' and 'score'
"""
if isinstance(query_embeddings, (np.ndarray, np.generic)):
@@ -174,8 +177,10 @@ def paraphrase_mining_embeddings(
other sentences and returns a list with the pairs that have the highest cosine similarity score.
:param embeddings: A tensor with the embeddings
:param query_chunk_size: Search for most similar pairs for #query_chunk_size at the same time. Decrease, to lower memory footprint (increases run-time).
:param corpus_chunk_size: Compare a sentence simultaneously against #corpus_chunk_size other sentences. Decrease, to lower memory footprint (increases run-time).
:param query_chunk_size: Search for most similar pairs for #query_chunk_size at the same time. Decrease, to lower
memory footprint (increases run-time).
:param corpus_chunk_size: Compare a sentence simultaneously against #corpus_chunk_size other sentences. Decrease,
to lower memory footprint (increases run-time).
:param max_pairs: Maximal number of text pairs returned.
:param top_k: For each sentence, we retrieve up to top_k other sentences
:param score_function: Function for computing scores. By default, cosine similarity.
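# Usage sketch (shapes assumed from the docstring above; the return format
# matches its use in Similarity.most_similar below):
#   hits = semantic_search(query_embeddings, corpus_embeddings, top_k=5)[0]
#   for hit in hits:
#       print(hit['corpus_id'], hit['score'])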
@@ -302,146 +307,95 @@ def community_detection(embeddings, threshold=0.75, min_community_size=10, init_
return unique_communities
class EncoderType(Enum):
FIRST_LAST_AVG = 0
LAST_AVG = 1
CLS = 2
POOLER = 3
MEAN = 4
def __str__(self):
return self.name
@staticmethod
def from_string(s):
try:
return EncoderType[s]
except KeyError:
raise ValueError()
class Similarity:
"""
Compute cosine similarity of a dynamic query against a corpus of documents ('the index').
Compute similarity:
1. Compute the similarity between two sentences
2. Retrieve the most similar sentences for a query from a corpus of documents.
The index supports adding new documents dynamically.
"""
def __init__(self, model_name_or_path=None, docs=None):
def __init__(self, sentence_model: Union[str, SentenceModel], corpus: List[str] = None):
"""
Parameters
----------
output_prefix : str
Prefix for shard filename. If None, a random filename in temp will be used.
docs : iterable of list of (int, number)
Corpus in streamed Gensim bag-of-words format.
Initialize the similarity object.
:param sentence_model: Model to use for sentence embeddings.
:param corpus: Corpus of documents to use for similarity queries.
"""
self.model_name_or_path = model_name_or_path
self.model = None
logger.debug(f'Loading model {model_name_or_path}')
logger.debug(f"Device: {device}")
self.normalize = True
self.keyedvectors = None
self.docs = docs
self.norm = False
if docs is not None:
self.add_documents(docs)
if isinstance(sentence_model, SentenceModel):
self.sentence_model = sentence_model
elif isinstance(sentence_model, str):
self.sentence_model = SentenceModel(sentence_model)
else:
raise ValueError("sentence_model must be either a SentenceModel or a model name of SentenceTransformer.")
self.corpus = []
self.corpus_embeddings = np.array([])
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of index."""
return self.docs.shape[0]
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
return "%s" % (self.__class__.__name__)
base = f"Similarity: {self.__class__.__name__}, matching_model: {self.sentence_model}"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_documents(self, corpus):
"""Extend the index with new documents.
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
corpus : iterable of list of (int, number)
Corpus in BoW format.
corpus : list of str
"""
for doc in corpus:
self.docs.append(doc)
if len(self.docs) % 10000 == 0:
logger.info("PROGRESS: fresh_shard size=%i", len(self.docs))
def get_vector(self, text, norm=False):
"""Get the key's vector, as a 1D numpy array.
Parameters
----------
text : str
Key for vector to return.
norm : bool, optional
If True, the resulting vector will be L2-normalized (unit Euclidean length).
Returns
-------
numpy.ndarray
Vector for the specified key.
Raises
------
KeyError
If the given key doesn't exist.
self.corpus += corpus
docs_embeddings = self.get_vector(corpus)
if self.corpus_embeddings.size > 0:
self.corpus_embeddings = np.vstack((self.corpus_embeddings, docs_embeddings))
else:
self.corpus_embeddings = docs_embeddings
logger.info(f"Add docs size: {len(corpus)}, total size: {len(self.corpus)}")
def get_vector(self, text: Union[str, List[str]]):
"""
pass
def similarity(
self, text1: Union[List[str], str], text2: Union[List[str], str]
) -> Union[np.ndarray, torch.Tensor]:
Returns the embeddings for a batch of sentences.
:param text:
:return:
"""
Compute similarity between two list of texts.
:param text1: list, sentence1 list
:param text2: list, sentence2 list
:return: return: Matrix with res[i][j] = cos_sim(a[i], b[j])
return self.sentence_model.encode(text)
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]], score_function=cos_sim):
"""
if not text1 or not text2:
return np.array([])
if isinstance(text1, str):
text1 = [text1] # type: ignore
if isinstance(text2, str):
text2 = [text2] # type: ignore
pass
def distance(self, text1: Union[List[str], str], text2: Union[List[str], str]):
"""Compute cosine distance between two keys.
Calculate 1 - :meth:`~gensim.models.keyedvectors.KeyedVectors.similarity`.
Parameters
----------
w1 : str
Input key.
w2 : str
Input key.
Returns
-------
float
Distance between `w1` and `w2`.
Compute similarity between two texts.
:param text1: list of str or str
:param text2: list of str or str
:param score_function: function to compute similarity, default cos_sim
:return: similarity score, torch.Tensor, Matrix with res[i][j] = cos_sim(a[i], b[j])
"""
text_emb1 = self.get_vector(text1)
text_emb2 = self.get_vector(text2)
return score_function(text_emb1, text_emb2)
def distance(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
"""Compute cosine distance between two texts."""
return 1 - self.similarity(text1, text2)
def most_similar(self, query: Union[List[str], str], topn=10, threshold=0, exponent=2.0):
def most_similar(self, query: str, topn: int = 10):
"""
Get topn similar text
:param query: str, query text
:param top_k: int, top_k
:return: list, top_k similar text
Find the topn most similar texts to the query against the corpus.
:param query: str
:param topn: int
:return:
"""
if query not in self.keyedvectors:
logger.debug('an out-of-dictionary term "%s"', query)
else:
most_similar = self.keyedvectors.most_similar(query, topn=topn)
for t2, similarity in most_similar:
if similarity > threshold:
yield (t2, similarity ** exponent)
result = []
query_embeddings = self.get_vector(query)
hits = semantic_search(query_embeddings, self.corpus_embeddings, top_k=topn)
hits = hits[0] # Get the first query result when query is string
for hit in hits[0:topn]:
result.append((hit['corpus_id'], self.corpus[hit['corpus_id']], hit['score']))
return result

similarities/utils/distance.py

@@ -25,6 +25,10 @@ def cosine_distance(v1, v2, normalize=False):
normalize: if True, map the cosine score from [-1, +1] into [0, 1]
return cos score
"""
if isinstance(v1, list):
v1 = np.array(v1)
if isinstance(v2, list):
v2 = np.array(v2)
up = np.dot(v1, v2)
down = np.linalg.norm(v1) * np.linalg.norm(v2)
score = try_divide(up, down)
@@ -154,6 +158,33 @@ def string_hash(source):
return str(x)
def sim_hash(text):
import jieba
import jieba.analyse
seg = jieba.cut(text)
key_word = jieba.analyse.extract_tags('|'.join(seg), topK=None, withWeight=True, allowPOS=())
# for each TF-IDF keyword, emit +weight for the 1 bits and -weight for the 0 bits of its hash
key_list = []
for feature, weight in key_word:
weight = int(weight * 20)
temp = []
for f in string_hash(feature):
if f == '1':
temp.append(weight)
else:
temp.append(-weight)
key_list.append(temp)
# no features could be extracted; return a degenerate code
if len(key_list) == 0:
return '00'
content_list = np.sum(np.array(key_list), axis=0)
hash_code = ''
for c in content_list:
if c > 0:
hash_code = hash_code + '1'
else:
hash_code = hash_code + '0'
return hash_code
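# Worked example: two 64-bit fingerprints that differ in 16 bits have
# Hamming similarity (64 - 16) / 64 = 0.75 (cf. SimhashSimilarity._sim_score).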
def max_min_normalize(x):
"""

similarities/utils/tfidf.py

@@ -62,18 +62,12 @@ class TFIDF:
self.idf_loader.set_new_path(new_abs_path)
self.idf_freq, self.median_idf = self.idf_loader.get_idf()
def get_tfidf(self, sentence):
    """
    Compute the IDF-weighted feature vector of a single sentence over the IDF vocabulary.
    """
    words = [word.word for word in jieba.posseg.cut(sentence) if word.flag[0] not in ['u', 'x', 'w']]
    words = [word for word in words if word.lower() not in self.stopwords or len(word.strip()) < 2]
    word_idf = {word: self.idf_freq.get(word, self.median_idf) for word in words}
    res = []
    for w in list(self.idf_freq.keys()):
        res.append(word_idf.get(w, 0))
    return res
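With the new single-sentence signature, each call returns one dense vector over the IDF vocabulary. A sketch of scoring two sentences with those vectors, assuming `TFIDF` can be constructed with defaults:

```python
import numpy as np

tfidf = TFIDF()  # default construction assumed
v1 = np.array(tfidf.get_tfidf('刘若英是个演员'))
v2 = np.array(tfidf.get_tfidf('我不是演员吗'))
# cosine similarity between the two IDF-weighted vectors
score = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-12)
print(score)
```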

tests/test_fastsim.py (new file)

@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import os
import sys
import unittest
sys.path.append('..')
from text2vec import SentenceModel
from similarities.similarity import Similarity
from similarities.fastsim import AnnoySimilarity
from similarities.fastsim import HnswlibSimilarity
sm = SentenceModel()
class FastTestCase(unittest.TestCase):
def test_sim_diff(self):
a = '研究团队面向国家重大战略需求追踪国际前沿发展借鉴国际人工智能研究领域的科研模式有效整合创新资源解决复'
b = '英汉互译比较语言学'
m = Similarity(sm)
r = m.similarity(a, b)
print(a, b, r)
self.assertTrue(abs(r - 0.1733) < 0.001)
m = HnswlibSimilarity(sm)
r = m.similarity(a, b)
print(a, b, r)
self.assertTrue(abs(r - 0.1733) < 0.001)
m = AnnoySimilarity(sm)
r = m.similarity(a, b)
print(a, b, r)
self.assertTrue(abs(r - 0.1733) < 0.001)
def test_empty(self):
m = HnswlibSimilarity(sm, embedding_size=384, corpus=[])
v = m.get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test4"))
def test_hnsw_score(self):
list_of_docs = ["This is a test1", "This is a test2", "This is a test3", '刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
list_of_docs2 = ["that is test4", "that is a test5", "that is a test6", '刘若英个演员', '唱歌很好听', 'men喜欢这首歌']
m = HnswlibSimilarity(sm, embedding_size=384, corpus=list_of_docs)
v = m.get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
m.add_corpus(list_of_docs2)
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
def test_hnswlib_model_save_load(self):
list_of_docs = ["This is a test1", "This is a test2", "This is a test3", '刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
list_of_docs2 = ["that is test4", "that is a test5", "that is a test6", '刘若英个演员', '唱歌很好听', 'men喜欢这首歌']
m = HnswlibSimilarity(sm, embedding_size=384, corpus=list_of_docs * 10)
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
m.add_corpus(list_of_docs2)
m.build_index()
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
m.save_index('test.model')
m.load_index('test.model')
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
os.remove('test.model')
def test_annoy_model(self):
list_of_docs = ["This is a test1", "This is a test2", "This is a test3", '刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
list_of_docs2 = ["that is test4", "that is a test5", "that is a test6", '刘若英个演员', '唱歌很好听', 'men喜欢这首歌']
m = AnnoySimilarity(sm, embedding_size=384, corpus=list_of_docs * 10)
print(m)
v = m.get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
m.add_corpus(list_of_docs2)
m.build_index()
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
m.save_index('test.model')
m.load_index('test.model')
print(m.most_similar("This is a test4"))
print(m.most_similar("men喜欢这首歌"))
os.remove('test.model')
if __name__ == '__main__':
unittest.main()
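The lifecycle these tests exercise, condensed into one sketch (constructor signature taken from the tests above; the index filename is arbitrary):

```python
from text2vec import SentenceModel
from similarities.fastsim import HnswlibSimilarity

m = HnswlibSimilarity(SentenceModel(), embedding_size=384,
                      corpus=["This is a test1", '刘若英是个演员'])
print(m.most_similar("This is a test4"))  # approximate nearest neighbours over the corpus
m.add_corpus(["that is test4"])           # extend the corpus...
m.build_index()                           # ...then rebuild the ANN index
m.save_index('hnswlib.index')             # persist and reload
m.load_index('hnswlib.index')
```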

tests/test_literalsim.py (new file)

@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import os
import sys
import unittest
sys.path.append('..')
from similarities.literalsim import SimhashSimilarity, TfidfSimilarity, BM25Similarity, WordEmbeddingSimilarity, \
CilinSimilarity, HownetSimilarity
from text2vec import Word2Vec
class LiteralCase(unittest.TestCase):
def test_simhash(self):
"""test_simhash"""
text1 = '刘若英是个演员'
text2 = '他唱歌很好听'
m = SimhashSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
print(m.most_similar('刘若英是演员'))
self.assertEqual(len(m.most_similar('刘若英是演员')), 0)
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_corpus(zh_list)
r = m.most_similar('刘若英是演员', topn=2)
print(r)
self.assertAlmostEqual(m.similarity(text1, text2), 0.734375, places=4)
self.assertEqual(len(r), 2)
def test_tfidf(self):
"""test_tfidf"""
text1 = '刘若英是个演员'
text2 = '他唱歌很好听'
m = TfidfSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌', '我不是演员吗']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
self.assertEqual(len(m.most_similar('刘若英是演员')), 4)
def test_bm25(self):
"""test_bm25"""
text1 = '刘若英是个演员'
text2 = '他唱歌很好听'
m = BM25Similarity()
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌', '我不是演员吗']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
self.assertEqual(len(m.most_similar('刘若英是演员')), 4)
def test_word2vec(self):
"""test_word2vec"""
text1 = '刘若英是个演员'
text2 = '他唱歌很好听'
wm = Word2Vec()
list_of_corpus = ["This is a test1", "This is a test2", "This is a test3"]
list_of_corpus2 = ["that is test4", "that is a test5", "that is a test6"]
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌', '刘若英是个演员', '演戏很好看的人']
m = WordEmbeddingSimilarity(wm, list_of_corpus)
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
m.add_corpus(list_of_corpus2+zh_list)
v = m.get_vector("This is a test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test1"))
print(m.most_similar("刘若英是演员"))
self.assertEqual(len(m.most_similar('刘若英是演员', topn=6)), 6)
def test_cilin(self):
"""test_cilin"""
text1 = '周杰伦是一个歌手'
text2 = '刘若英是个演员'
m = CilinSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
self.assertEqual(len(m.most_similar('刘若英是演员')), 3)
def test_hownet(self):
"""test_cilin"""
text1 = '周杰伦是一个歌手'
text2 = '刘若英是个演员'
m = HownetSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
zh_list = ['刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
m.add_corpus(zh_list)
print(m.most_similar('刘若英是演员'))
self.assertEqual(len(m.most_similar('刘若英是演员')), 3)
if __name__ == '__main__':
unittest.main()


@ -8,10 +8,10 @@ import unittest
sys.path.append('..')
from text2vec import SentenceModel
from similarities.similarity import Similarity
sm = SentenceModel()
bert_model = Similarity(sm)
class IssueTestCase(unittest.TestCase):