update image similarity module.

This commit is contained in:
shibing624 2022-03-08 19:51:28 +08:00
parent e2cb8cab0d
commit 5bd0f05749
34 changed files with 2516 additions and 450 deletions

View File

@ -8,11 +8,11 @@
[![Wechat Group](http://vlog.sfyc.ltd/wechat_everyday/wxgroup_logo.png?imageView2/0/w/60/h/20)](#Contact)
# Similarities
Similarities is a toolkit for similarity calculation and semantic search based on matching models.
Similarities is a toolkit for similarity calculation and semantic search, supporting both text and images.
similarities: a toolkit for similarity calculation and semantic matching search.
**similarities** builds on a range of literal and semantic matching models and implements similarity calculation and matching search for each of them. Developed in Python 3, installable via pip, works out of the box.
**similarities** implements a range of similarity calculation and matching-search algorithms for both text and images. Developed in Python 3, installable via pip, works out of the box.
**Guide**
@ -29,7 +29,6 @@ similarities相似度计算、语义匹配搜索工具包。
- Cosine Similarity: cosine of the angle between the two vectors
- Dot Product: inner product of the two vectors after normalization
- Word Mover's Distance (WMD): using the word vectors of the two texts, measures the minimum distance the words of one text must travel in semantic space to reach the words of the other text
- [RankBM25](similarities/literalsim.py): a BM25 variant that scores the similarity between a query and the documents and returns the docs ranked by score
- [SemanticSearch](https://github.com/shibing624/similarities/blob/main/similarities/similarity.py#L99): vector similarity retrieval using Cosine Similarity + top-k, about an order of magnitude faster than brute-force one-to-one computation
@ -61,18 +60,15 @@ python3 setup.py install
### 1. Compute the similarity score of two sentences
```python
from similarities import Similarity
m = Similarity("shibing624/text2vec-base-chinese")
r = m.similarity('如何更换花呗绑定银行卡', '花呗更改绑定银行卡')
print(f"{r:.4f}")
>>> from similarities import Similarity
>>> m = Similarity("shibing624/text2vec-base-chinese")
>>> r = m.similarity('如何更换花呗绑定银行卡', '花呗更改绑定银行卡')
>>> print(f"similarity score: {r:.4f}")
similarity score: 0.8551
```
output:
```shell
0.8551
```
> The sentence cosine similarity `score` ranges over [-1, 1]; the larger the value, the more similar the sentences.
> The cosine `score` ranges over [-1, 1]; the larger the value, the more similar the sentences.
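As a quick illustration of that range, a standalone NumPy sketch (not toolkit code):

```python
import numpy as np

def cos(a, b):
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

print(cos([1, 2, 3], [1, 2, 3]))     # 1.0  -> same direction, maximally similar
print(cos([1, 2, 3], [-1, -2, -3]))  # -1.0 -> opposite direction, maximally dissimilar
```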
### 2. Similar text search in a document collection
@ -141,7 +137,7 @@ query: 如何更换花呗绑定银行卡
(3, '暴风雨掩埋了东北部新泽西16英寸的降雪', 0.21666759252548218)
(2, '俄罗斯警告乌克兰反对欧盟协议', 0.1450251191854477)
```
> `Score` ranges over [-1, 1]; the larger the value, the more similar the query is to that corpus text.
> The cosine `score` ranges over [-1, 1]; the larger the value, the more similar the query is to that corpus text.
English example: [examples/base_english_demo.py](./examples/base_english_demo.py)
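A minimal sketch of the corpus-search calls behind the output above, mirroring `examples/base_demo.py` from this commit (`corpus` stands for any list of documents):

```python
from similarities import Similarity

model = Similarity("shibing624/text2vec-base-chinese")
model.add_corpus(corpus)  # corpus: list of str documents
for hit in model.most_similar('如何更换花呗绑定银行卡', topn=5):
    print(hit)  # (corpus_id, doc, score)
```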
@ -162,7 +158,7 @@ query: 如何更换花呗绑定银行卡
Example: [examples/literal_sim_demo.py](./examples/literal_sim_demo.py)
```python
from similarities.literalsim import SimhashSimilarity, TfidfSimilarity, BM25Similarity, \
from similarities.literalsim import SimHashSimilarity, TfidfSimilarity, BM25Similarity, \
WordEmbeddingSimilarity, CilinSimilarity, HownetSimilarity
text1 = "如何更换花呗绑定银行卡"
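# A minimal usage sketch (mirroring examples/literal_sim_demo.py from this commit):
#   m = SimHashSimilarity()
#   print(m.similarity(text1, text2))  # score in [0, 1]
#   print(m.most_similar(text1))       # returns hits only after add_corpus(...)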
@ -230,4 +226,6 @@ version = {0.0.4}
# Reference
- [A Simple but Tough-to-Beat Baseline for Sentence Embeddings[Sanjeev Arora and Yingyu Liang and Tengyu Ma, 2017]](https://openreview.net/forum?id=SyK00v5xx)
- [liuhuanyong/SentenceSimilarity](https://github.com/liuhuanyong/SentenceSimilarity)
- [shibing624/text2vec](https://github.com/shibing624/text2vec)
- [qwertyforce/image_search](https://github.com/qwertyforce/image_search)
- [ImageHash - Official Github repository](https://github.com/JohannesBuchner/imagehash)

276
docs/clip_similar_search.py Normal file
View File

@ -0,0 +1,276 @@
# -*- coding: utf-8 -*-
"""CLIP_similar_search.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1DeT11AwvxmHhP4xe4q9tHi0YNk9J5cHm
"""
# !pip install ftfy regex tqdm
# !pip install git+https://github.com/openai/CLIP.git
#
# !pip install gdown
# !gdown --id 1IQ90jtnITrrcBWsFjF8jkFXF7LAxDqLF
# Commented out IPython magic to ensure Python compatibility.
# %%time
# import zipfile
# zip_ref = zipfile.ZipFile("archive.zip", 'r')
# zip_ref.extractall("./scenery")
# zip_ref.close()
import matplotlib.pyplot as plt
def show_images(images, figsize=(20, 10), columns=5):
plt.figure(figsize=figsize)
for i, image in enumerate(images):
plt.subplot(len(images) // columns + 1, columns, i + 1)
plt.imshow(image)
import os
IMAGES_PATH = "./scenery"
IMAGES_PATH = '../examples/data/'
file_names = os.listdir(IMAGES_PATH)
print(f"number of images: {len(file_names)}")
import os
import torch
# import clip
from sentence_transformers import SentenceTransformer, util
from os import listdir
from os.path import splitext
import json
from PIL import Image
import pickle as pk
from tqdm import tqdm
import glob
device = "cuda" if torch.cuda.is_available() else "cpu"
# First, we load the CLIP model
model = SentenceTransformer('clip-ViT-B-32')
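# clip-ViT-B-32 maps images and text into the same 512-dimensional embedding space,
# so a text query can retrieve images (and vice versa) by cosine similarity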
img_names = list(glob.glob(f'{IMAGES_PATH}/*'))
print("Images:", len(img_names))
def convert_img_mode(img):
if img.mode != 'RGB':
img = img.convert('RGB')
return img
imgs = [Image.open(filepath) for filepath in img_names]
imgs = [convert_img_mode(img) for img in imgs]
img_emb = model.encode(imgs, batch_size=128, convert_to_tensor=True, convert_to_numpy=False, show_progress_bar=True)
print(img_emb.shape)
def search(query, k=3):
# First, we encode the query (which can either be an image or a text string)
query_emb = model.encode([query], convert_to_tensor=True, show_progress_bar=False)
# Then, we use the util.semantic_search function, which computes the cosine-similarity
# between the query embedding and all image embeddings.
# It then returns the top_k highest ranked images, which we output
hits = util.semantic_search(query_emb, img_emb, top_k=k)[0]
print("Query:")
print(query)
for hit in hits:
print(img_names[hit['corpus_id']], hit['score'])  # img_names already contain the directory from glob()
search("Two dogs playing in the snow")
q_img = convert_img_mode(Image.open(f"{IMAGES_PATH}/image1.jpeg"))
search(q_img)
# model, preprocess = clip.load("ViT-B/32")
# print(device)
# def get_features(image):
# image = preprocess(image).unsqueeze(0).to(device)
# with torch.no_grad():
# image_features = model.encode_image(image)
# image_features /= image_features.norm(dim=-1, keepdim=True)
# return image_features.cpu().numpy()
# get_features is only defined in the commented-out clip-package block above but is called
# below; as an assumption, define an equivalent with the SentenceTransformer CLIP model that
# is already loaded, so generate_clip_features() and the feature pickling can run.
def get_features(image):
return model.encode([convert_img_mode(image)], convert_to_numpy=True, show_progress_bar=False)
def generate_clip_features():
all_image_features = []
image_filenames = listdir(IMAGES_PATH)
image_ids = set(map(lambda el: splitext(el)[0], image_filenames))  # ids without extensions, matching the ResNet50 script
try:
all_image_features = pk.load(open("clip_image_features.pkl", "rb"))
except (OSError, IOError) as e:
print("file_not_found")
def exists_in_all_image_features(image_id):
for image in all_image_features:
if image['image_id'] == image_id:
# print("skipping "+ str(image_id))
return True
return False
def exists_in_image_folder(image_id):
if image_id in image_ids:
return True
return False
def sync_clip_image_features():
for_deletion = []
for i in range(len(all_image_features)):
if not exists_in_image_folder(all_image_features[i]['image_id']):
print("deleting " + str(all_image_features[i]['image_id']))
for_deletion.append(i)
for i in reversed(for_deletion):
del all_image_features[i]
sync_clip_image_features()
for image_filename in tqdm(image_filenames):
image_id = splitext(image_filename)[0]
if exists_in_all_image_features(image_id):
continue
image = Image.open(IMAGES_PATH + "/" + image_filename)
image_features = get_features(image)
all_image_features.append({'image_id': image_id, 'features': image_features})
pk.dump(all_image_features, open("clip_image_features.pkl", "wb"))
generate_clip_features()
import numpy as np
from PIL import Image
query_image_pillow = Image.open(f'{IMAGES_PATH}/image1.jpeg')
query_image_features = get_features(query_image_pillow)
show_images([np.array(query_image_pillow)])
print(query_image_features.shape)
from sklearn.neighbors import NearestNeighbors
from os import listdir
import numpy as np
import pickle as pk
import json
from pathlib import Path
image_features = pk.load(open("clip_image_features.pkl", "rb"))
features = []
for image in image_features:
features.append(np.array(image['features']))
features = np.array(features)
features = np.squeeze(features)
# print(features.shape)
# exit()
path = "./scenery"
path = '../examples/data/'
# knn = NearestNeighbors(n_neighbors=20,algorithm='brute',metric='euclidean')
# knn.fit(features)
# file_names=listdir(path)
#
# indices = knn.kneighbors(query_image_features, return_distance=False)
# found_images=[]
# for x in indices[0]:
# found_images.append(np.array(Image.open(path+"/"+file_names[x])))
# show_images(np.array(found_images))
#
# # !pip install hnswlib
#
# import hnswlib
# dim=512
# index = hnswlib.Index(space='l2', dim=dim)
# index.init_index(max_elements=10000, ef_construction=100, M=16)
# index.add_items(features)
#
# # Commented out IPython magic to ensure Python compatibility.
# # %%time
# labels, distances = index.knn_query(query_image_features, k = 20)
#
# images_np_hnsw=[]
# labels=labels[0]
# print(labels)
# for idx in labels:
# images_np_hnsw.append(np.array(Image.open(f'{IMAGES_PATH}/{file_names[idx]}')))
# show_images(np.array(images_np_hnsw))
#
# width, height = query_image_pillow.size
# query_image_resized=query_image_pillow.resize((width//19, height//19))
# query_image_resized_features=get_features(query_image_resized)
# show_images([np.array(query_image_resized)])
# labels, distances = index.knn_query(query_image_resized_features, k = 20)
# images_np_hnsw_2=[]
# labels=labels[0]
# print(labels)
# for idx in labels:
# images_np_hnsw_2.append(np.array(Image.open(f'{IMAGES_PATH}/{file_names[idx]}')))
# show_images(np.array(images_np_hnsw_2))
#
# query_image_rotated = query_image_pillow.rotate(180)
# query_image_rotated_features=get_features(query_image_rotated)
# show_images([np.array(query_image_rotated)])
# labels, distances = index.knn_query(query_image_rotated_features, k = 20)
# images_np_hnsw_3=[]
# labels=labels[0]
# print(labels)
# for idx in labels:
# images_np_hnsw_3.append(np.array(Image.open(f'{IMAGES_PATH}/{file_names[idx]}')))
# show_images(np.array(images_np_hnsw_3))
#
# crop_rectangle = (400, 200, 600, 400)
# query_image_cropped = query_image_pillow.crop(crop_rectangle)
# query_image_cropped_features=get_features(query_image_cropped)
# show_images([np.array(query_image_cropped)])
# labels, distances = index.knn_query(query_image_cropped_features, k = 20)
# images_np_hnsw_4=[]
# labels=labels[0]
# print(labels)
# for idx in labels:
# images_np_hnsw_4.append(np.array(Image.open(f'{IMAGES_PATH}/{file_names[idx]}')))
# show_images(np.array(images_np_hnsw_4))
#
# text_tokenized = clip.tokenize(["a picture of a windows xp wallpaper"]).to(device)
# with torch.no_grad():
# text_features = model.encode_text(text_tokenized)
# text_features /= text_features.norm(dim=-1, keepdim=True)
#
# # Commented out IPython magic to ensure Python compatibility.
# # %%time
# labels, distances = index.knn_query(text_features.cpu().numpy(), k = 20)
#
# images_np_hnsw_clip_text=[]
# labels=labels[0]
# print(labels)
# for idx in labels:
# images_np_hnsw_clip_text.append(np.array(Image.open(f'{IMAGES_PATH}/{file_names[idx]}')))
# show_images(np.array(images_np_hnsw_clip_text))
# !pip install git+https://github.com/qwertyforce/Embeddings2Image.git@patch-1
import os
from tqdm import tqdm
from e2i import EmbeddingsProjector
import numpy as np
import h5py
import pickle as pk
data_path = 'data.hdf5'
output_path = 'output_plot'
full_file_names = list(map(lambda el: IMAGES_PATH + "/" + el, file_names))
with h5py.File(data_path, 'w') as hf:
hf.create_dataset('urls', data=np.asarray(full_file_names).astype("S"))
hf.create_dataset('vectors', data=features)
hf.close()
image = EmbeddingsProjector()
image.path2data = data_path
image.load_data()
image.each_img_size = 100
image.output_img_size = 10000
image.calculate_projection()
image.output_img_name = output_path
image.output_img_type = 'scatter'
image.create_image()
print(image.image_list)
print('done!')

94
docs/phash.py Normal file
View File

@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description: refer: https://github.com/qwertyforce/image_search
"""
# !pip install ImageHash
# !pip install distance
# !pip install vptree
import os
import matplotlib.pyplot as plt
def show_images(images, figsize=(20, 10), columns=5):
plt.figure(figsize=figsize)
for i, image in enumerate(images):
plt.subplot(len(images) // columns + 1, columns, i + 1)
plt.imshow(image)
# plt.show()
from PIL import Image
import imagehash
import numpy as np
import distance
IMAGE_PATH = '../examples/data/'
hashes = {}
file_names = os.listdir(IMAGE_PATH)
for file_name in file_names:
phash = str(imagehash.phash(Image.open(f'{IMAGE_PATH}/{file_name}'), 16))
if phash in hashes:
hashes[phash].append(file_name)
else:
hashes[phash] = [file_name]
print(hashes)
query_image = Image.open(f'{IMAGE_PATH}/image1.jpeg')
query_image_phash = str(imagehash.phash(query_image, 16))
show_images([np.array(query_image)])
hamming_distances = []
for phash in hashes.keys():
hamming_distances.append({"dist": distance.hamming(query_image_phash, phash), "phash": phash})
hamming_distances.sort(key=lambda item: item["dist"])
hamming_distances = hamming_distances[:10]
print(hamming_distances)
found_images = []
for it in hamming_distances:
found_images.append(hashes[it["phash"]])
found_images = [item for sublist in found_images for item in sublist]
print('found_images:',found_images)
images_np = []
for image_filename in found_images:
images_np.append(np.array(Image.open(f'{IMAGE_PATH}/{image_filename}')))
import vptree
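# Build a vantage-point tree over the hash codes with hamming distance as the metric;
# unlike the brute-force loop above, nearest-neighbour queries then avoid comparing
# the query hash against every stored hash on average.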
tree = vptree.VPTree(list(hashes.keys()), distance.hamming)
neighbors = tree.get_n_nearest_neighbors(query_image_phash, 10)
print(neighbors)
vptree_found_images = []
for neighbor in neighbors:
vptree_found_images.append(hashes[neighbor[1]])
vptree_found_images = [item for sublist in vptree_found_images for item in sublist]
print('vptree_found_images:',vptree_found_images)
images_np_vptree = []
for image_filename in vptree_found_images:
images_np_vptree.append(np.array(Image.open(f'{IMAGE_PATH}/{image_filename}')))
show_images(images_np_vptree)
width, height = query_image.size
query_image_resized = query_image.resize((width // 19, height // 19))
print(distance.hamming(query_image_phash, str(imagehash.phash(query_image_resized, 16))))
show_images([np.array(query_image_resized)])
query_image_resized_2 = query_image.resize((width // 4, height // 23))
print(distance.hamming(query_image_phash, str(imagehash.phash(query_image_resized_2, 16))))
show_images([np.array(query_image_resized_2)])
crop_rectangle = (200, 200, 900, 900)
query_image_cropped = query_image.crop(crop_rectangle)
print(distance.hamming(query_image_phash, str(imagehash.phash(query_image_cropped, 16))))
show_images([np.array(query_image_cropped)])
query_image_rotated = query_image.rotate(180)
print(distance.hamming(query_image_phash, str(imagehash.phash(query_image_rotated, 16))))
show_images([np.array(query_image_rotated)])

View File

@ -0,0 +1,222 @@
# -*- coding: utf-8 -*-
"""ResNet50_similar_search.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1cASnOmR8wUtK4rRoiQJ0NrEiGy1unuMr
"""
# !pip install gdown
# !gdown --id 1IQ90jtnITrrcBWsFjF8jkFXF7LAxDqLF
# Commented out IPython magic to ensure Python compatibility.
# %%time
# import zipfile
# zip_ref = zipfile.ZipFile("archive.zip", 'r')
# zip_ref.extractall("./scenery")
# zip_ref.close()
import matplotlib.pyplot as plt
def show_images(images, figsize=(20,10), columns = 5):
plt.figure(figsize=figsize)
for i, image in enumerate(images):
plt.subplot(len(images) // columns + 1, columns, i + 1)
plt.imshow(image)
import os
IMAGES_PATH="./scenery"
IMAGES_PATH = '../examples/data/'
file_names=os.listdir(IMAGES_PATH)
print(f"number of images: {len(file_names)}")
from keras.applications.resnet50 import ResNet50
from keras.applications.resnet50 import preprocess_input
import os
from os import listdir
from os.path import splitext
import numpy as np
from PIL import Image
import pickle as pk
from tqdm import tqdm
def read_img_file(f):
img = Image.open(f)
if img.mode != 'RGB':
img = img.convert('RGB')
return img
def resize_img_to_array(img, img_shape):
img_array = np.array(
img.resize(
img_shape,
Image.ANTIALIAS
)
)
return img_array
def get_features(img):
img_width, img_height = 224, 224
np_img = resize_img_to_array(img, img_shape=(img_width, img_height))
expanded_img_array = np.expand_dims(np_img, axis=0)
preprocessed_img = preprocess_input(expanded_img_array)
X_conv = model.predict(preprocessed_img)
image_features=X_conv[0]
image_features /= np.linalg.norm(image_features)
return image_features
model = ResNet50(weights='imagenet', include_top=False,input_shape=(224, 224, 3),pooling='max')
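# with include_top=False and pooling='max', ResNet50 emits one 2048-dimensional global feature
# vector per image (its last conv block has 2048 channels), which is why the hnswlib index below uses dim=2048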
def generate_resnet_features():
all_image_features=[]
image_filenames=listdir(IMAGES_PATH)
image_ids=set(map(lambda el: splitext(el)[0],image_filenames))
try:
all_image_features=pk.load(open("resnet_image_features.pkl", "rb"))
except (OSError, IOError) as e:
print("file_not_found")
def exists_in_all_image_features(image_id):
for image in all_image_features:
if image['image_id'] == image_id:
# print("skipping "+ str(image_id))
return True
return False
def exists_in_image_folder(image_id):
if image_id in image_ids:
return True
return False
def sync_resnet_image_features():
for_deletion=[]
for i in range(len(all_image_features)):
if not exists_in_image_folder(all_image_features[i]['image_id']):
print("deleting "+ str(all_image_features[i]['image_id']))
for_deletion.append(i)
for i in reversed(for_deletion):
del all_image_features[i]
sync_resnet_image_features()
for image_filename in tqdm(image_filenames):
image_id=splitext(image_filename)[0]
if exists_in_all_image_features(image_id):
continue
img_arr = read_img_file(IMAGES_PATH+"/"+image_filename)
image_features=get_features(img_arr)
# print(image_filename)
# print(image_features)
all_image_features.append({'image_id':image_id,'features':image_features})
pk.dump(all_image_features, open("resnet_image_features.pkl","wb"))
generate_resnet_features()
import numpy as np
from PIL import Image
query_image_pillow=Image.open(f'{IMAGES_PATH}/00000061_(6).jpg').convert('RGB')
query_image_features=get_features(query_image_pillow)
show_images([np.array(query_image_pillow)])
print(query_image_features.shape)
from sklearn.neighbors import NearestNeighbors
from os import listdir
import pickle as pk
image_features=pk.load( open("resnet_image_features.pkl", "rb"))
features=[]
for image in image_features:
features.append(np.array(image['features']))
features=np.array(features)
features=np.squeeze(features)
path="./scenery"
path=IMAGES_PATH  # reuse the image folder chosen above; the "./scenery" default is stale here
knn = NearestNeighbors(n_neighbors=20,algorithm='brute',metric='euclidean')
knn.fit(features)
file_names=listdir(path)
indices = knn.kneighbors([query_image_features], return_distance=False)
found_images=[]
for x in indices[0]:
found_images.append(np.array(Image.open(path+"/"+file_names[x])))
show_images(np.array(found_images))
# !pip install hnswlib
import hnswlib
dim=2048
index = hnswlib.Index(space='l2', dim=dim)
index.init_index(max_elements=10000, ef_construction=100, M=16)
index.add_items(features)
# Commented out IPython magic to ensure Python compatibility.
# %%time
labels, distances = index.knn_query([query_image_features], k = 20)
images_np_hnsw=[]
labels=labels[0]
print(labels)
for idx in labels:
images_np_hnsw.append(np.array(Image.open(f'{IMAGES_PATH}/{file_names[idx]}')))
show_images(np.array(images_np_hnsw))
width, height = query_image_pillow.size
query_image_resized=query_image_pillow.resize((width//19, height//19))
query_image_resized_features=get_features(query_image_resized)
show_images([np.array(query_image_resized)])
labels, distances = index.knn_query([query_image_resized_features], k = 20)
images_np_hnsw_2=[]
labels=labels[0]
print(labels)
for idx in labels:
images_np_hnsw_2.append(np.array(Image.open(f'{IMAGES_PATH}/{file_names[idx]}')))
show_images(np.array(images_np_hnsw_2))
query_image_rotated = query_image_pillow.rotate(180)
query_image_rotated_features=get_features(query_image_rotated)
show_images([np.array(query_image_rotated)])
labels, distances = index.knn_query([query_image_rotated_features], k = 20)
images_np_hnsw_3=[]
labels=labels[0]
print(labels)
for idx in labels:
images_np_hnsw_3.append(np.array(Image.open(f'{IMAGES_PATH}/{file_names[idx]}')))
show_images(np.array(images_np_hnsw_3))
crop_rectangle = (400, 200, 600, 400)
query_image_cropped = query_image_pillow.crop(crop_rectangle)
query_image_cropped_features=get_features(query_image_cropped)
show_images([np.array(query_image_cropped)])
labels, distances = index.knn_query([query_image_cropped_features], k = 20)
images_np_hnsw_4=[]
labels=labels[0]
print(labels)
for idx in labels:
images_np_hnsw_4.append(np.array(Image.open(f'{IMAGES_PATH}/{file_names[idx]}')))
show_images(np.array(images_np_hnsw_4))
# !pip install git+https://github.com/qwertyforce/Embeddings2Image.git@patch-1
import os
from tqdm import tqdm
from e2i import EmbeddingsProjector
import numpy as np
import h5py
import pickle as pk
data_path = 'data.hdf5'
output_path = 'output_plot'
full_file_names=list(map(lambda el: IMAGES_PATH+"/"+el,file_names))
with h5py.File(data_path, 'w') as hf:
hf.create_dataset('urls', data=np.asarray(full_file_names).astype("S"))
hf.create_dataset('vectors', data=features)
hf.close()
image = EmbeddingsProjector()
image.path2data = data_path
image.load_data()
image.each_img_size = 100
image.output_img_size = 10000
image.calculate_projection()
image.output_img_name = output_path
image.output_img_type = 'scatter'
image.create_image()
print(image.image_list)
print('done!')

133
docs/rgb_histograms.py Normal file
View File

@ -0,0 +1,133 @@
# -*- coding: utf-8 -*-
"""rgb_histograms.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1ZKRUlq54Wwt3nQNNrLOd-mb6MGZ2QT8i
"""
# !pip install gdown
#
# !gdown --id 1IQ90jtnITrrcBWsFjF8jkFXF7LAxDqLF
# Commented out IPython magic to ensure Python compatibility.
# %%time
# import zipfile
# zip_ref = zipfile.ZipFile("archive.zip", 'r')
# zip_ref.extractall("./scenery")
# zip_ref.close()
import matplotlib.pyplot as plt
def show_images(images, figsize=(20,10), columns = 5):
plt.figure(figsize=figsize)
for i, image in enumerate(images):
plt.subplot(len(images) // columns + 1, columns, i + 1)
plt.imshow(image)
import os
IMAGE_PATH="./scenery"
IMAGE_PATH = '../examples/data/'
file_names=os.listdir(IMAGE_PATH)
print(f"number of images: {len(file_names)}")
import numpy as np
import cv2
from PIL import Image
query_image_pillow=Image.open(f'{IMAGE_PATH}/image1.jpeg')
width, height = query_image_pillow.size
print(width, height)
query_image=cv2.imread(f'{IMAGE_PATH}/image1.jpeg')  # same file as query_image_pillow, so the robustness checks below compare like with like
query_hist_combined=cv2.calcHist([query_image],[0,1,2],None,[16,16,16],[0,256,0,256,0,256])
query_hist_combined = cv2.normalize(query_hist_combined, query_hist_combined).flatten()
print(query_hist_combined.shape)
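# 16 * 16 * 16 bins flatten to a 4096-dimensional joint BGR histogram, matching dim=4096 of the hnswlib index below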
show_images([np.array(query_image_pillow)])
file_names=os.listdir(IMAGE_PATH)
hists=[]
for file_name in file_names:
img=cv2.imread(f'{IMAGE_PATH}/{file_name}')
hist_combined=cv2.calcHist([img],[0,1,2],None,[16,16,16],[0,256,0,256,0,256])
hist_combined = cv2.normalize(hist_combined, hist_combined).flatten()
hists.append({"hist":hist_combined,"file_name":file_name})
# Commented out IPython magic to ensure Python compatibility.
# %%time
found_images=[]
for hist in hists:
similarity=cv2.compareHist(query_hist_combined,hist["hist"],cv2.HISTCMP_INTERSECT)
found_images.append({"similarity":similarity,"file_name":hist["file_name"]})
found_images.sort(key=lambda item: item["similarity"],reverse=True)
found_images=found_images[:10]
print(found_images)
images_np=[]
found_images_filenames=list(map(lambda el: el["file_name"],found_images))
for image_filename in found_images_filenames:
images_np.append(np.array(Image.open(f'{IMAGE_PATH}/{image_filename}')))
show_images(np.array(images_np))
from sklearn.neighbors import NearestNeighbors
hists_list=list(map(lambda el: el['hist'],hists))
knn = NearestNeighbors(n_neighbors=10,algorithm='brute',metric='euclidean')
knn.fit(hists_list)
# Commented out IPython magic to ensure Python compatibility.
# %%time
distances,indices= knn.kneighbors([query_hist_combined], return_distance=True)
indices=indices[0]
images_np_knn=[]
for idx in indices:
images_np_knn.append(np.array(Image.open(f'{IMAGE_PATH}/{file_names[idx]}')))
show_images(np.array(images_np_knn))
# !pip install hnswlib
import hnswlib
dim=4096
index = hnswlib.Index(space='l2', dim=4096)
index.init_index(max_elements=10000, ef_construction=100, M=16)
data = np.array(hists_list)
index.add_items(data)
# Commented out IPython magic to ensure Python compatibility.
# %%time
labels, distances = index.knn_query(query_hist_combined, k = 10)
images_np_hnsw=[]
labels=labels[0]
print(labels)
for idx in labels:
images_np_hnsw.append(np.array(Image.open(f'{IMAGE_PATH}/{file_names[idx]}')))
show_images(np.array(images_np_hnsw))
query_image_resized=np.array(query_image_pillow.resize((width//19, height//19)))
show_images([np.array(query_image_resized)])
query_image_resized = cv2.cvtColor(query_image_resized, cv2.COLOR_RGB2BGR)
query_hist_resized=cv2.calcHist([query_image_resized],[0,1,2],None,[16,16,16],[0,256,0,256,0,256])
query_hist_resized = cv2.normalize(query_hist_resized, query_hist_resized).flatten()
print(cv2.compareHist(query_hist_combined,query_hist_resized,cv2.HISTCMP_INTERSECT))
query_image_resized_2=np.array(query_image_pillow.resize((width//4, height//23)))
show_images([np.array(query_image_resized_2)])
query_image_resized_2 = cv2.cvtColor(query_image_resized_2, cv2.COLOR_RGB2BGR)
query_hist_resized_2=cv2.calcHist([query_image_resized_2],[0,1,2],None,[16,16,16],[0,256,0,256,0,256])
query_hist_resized_2 = cv2.normalize(query_hist_resized_2, query_hist_resized_2).flatten()
print(cv2.compareHist(query_hist_combined,query_hist_resized_2,cv2.HISTCMP_INTERSECT))
crop_rectangle = (150, 150, 600, 600)
query_image_cropped = np.array(query_image_pillow.crop(crop_rectangle))
show_images([np.array(query_image_cropped)])
query_image_cropped = cv2.cvtColor(query_image_cropped, cv2.COLOR_RGB2BGR)
query_hist_cropped=cv2.calcHist([query_image_cropped],[0,1,2],None,[16,16,16],[0,256,0,256,0,256])
query_hist_cropped = cv2.normalize(query_hist_cropped, query_hist_cropped).flatten()
print(cv2.compareHist(query_hist_combined,query_hist_cropped,cv2.HISTCMP_INTERSECT))
distances,indices= knn.kneighbors([query_hist_cropped], return_distance=True)
indices=indices[0]
images_np_knn=[]
for idx in indices:
images_np_knn.append(np.array(Image.open(f'{IMAGE_PATH}/{file_names[idx]}')))
show_images(np.array(images_np_knn))

142
docs/sift.py Normal file
View File

@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
"""sift.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1leOzG-AQw5MkzgA4qNW5fb3yc-oJ4Lo4
"""
# !pip install opencv-python -U
#
# !pip install gdown
# !gdown --id 1IQ90jtnITrrcBWsFjF8jkFXF7LAxDqLF
# Commented out IPython magic to ensure Python compatibility.
# %%time
# import zipfile
# zip_ref = zipfile.ZipFile("archive.zip", 'r')
# zip_ref.extractall("./scenery")
# zip_ref.close()
import matplotlib.pyplot as plt
def show_images(images, figsize=(20, 10), columns=5):
plt.figure(figsize=figsize)
for i, image in enumerate(images):
plt.subplot(len(images) // columns + 1, columns, i + 1)
plt.imshow(image)
import os
IMAGE_PATH = "./scenery"
IMAGE_PATH = '../examples/data/'
file_names = os.listdir(IMAGE_PATH)
print(f"number of images: {len(file_names)}")
import cv2
import numpy as np
from PIL import Image
from os import listdir
import pickle as pk
import math
from tqdm import tqdm
sift = cv2.SIFT_create(nfeatures=500)
def resize_img_to_array(img):
width, height = img.size  # PIL's Image.size is (width, height)
if width * height > 2000 * 2000:
k = math.sqrt(width * height / (2000 * 2000))
img = img.resize(
(round(width / k), round(height / k)),
Image.ANTIALIAS
)
img_array = np.array(img)
return img_array
def calculate_descr(img):
eps = 1e-7
img = resize_img_to_array(img)
key_points, descriptors = sift.detectAndCompute(img, None)
if descriptors is None:
return (None, None)
descriptors /= (descriptors.sum(axis=1, keepdims=True) + eps) # RootSift
descriptors = np.sqrt(descriptors) # RootSift
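# RootSIFT (Arandjelovic & Zisserman, 2012): L1-normalising the descriptors and taking the square root
# makes Euclidean distance between them equivalent to the Hellinger kernel on the raw SIFT histograms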
return (key_points, descriptors)
IMAGES_PATH = '../examples/data/'
file_names = listdir(IMAGES_PATH)
all_image_features = []
for file_name in tqdm(file_names):
img = Image.open(IMAGES_PATH + "/" + file_name)
keyp, descs = calculate_descr(img)
if descs is None:
continue
if descs.shape[0] == 1:
continue
all_image_features.append({"descs": descs, "file_name": file_name})
# pk.dump(all_image_features, open("all_image_features.pkl","wb"))
print('all_image_features:', all_image_features)
import numpy as np
from PIL import Image
query_image_pillow = Image.open(f'{IMAGES_PATH}/image1.jpeg')
query_image_features = calculate_descr(query_image_pillow)[1]
print(query_image_features.shape)
bf = cv2.BFMatcher()
def match_descriptors(IMAGE_SIMILARITIES, filename, matches):
good_matches = []
good_matches_sum = 0
for m, n in matches:
if m.distance < 0.75 * n.distance:
good_matches.append(m)
good_matches_sum += m.distance
if len(good_matches) < 5:
return
bestN = 5
topBestNSum = 0
good_matches.sort(key=lambda match: match.distance)
for match in good_matches[:bestN]:
topBestNSum += match.distance
IMAGE_SIMILARITIES.append(
{"id": filename, "distance": (topBestNSum / bestN) * good_matches_sum / (len(good_matches))})
def sift_reverse_search(image_file):
IMAGE_SIMILARITIES = []
_, target_descriptors = calculate_descr(image_file)
for image in all_image_features:
matches = bf.knnMatch(target_descriptors, image["descs"], k=2)
match_descriptors(IMAGE_SIMILARITIES, image["file_name"], matches)
IMAGE_SIMILARITIES.sort(key=lambda image: image["distance"])
print('IMAGE_SIMILARITIES', IMAGE_SIMILARITIES[:10])
return list(map(lambda el: el["id"], IMAGE_SIMILARITIES[:10]))
# Commented out IPython magic to ensure Python compatibility.
# %%time
res = sift_reverse_search(query_image_pillow)
print(res)
found_images = []
for file_name in res:
found_images.append(np.array(Image.open(IMAGES_PATH + "/" + file_name)))
show_images(np.array(found_images))
crop_rectangle = (100, 100, 400, 400)
query_image_cropped = query_image_pillow.crop(crop_rectangle)
res2 = sift_reverse_search(query_image_cropped)
show_images([np.array(query_image_cropped)])
found_images_2 = []
for file_name in res2:
found_images_2.append(np.array(Image.open(IMAGES_PATH + "/" + file_name)))
show_images(np.array(found_images_2))

View File

@ -9,13 +9,8 @@ import sys
sys.path.append('..')
from similarities import Similarity
from loguru import logger
logger.remove()
logger.add(sys.stderr, level="INFO")
if __name__ == '__main__':
model = Similarity("shibing624/text2vec-base-chinese")
# 1.Compute cosine similarity between two sentences.
sentences = ['如何更换花呗绑定银行卡',
'花呗更改绑定银行卡']
@ -27,6 +22,8 @@ if __name__ == '__main__':
'中央情报局局长访问以色列叙利亚会谈',
'人在巴基斯坦基地的炸弹袭击中丧生',
]
model = Similarity("shibing624/text2vec-base-chinese")
print(model)
similarity_score = model.similarity(sentences[0], sentences[1])
print(f"{sentences[0]} vs {sentences[1]}, score: {float(similarity_score):.4f}")
@ -38,9 +35,9 @@ if __name__ == '__main__':
print(f"{sentences[i]} vs {corpus[j]}, score: {similarity_scores.numpy()[i][j]:.4f}")
# 3.Semantic Search
m = Similarity(sentence_model="shibing624/text2vec-base-chinese", corpus=corpus)
model.add_corpus(corpus)
q = '如何更换花呗绑定银行卡'
print(m.most_similar(q, topn=5))
print(model.most_similar(q, topn=5))
print("query:", q)
for i in m.most_similar(q, topn=5):
for i in model.most_similar(q, topn=5):
print('\t', i)

BIN
examples/data/image1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 145 KiB

BIN
examples/data/image10.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 121 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 135 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 148 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 483 KiB

BIN
examples/data/image3.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 155 KiB

BIN
examples/data/image5.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 596 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 454 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 162 KiB

View File

@ -20,7 +20,7 @@ def hnswlib():
m = HnswlibSimilarity(sm, embedding_size=384, corpus=list_of_docs * 10)
print(m)
v = m.get_vector("This is test1")
v = m._get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
@ -44,7 +44,7 @@ def annoy():
m = AnnoySimilarity(sm, embedding_size=384, corpus=list_of_docs * 10)
print(m)
v = m.get_vector("This is test1")
v = m._get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))

62
examples/image_demo.py Normal file
View File

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import sys
import glob
sys.path.append('..')
from similarities.imagesim import ImageHashSimilarity, SiftSimilarity, ClipSimilarity
def phash_demo(image_fp1, image_fp2):
m = ImageHashSimilarity(hash_function='phash')
print(m)
print(m.similarity(image_fp1, image_fp2))
m.most_similar(image_fp1)
# no corpus
m.add_corpus(glob.glob('data/*.jpg') + glob.glob('data/*.png'))
r = m.most_similar(image_fp1)
print(r)
m = ImageHashSimilarity(hash_function='average_hash')
print(m)
print(m.similarity(image_fp1, image_fp2))
m.most_similar(image_fp1)
# no corpus
m.add_corpus(glob.glob('data/*.jpg') + glob.glob('data/*.png'))
r = m.most_similar(image_fp1)
print(r)
def sift_demo(image_fp1, image_fp2):
m = SiftSimilarity()
print(m)
print(m.similarity(image_fp1, image_fp2))
m.most_similar(image_fp1)
# no corpus
m.add_corpus(glob.glob('data/*.jpg'))
m.add_corpus(glob.glob('data/*.png'))
r = m.most_similar(image_fp1)
print(r)
def clip_demo(image_fp1, image_fp2):
m = ClipSimilarity()
print(m)
print(m.similarity(image_fp1, image_fp2))
m.most_similar(image_fp1)
# no corpus
m.add_corpus(glob.glob('data/*.jpg') + glob.glob('data/*.png'))
r = m.most_similar(image_fp1)
print(r)
if __name__ == "__main__":
image_fp1 = 'data/image1.png'
image_fp2 = 'data/image12-like-image1.png'
phash_demo(image_fp1, image_fp2)
sift_demo(image_fp1, image_fp2)
clip_demo(image_fp1, image_fp2)

View File

@ -3,20 +3,23 @@
@author:XuMing(xuming624@qq.com)
@description:
"""
import os
import sys
from text2vec import Word2Vec
from loguru import logger
sys.path.append('..')
from similarities.literalsim import SimhashSimilarity, TfidfSimilarity, BM25Similarity, WordEmbeddingSimilarity, \
from similarities.literalsim import SimHashSimilarity, TfidfSimilarity, BM25Similarity, WordEmbeddingSimilarity, \
CilinSimilarity, HownetSimilarity
logger.remove()
logger.add(sys.stderr, level="INFO")
def main():
text1 = '刘若英是个演员'
text2 = '他唱歌很好听'
m = SimhashSimilarity()
m = SimHashSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
print(m.most_similar('刘若英是演员'))
@ -43,7 +46,7 @@ def main():
list_of_corpus2 = ["that is test4", "that is a test5", "that is a test6"]
m = WordEmbeddingSimilarity(wm, list_of_corpus)
m.add_corpus(list_of_corpus2)
v = m.get_vector("This is a test1")
v = m._get_vector("This is a test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))

View File

@ -1,8 +1,9 @@
sentence-transformers>=2.1.0
text2vec
jieba>=0.39
loguru
transformers>=4.6.0
scikit-learn
gensim>=4.0.0
text2vec
hnswlib
#annoy
opencv-python

View File

@ -9,7 +9,7 @@ This package contains implementations of pairwise similarity queries.
# bring classes directly into package namespace, to save some typing
from similarities.version import __version__
from similarities.similarity import Similarity
from similarities.similarity import (
from similarities.utils import (
cos_sim,
dot_score,
semantic_search,
@ -20,10 +20,15 @@ from similarities.similarity import (
from similarities.fastsim import AnnoySimilarity, HnswlibSimilarity
from similarities.literalsim import (
SimhashSimilarity,
SimHashSimilarity,
TfidfSimilarity,
BM25Similarity,
WordEmbeddingSimilarity,
CilinSimilarity,
HownetSimilarity
)
from similarities.imagesim import (
ImageHashSimilarity,
ClipSimilarity,
SiftSimilarity
)

View File

@ -15,11 +15,11 @@ class AnnoySimilarity(Similarity):
similar query for a given docs with Annoy.
"""
def __init__(self, sentence_model, corpus: List[str] = None,
def __init__(self, model_name_or_path="shibing624/text2vec-base-chinese", corpus: List[str] = None,
embedding_size: int = 384, n_trees: int = 256):
super().__init__(sentence_model, corpus)
super().__init__(model_name_or_path, corpus)
self.index = None
if corpus is not None and self.corpus_embeddings.size > 0:
if corpus is not None and self.corpus_embeddings:
self.build_index(embedding_size, n_trees)
def build_index(self, embedding_size: int = 384, n_trees: int = 256):
@ -29,12 +29,15 @@ class AnnoySimilarity(Similarity):
from annoy import AnnoyIndex
except ImportError:
raise ImportError("Annoy is not installed. Please install it first, e.g. with `pip install annoy`.")
self.index = AnnoyIndex(embedding_size, 'angular')
# Creating the annoy index
self.index = AnnoyIndex(embedding_size, 'angular')
logger.info(f"Init annoy index, embedding_size: {embedding_size}")
logger.info(f"Building index with {n_trees} trees.")
for i in range(len(self.corpus_embeddings)):
self.index.add_item(i, self.corpus_embeddings[i])
logger.info(f"Create Annoy index with {n_trees} trees. This can take some time.")
self.index.build(n_trees)
def save_index(self, index_path: str):
@ -56,11 +59,16 @@ class AnnoySimilarity(Similarity):
def most_similar(self, query: str, topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""
result = []
query_embeddings = self.get_vector(query)
if not self.index:
query_embeddings = self._get_vector(query)
if self.corpus_embeddings and self.index is None:
logger.warning(f"No index found. Please add corpus and build index first, e.g. with `build_index()`."
f"Now returning slow search result.")
return super().most_similar(query, topn)
if not self.corpus_embeddings:
logger.error("No corpus_embeddings found. Please add corpus first, e.g. with `add_corpus()`.")
return result
corpus_ids, scores = self.index.get_nns_by_vector(query_embeddings, topn, include_distances=True)
for id, score in zip(corpus_ids, scores):
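# Annoy's 'angular' distance between unit vectors is sqrt(2 - 2 * cos), so cosine similarity is recovered below as 1 - distance**2 / 2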
score = 1 - ((score ** 2) / 2)
@ -75,11 +83,11 @@ class HnswlibSimilarity(Similarity):
similar query for a given docs with Hnswlib.
"""
def __init__(self, sentence_model, corpus: List[str] = None,
def __init__(self, model_name_or_path="shibing624/text2vec-base-chinese", corpus: List[str] = None,
embedding_size: int = 384, ef_construction: int = 400, M: int = 64, ef: int = 50):
super().__init__(sentence_model, corpus)
super().__init__(model_name_or_path, corpus)
self.index = None
if corpus is not None and self.corpus_embeddings.size > 0:
if corpus is not None and self.corpus_embeddings:
self.build_index(embedding_size, ef_construction, M, ef)
def build_index(self, embedding_size: int = 384, ef_construction: int = 400, M: int = 64, ef: int = 50):
@ -89,11 +97,16 @@ class HnswlibSimilarity(Similarity):
import hnswlib
except ImportError:
raise ImportError("Hnswlib is not installed. Please install it first, e.g. with `pip install hnswlib`.")
# We use Inner Product (dot-product) as Index. We will normalize our vectors to unit length,
# then Inner Product is equal to cosine similarity
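# e.g. for unit vectors a = (1, 0) and b = (0.6, 0.8): dot(a, b) = 0.6, which is exactly cos(a, b)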
self.index = hnswlib.Index(space='cosine', dim=embedding_size)
# Init the HNSWLIB index
logger.info(f"Start creating HNSWLIB index, max_elements: {len(self.corpus)}")
logger.info(f"Parameters Required: M: {M}")
logger.info(f"Parameters Required: ef_construction: {ef_construction}")
logger.info(f"Parameters Required: ef(>topn): {ef}")
self.index.init_index(max_elements=len(self.corpus_embeddings), ef_construction=ef_construction, M=M)
# Then we add all corpus embeddings to the index
self.index.add_items(self.corpus_embeddings, list(range(len(self.corpus_embeddings))))
@ -119,15 +132,20 @@ class HnswlibSimilarity(Similarity):
def most_similar(self, query: str, topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""
result = []
query_embeddings = self.get_vector(query)
if not self.index:
query_embeddings = self._get_vector(query)
if self.corpus_embeddings and self.index is None:
logger.warning(f"No index found. Please add corpus and build index first, e.g. with `build_index()`."
f"Now returning slow search result.")
return super().most_similar(query, topn)
if not self.corpus_embeddings:
logger.error("No corpus_embeddings found. Please add corpus first, e.g. with `add_corpus()`.")
return result
# We use hnswlib knn_query method to find the top_k_hits
corpus_ids, distances = self.index.knn_query(query_embeddings, k=topn)
# We extract corpus ids and scores for the first query
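# hnswlib's 'cosine' space returns distance = 1 - cosine similarity, so 1 - distance is the cosine similarity itself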
hits = [{'corpus_id': id, 'score': 1 - score} for id, score in zip(corpus_ids[0], distances[0])]
hits = [{'corpus_id': id, 'score': 1 - distance} for id, distance in zip(corpus_ids[0], distances[0])]
hits = sorted(hits, key=lambda x: x['score'], reverse=True)
for hit in hits:
result.append((hit['corpus_id'], self.corpus[hit['corpus_id']], hit['score']))

338
similarities/imagesim.py Normal file
View File

@ -0,0 +1,338 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description: Image similarity and image retrieval
refer: https://colab.research.google.com/drive/1leOzG-AQw5MkzgA4qNW5fb3yc-oJ4Lo4
Adjust the code to compare similarity score and search.
"""
import math
import os
from typing import List, Union
import cv2
import numpy as np
from PIL import Image
from loguru import logger
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
from similarities.similarity import semantic_search
from similarities.utils.distance import hamming_distance
from similarities.utils.imagehash import phash, dhash, whash, average_hash
from similarities.utils.util import cos_sim
pwd_path = os.path.abspath(os.path.dirname(__file__))
class ImageHashSimilarity:
"""
Compute Phash similarity between two images and retrieves most
similar image for a given image corpus.
perceptual hash (pHash), which acts as an image fingerprint.
"""
def __init__(self, corpus: List[str] = None, hash_function: str = "phash", hash_size: int = 16):
self.corpus = []
self.hash_functions = {'phash': phash, 'dhash': dhash, 'whash': whash, 'average_hash': average_hash}
if hash_function not in self.hash_functions:
raise ValueError(f"hash_function: {hash_function} must be one of {self.hash_functions.keys()}")
self.hash_function = self.hash_functions[hash_function]
self.hash_size = hash_size
self.corpus_embeddings = []
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
base = f"Similarity: {self.__class__.__name__}, matching_model: {self.hash_function.__name__}"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
corpus : list of str
"""
self.corpus += corpus
corpus_embeddings = []
for doc_fp in tqdm(corpus, desc="Calculating corpus image hash"):
doc_seq = str(self.hash_function(Image.open(doc_fp), self.hash_size))
corpus_embeddings.append(doc_seq)
if self.corpus_embeddings:
self.corpus_embeddings += corpus_embeddings
else:
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def _sim_score(self, seq1, seq2):
"""Compute hamming similarity between two seqs."""
return 1.0 - hamming_distance(seq1, seq2) / len(seq1)
def similarity(self, fp1: str, fp2: str):
"""
Compute similarity between two image files.
:param fp1: image file path 1
:param fp2: image file path 2
:return: similarity score
"""
img1 = Image.open(fp1)
img2 = Image.open(fp2)
seq1 = str(self.hash_function(img1, self.hash_size))
seq2 = str(self.hash_function(img2, self.hash_size))
similarity_score = self._sim_score(seq1, seq2)
return similarity_score
def distance(self, fp1: str, fp2: str):
"""Compute distance between two image files."""
return 1 - self.similarity(fp1, fp2)
def most_similar(self, query_fp: str, topn: int = 10):
"""
Find the topn most similar images to the query against the corpus.
:param query_fp: str
:param topn: int
:return: list of tuples (id, image_path, similarity)
"""
result = []
q_seq = str(self.hash_function(Image.open(query_fp), self.hash_size))
for (corpus_id, doc), doc_seq in zip(enumerate(self.corpus), self.corpus_embeddings):
score = self._sim_score(q_seq, doc_seq)
result.append((corpus_id, doc, score))
result.sort(key=lambda x: x[2], reverse=True)
return result[:topn]
class SiftSimilarity:
"""
Compute SIFT similarity between two images and retrieves most
similar image for a given image corpus.
SIFT, Scale Invariant Feature Transform(SIFT) 尺度不变特征变换匹配算法详解
https://blog.csdn.net/zddblog/article/details/7521424
"""
def __init__(self, corpus: List[str] = None, nfeatures: int = 500):
self.corpus = []
self.sift = cv2.SIFT_create(nfeatures=nfeatures)
self.bf_matcher = cv2.BFMatcher() # Brute-force matcher create method.
self.corpus_embeddings = []
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
base = f"Similarity: {self.__class__.__name__}, matching_model: SIFT"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
corpus : list of str
"""
self.corpus += corpus
corpus_embeddings = []
for doc_fp in tqdm(corpus, desc="Calculating corpus image SIFT"):
img = Image.open(doc_fp)
_, descriptors = self.calculate_descr(img)
if descriptors is not None and descriptors.shape[0] > 0:
corpus_embeddings.append(descriptors.tolist())
if self.corpus_embeddings:
self.corpus_embeddings += corpus_embeddings
else:
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
@staticmethod
def _resize_img_to_array(img, max_height=2000, max_width=2000):
"""Resize image to array."""
width, height = img.size  # PIL's Image.size is (width, height)
if width * height > max_width * max_height:
k = math.sqrt(width * height / (max_width * max_height))
img = img.resize(
(round(width / k), round(height / k)),
Image.ANTIALIAS
)
img_array = np.array(img)
return img_array
def calculate_descr(self, img, min_value=1e-7):
"""Calculate SIFT descriptors."""
img = self._resize_img_to_array(img)
key_points, descriptors = self.sift.detectAndCompute(img, None)
if descriptors is None:
return None, None
descriptors /= (descriptors.sum(axis=1, keepdims=True) + min_value) # RootSift
descriptors = np.sqrt(descriptors)
return key_points, descriptors
def _sim_score(self, desc1, desc2):
"""Compute similarity between two descs."""
if isinstance(desc1, list):
desc1 = np.array(desc1, dtype=np.float32)
if isinstance(desc2, list):
desc2 = np.array(desc2, dtype=np.float32)
score = 0.0
matches = self.bf_matcher.knnMatch(desc1, desc2, k=2)
good_matches = []
good_matches_sum = 0
for m, n in matches:
if m.distance < 0.75 * n.distance:
good_matches.append(m)
good_matches_sum += m.distance
if len(good_matches) < 5:
return score
bestN = 5
topBestNSum = 0
good_matches.sort(key=lambda match: match.distance)
for match in good_matches[:bestN]:
topBestNSum += match.distance
score = (topBestNSum / bestN) * good_matches_sum / len(good_matches)
return score
def similarity(self, fp1: str, fp2: str):
"""
Compute similarity between two image files.
:param fp1: image file path 1
:param fp2: image file path 2
:return: similarity score
"""
similarity_score = 0.0
_, desc1 = self.calculate_descr(Image.open(fp1))
_, desc2 = self.calculate_descr(Image.open(fp2))
if desc1 is not None and desc2 is not None:
similarity_score = self._sim_score(desc1, desc2)
return similarity_score
def distance(self, fp1: str, fp2: str):
"""Compute distance between two keys."""
return 1 - self.similarity(fp1, fp2)
def most_similar(self, query_fp: str, topn: int = 10):
"""
Find the topn most similar images to the query against the corpus.
:param query_fp: str
:param topn: int
:return: list of tuples (id, image_path, similarity)
"""
result = []
_, q_desc = self.calculate_descr(Image.open(query_fp))
for (corpus_id, doc), doc_desc in zip(enumerate(self.corpus), self.corpus_embeddings):
score = self._sim_score(q_desc, doc_desc)
result.append((corpus_id, doc, score))
result.sort(key=lambda x: x[2], reverse=True)
return result[:topn]
class ClipSimilarity:
"""
Compute CLIP similarity between two images and retrieves most
similar image for a given image corpus.
CLIP: https://github.com/openai/CLIP.git
"""
def __init__(self, corpus: List[str] = None, model_name_or_path: str = 'clip-ViT-B-32'):
self.corpus = []
self.clip_model = SentenceTransformer(model_name_or_path) # load the CLIP model
self.corpus_embeddings = []
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
base = f"Similarity: {self.__class__.__name__}, matching_model: CLIP"
if self.corpus:
base += f", corpus size: {len(self.corpus)}"
return base
def add_corpus(self, corpus: List[str]):
"""
Extend the corpus with new documents.
Parameters
----------
corpus : list of str
"""
self.corpus += corpus
corpus_embeddings = self._get_vector(corpus).tolist()
if self.corpus_embeddings:
self.corpus_embeddings += corpus_embeddings
else:
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def _convert_to_rgb(self, img):
"""Convert image to RGB mode."""
if img.mode != 'RGB':
img = img.convert('RGB')
return img
def _get_vector(self, img_paths: Union[str, List[str]]):
"""
Returns the embeddings for a batch of images.
:param img_paths:
:return:
"""
if isinstance(img_paths, str):
img_paths = [img_paths]
imgs = [Image.open(filepath) for filepath in img_paths]
imgs = [self._convert_to_rgb(img) for img in imgs]
return self.clip_model.encode(imgs, batch_size=128, convert_to_tensor=False, show_progress_bar=True)
def similarity(self, fp1: str, fp2: str):
"""
Compute similarity between two image files.
:param fp1: image file path 1
:param fp2: image file path 2
:return: similarity score
"""
emb1 = self._get_vector(fp1)
emb2 = self._get_vector(fp2)
similarity_score = float(cos_sim(emb1, emb2))
return similarity_score
def distance(self, fp1: str, fp2: str):
"""Compute distance between two image files."""
return 1 - self.similarity(fp1, fp2)
def most_similar(self, query_fp: str, topn: int = 10):
"""
Find the topn most similar images to the query against the corpus.
:param query_fp: str
:param topn: int
:return: list of tuples (id, image_path, similarity)
"""
result = []
q_emb = self._get_vector(query_fp)
# Computes the cosine-similarity between the query embedding and all image embeddings.
hits = semantic_search(q_emb, np.array(self.corpus_embeddings, dtype=np.float32), top_k=topn)
hits = hits[0] # Get the first query result when query is string
for hit in hits[:topn]:
result.append((hit['corpus_id'], self.corpus[hit['corpus_id']], hit['score']))
return result[:topn]

View File

@ -10,22 +10,21 @@ Adjust the gensim similarities Index to compute sentence similarities.
import os
from typing import List, Union
from tqdm import tqdm
import jieba
import jieba.analyse
import jieba.posseg
import numpy as np
from text2vec import Word2Vec
from loguru import logger
from similarities.utils.distance import cosine_distance
from similarities.utils.distance import sim_hash, hamming_distance
from similarities.utils.distance import string_hash, hamming_distance, cosine_distance
from similarities.utils.rank_bm25 import BM25Okapi
from similarities.utils.tfidf import TFIDF
pwd_path = os.path.abspath(os.path.dirname(__file__))
class SimhashSimilarity:
class SimHashSimilarity:
"""
Compute SimHash similarity between two sentences and retrieves most
similar sentence for a given corpus.
@ -33,7 +32,7 @@ class SimhashSimilarity:
def __init__(self, corpus: List[str] = None):
self.corpus = []
self.corpus_embeddings = np.array([])
self.corpus_embeddings = []
if corpus is not None:
self.add_corpus(corpus)
@ -57,14 +56,12 @@ class SimhashSimilarity:
"""
self.corpus += corpus
corpus_embeddings = []
for sentence in corpus:
for sentence in tqdm(corpus, desc="Computing corpus SimHash"):
corpus_embeddings.append(self.simhash(sentence))
if len(corpus_embeddings) % 1000 == 0:
logger.debug(f"Progress, add corpus size: {len(corpus_embeddings)}")
if self.corpus_embeddings.size > 0:
self.corpus_embeddings = np.vstack((self.corpus_embeddings, corpus_embeddings))
if self.corpus_embeddings:
self.corpus_embeddings += corpus_embeddings
else:
self.corpus_embeddings = np.array(corpus_embeddings)
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def simhash(self, text: str):
@ -73,11 +70,38 @@ class SimhashSimilarity:
:param text: str
:return: hash code
"""
return sim_hash(text)
seg = jieba.cut(text)
key_word = jieba.analyse.extract_tags('|'.join(seg), topK=None, withWeight=True, allowPOS=())
# build a signed bit vector for each keyword's hash, scaled by the keyword's weight
key_list = []
for feature, weight in key_word:
weight = int(weight * 20)
temp = []
for f in string_hash(feature):
if f == '1':
temp.append(weight)
else:
temp.append(-weight)
key_list.append(temp)
# no keywords could be extracted from the text, fall back to a degenerate hash
if len(key_list) == 0:
return '00'
content_list = np.sum(np.array(key_list), axis=0)
hash_code = ''
for c in content_list:
if c > 0:
hash_code = hash_code + '1'
else:
hash_code = hash_code + '0'
return hash_code
def _sim_score(self, v1, v2):
"""Compute hamming similarity between two embeddings."""
return (100 - hamming_distance(v1, v2) * 100 / 64) / 100
def _sim_score(self, seq1, seq2):
"""Convert hamming distance to similarity score."""
# convert the hamming distance into a similarity score
score = 0.0
if len(seq1) > 2 and len(seq2) > 2:
score = 1.0 - hamming_distance(seq1, seq2) / len(seq1)
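# e.g. two equal-length hash codes differing in a quarter of their bit positions score 1.0 - 0.25 = 0.75; identical codes score 1.0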
return score
def similarity(self, text1: str, text2: str):
"""
@ -86,9 +110,9 @@ class SimhashSimilarity:
:param text2:
:return:
"""
v1 = self.simhash(text1)
v2 = self.simhash(text2)
similarity_score = self._sim_score(v1, v2)
seq1 = self.simhash(text1)
seq2 = self.simhash(text2)
similarity_score = self._sim_score(seq1, seq2)
return similarity_score
@ -121,7 +145,7 @@ class TfidfSimilarity:
def __init__(self, corpus: List[str] = None):
super().__init__()
self.corpus = []
self.corpus_embeddings = np.array([])
self.corpus_embeddings = []
self.tfidf = TFIDF()
if corpus is not None:
self.add_corpus(corpus)
@ -146,14 +170,12 @@ class TfidfSimilarity:
"""
self.corpus += corpus
corpus_embeddings = []
for sentence in corpus:
for sentence in tqdm(corpus, desc="Computing corpus TFIDF"):
corpus_embeddings.append(self.tfidf.get_tfidf(sentence))
if len(corpus_embeddings) % 1000 == 0:
logger.debug(f"Progress, add corpus size: {len(corpus_embeddings)}")
if self.corpus_embeddings.size > 0:
self.corpus_embeddings = np.vstack((self.corpus_embeddings, corpus_embeddings))
if self.corpus_embeddings:
self.corpus_embeddings += corpus_embeddings
else:
self.corpus_embeddings = np.array(corpus_embeddings)
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def similarity(self, text1: str, text2: str):
@ -217,7 +239,7 @@ class BM25Similarity:
self.bm25 = BM25Okapi(corpus_seg)
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def similarity(self, text1, text2):
def _similarity(self, text1, text2):
"""
Compute similarity score between two sentences.
:param text1:
@ -226,7 +248,7 @@ class BM25Similarity:
"""
raise NotImplementedError()
def distance(self, text1, text2):
def _distance(self, text1, text2):
"""Compute distance between two sentences."""
raise NotImplementedError()
@ -259,7 +281,7 @@ class WordEmbeddingSimilarity:
else:
raise ValueError("keyedvectors must be ~text2vec.Word2Vec or Word2Vec model name")
self.corpus = []
self.corpus_embeddings = np.array([])
self.corpus_embeddings = []
if corpus is not None:
self.add_corpus(corpus)
@ -282,20 +304,20 @@ class WordEmbeddingSimilarity:
corpus : list of str
"""
self.corpus += corpus
corpus_embeddings = self.get_vector(corpus)
if self.corpus_embeddings.size > 0:
self.corpus_embeddings = np.vstack((self.corpus_embeddings, corpus_embeddings))
corpus_embeddings = self._get_vector(corpus).tolist()
if self.corpus_embeddings:
self.corpus_embeddings += corpus_embeddings
else:
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add corpus size: {len(corpus)}, total size: {len(self.corpus)}")
def get_vector(self, text):
def _get_vector(self, text):
return self.keyedvectors.encode(text)
def similarity(self, text1: str, text2: str):
"""Compute cosine similarity between two texts."""
v1 = self.get_vector(text1)
v2 = self.get_vector(text2)
v1 = self._get_vector(text1)
v2 = self._get_vector(text2)
return cosine_distance(v1, v2)
def distance(self, text1: str, text2: str):
@ -310,7 +332,7 @@ class WordEmbeddingSimilarity:
:return:
"""
result = []
query_emb = self.get_vector(query)
query_emb = self._get_vector(query)
for (corpus_id, doc), doc_emb in zip(enumerate(self.corpus), self.corpus_embeddings):
score = cosine_distance(query_emb, doc_emb, normalize=True)
result.append((corpus_id, doc, score))
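A usage sketch matching the renamed `_get_vector` helper: the public surface stays `similarity`, `distance`, `add_corpus` and `most_similar`. That `Word2Vec()` defaults to text2vec's light Chinese word vectors and that `most_similar` takes `topn` are assumptions based on the tests below.

```python
from text2vec import Word2Vec
from similarities.literalsim import WordEmbeddingSimilarity

wm = Word2Vec()  # assumed default: light Chinese word vectors, downloaded on first use
m = WordEmbeddingSimilarity(wm, ['刘若英是个演员', '他唱歌很好听'])
print(m.similarity('如何更换花呗绑定银行卡', '花呗更改绑定银行卡'))  # cosine over word-vector embeddings
print(m.most_similar('刘若英是演员', topn=2))
```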

View File

@ -1,310 +1,19 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
@description:
Compute similarity:
1. Compute the similarity between two sentences
2. Retrieves most similar sentence of a query against a corpus of documents.
"""
import queue
from typing import List, Union
import numpy as np
import torch
import torch.nn.functional
from loguru import logger
from text2vec import SentenceModel
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def cos_sim(a: Union[torch.Tensor, np.ndarray], b: Union[torch.Tensor, np.ndarray]):
"""
Computes the cosine similarity cos_sim(a[i], b[j]) for all i and j.
:return: Matrix with res[i][j] = cos_sim(a[i], b[j])
"""
if not isinstance(a, torch.Tensor):
a = torch.tensor(a)
if not isinstance(b, torch.Tensor):
b = torch.tensor(b)
if len(a.shape) == 1:
a = a.unsqueeze(0)
if len(b.shape) == 1:
b = b.unsqueeze(0)
a_norm = normalize_embeddings(a)
b_norm = normalize_embeddings(b)
return torch.mm(a_norm, b_norm.transpose(0, 1))
def dot_score(a: Union[torch.Tensor, np.ndarray], b: Union[torch.Tensor, np.ndarray]):
"""
Computes the dot-product dot_prod(a[i], b[j]) for all i and j.
:return: Matrix with res[i][j] = dot_prod(a[i], b[j])
"""
if not isinstance(a, torch.Tensor):
a = torch.tensor(a)
if not isinstance(b, torch.Tensor):
b = torch.tensor(b)
if len(a.shape) == 1:
a = a.unsqueeze(0)
if len(b.shape) == 1:
b = b.unsqueeze(0)
return torch.mm(a, b.transpose(0, 1))
def pairwise_dot_score(a: Union[torch.Tensor, np.ndarray], b: Union[torch.Tensor, np.ndarray]):
"""
Computes the pairwise dot-product dot_prod(a[i], b[i])
:return: Vector with res[i] = dot_prod(a[i], b[i])
"""
if not isinstance(a, torch.Tensor):
a = torch.tensor(a)
if not isinstance(b, torch.Tensor):
b = torch.tensor(b)
return (a * b).sum(dim=-1)
def pairwise_cos_sim(a: Union[torch.Tensor, np.ndarray], b: Union[torch.Tensor, np.ndarray]):
"""
Computes the pairwise cossim cos_sim(a[i], b[i])
:return: Vector with res[i] = cos_sim(a[i], b[i])
"""
if not isinstance(a, torch.Tensor):
a = torch.tensor(a)
if not isinstance(b, torch.Tensor):
b = torch.tensor(b)
return pairwise_dot_score(normalize_embeddings(a), normalize_embeddings(b))
def normalize_embeddings(embeddings: torch.Tensor):
"""
Normalizes the embeddings matrix, so that each sentence embedding has unit length
"""
return torch.nn.functional.normalize(embeddings, p=2, dim=1)
def semantic_search(
query_embeddings: Union[torch.Tensor, np.ndarray],
corpus_embeddings: Union[torch.Tensor, np.ndarray],
query_chunk_size: int = 100,
corpus_chunk_size: int = 500000,
top_k: int = 10,
score_function=cos_sim
):
"""
This function performs a cosine similarity search between a list of query embeddings and a list of corpus embeddings.
It can be used for Information Retrieval / Semantic Search for corpora up to about 1 Million entries.
:param query_embeddings: A 2 dimensional tensor with the query embeddings.
:param corpus_embeddings: A 2 dimensional tensor with the corpus embeddings.
:param query_chunk_size: Process 100 queries simultaneously. Increasing that value increases the speed, but
requires more memory.
:param corpus_chunk_size: Scans the corpus 100k entries at a time. Increasing that value increases the speed,
but requires more memory.
:param top_k: Retrieve top k matching entries.
:param score_function: Function for computing scores. By default, cosine similarity.
:return: Returns a sorted list with decreasing cosine similarity scores. Entries are dictionaries with the
keys 'corpus_id' and 'score'
"""
if isinstance(query_embeddings, (np.ndarray, np.generic)):
query_embeddings = torch.from_numpy(query_embeddings)
elif isinstance(query_embeddings, list):
query_embeddings = torch.stack(query_embeddings)
if len(query_embeddings.shape) == 1:
query_embeddings = query_embeddings.unsqueeze(0)
if isinstance(corpus_embeddings, (np.ndarray, np.generic)):
corpus_embeddings = torch.from_numpy(corpus_embeddings)
elif isinstance(corpus_embeddings, list):
corpus_embeddings = torch.stack(corpus_embeddings)
# Check that corpus and queries are on the same device
query_embeddings = query_embeddings.to(device)
corpus_embeddings = corpus_embeddings.to(device)
queries_result_list = [[] for _ in range(len(query_embeddings))]
for query_start_idx in range(0, len(query_embeddings), query_chunk_size):
# Iterate over chunks of the corpus
for corpus_start_idx in range(0, len(corpus_embeddings), corpus_chunk_size):
# Compute cosine similarity
cos_scores = score_function(query_embeddings[query_start_idx:query_start_idx + query_chunk_size],
corpus_embeddings[corpus_start_idx:corpus_start_idx + corpus_chunk_size])
# Get top-k scores
cos_scores_top_k_values, cos_scores_top_k_idx = torch.topk(cos_scores, min(top_k, len(cos_scores[0])),
dim=1, largest=True, sorted=False)
cos_scores_top_k_values = cos_scores_top_k_values.cpu().tolist()
cos_scores_top_k_idx = cos_scores_top_k_idx.cpu().tolist()
for query_itr in range(len(cos_scores)):
for sub_corpus_id, score in zip(cos_scores_top_k_idx[query_itr],
cos_scores_top_k_values[query_itr]):
corpus_id = corpus_start_idx + sub_corpus_id
query_id = query_start_idx + query_itr
queries_result_list[query_id].append({'corpus_id': corpus_id, 'score': score})
# Sort and strip to top_k results
for idx in range(len(queries_result_list)):
queries_result_list[idx] = sorted(queries_result_list[idx], key=lambda x: x['score'], reverse=True)
queries_result_list[idx] = queries_result_list[idx][0:top_k]
return queries_result_list
def paraphrase_mining_embeddings(
embeddings: Union[torch.Tensor, np.ndarray],
query_chunk_size: int = 5000,
corpus_chunk_size: int = 100000,
max_pairs: int = 500000,
top_k: int = 100,
score_function=cos_sim
):
"""
Given a list of sentences / texts, this function performs paraphrase mining. It compares all sentences against all
other sentences and returns a list with the pairs that have the highest cosine similarity score.
:param embeddings: A tensor with the embeddings
:param query_chunk_size: Search for most similar pairs for #query_chunk_size at the same time. Decrease, to lower
memory footprint (increases run-time).
:param corpus_chunk_size: Compare a sentence simultaneously against #corpus_chunk_size other sentences. Decrease,
to lower memory footprint (increases run-time).
:param max_pairs: Maximal number of text pairs returned.
:param top_k: For each sentence, we retrieve up to top_k other sentences
:param score_function: Function for computing scores. By default, cosine similarity.
:return: Returns a list of triplets with the format [score, id1, id2]
"""
if isinstance(embeddings, (np.ndarray, np.generic)):
embeddings = torch.from_numpy(embeddings)
elif isinstance(embeddings, list):
embeddings = torch.stack(embeddings)
if len(embeddings.shape) == 1:
embeddings = embeddings.unsqueeze(0)
embeddings = embeddings.to(device)
top_k += 1  # A sentence has the highest similarity to itself. Increase +1 as we are interested in distinct pairs
# Mine for duplicates
pairs = queue.PriorityQueue()
min_score = -1
num_added = 0
for corpus_start_idx in range(0, len(embeddings), corpus_chunk_size):
for query_start_idx in range(0, len(embeddings), query_chunk_size):
scores = score_function(embeddings[query_start_idx: query_start_idx + query_chunk_size],
embeddings[corpus_start_idx: corpus_start_idx + corpus_chunk_size])
scores_top_k_values, scores_top_k_idx = torch.topk(scores, min(top_k, len(scores[0])), dim=1, largest=True,
sorted=False)
scores_top_k_values = scores_top_k_values.cpu().tolist()
scores_top_k_idx = scores_top_k_idx.cpu().tolist()
for query_itr in range(len(scores)):
for top_k_idx, corpus_itr in enumerate(scores_top_k_idx[query_itr]):
i = query_start_idx + query_itr
j = corpus_start_idx + corpus_itr
if i != j and scores_top_k_values[query_itr][top_k_idx] > min_score:
pairs.put((scores_top_k_values[query_itr][top_k_idx], i, j))
num_added += 1
if num_added >= max_pairs:
entry = pairs.get()
min_score = entry[0]
# Get the pairs
added_pairs = set() # Used for duplicate detection
pairs_list = []
while not pairs.empty():
score, i, j = pairs.get()
sorted_i, sorted_j = sorted([i, j])
if sorted_i != sorted_j and (sorted_i, sorted_j) not in added_pairs:
added_pairs.add((sorted_i, sorted_j))
pairs_list.append([score, i, j])
# Highest scores first
pairs_list = sorted(pairs_list, key=lambda x: x[0], reverse=True)
return pairs_list
def community_detection(embeddings, threshold=0.75, min_community_size=10, init_max_size=1000):
"""
Function for Fast Community Detection
Finds in the embeddings all communities, i.e. embeddings that are close (closer than threshold).
Returns only communities that are larger than min_community_size. The communities are returned
in decreasing order. The first element in each list is the central point in the community.
"""
# Maximum size for community
init_max_size = min(init_max_size, len(embeddings))
# Compute cosine similarity scores
cos_scores = cos_sim(embeddings, embeddings)
# Minimum size for a community
top_k_values, _ = cos_scores.topk(k=min_community_size, largest=True)
# Filter for rows >= min_threshold
extracted_communities = []
for i in range(len(top_k_values)):
if top_k_values[i][-1] >= threshold:
new_cluster = []
# Only check top k most similar entries
top_val_large, top_idx_large = cos_scores[i].topk(k=init_max_size, largest=True)
top_idx_large = top_idx_large.tolist()
top_val_large = top_val_large.tolist()
if top_val_large[-1] < threshold:
for idx, val in zip(top_idx_large, top_val_large):
if val < threshold:
break
new_cluster.append(idx)
else:
# Iterate over all entries (slow)
for idx, val in enumerate(cos_scores[i].tolist()):
if val >= threshold:
new_cluster.append(idx)
extracted_communities.append(new_cluster)
# Largest cluster first
extracted_communities = sorted(extracted_communities, key=lambda x: len(x), reverse=True)
# Step 2) Remove overlapping communities
unique_communities = []
extracted_ids = set()
for community in extracted_communities:
add_cluster = True
for idx in community:
if idx in extracted_ids:
add_cluster = False
break
if add_cluster:
unique_communities.append(community)
for idx in community:
extracted_ids.add(idx)
return unique_communities
from sentence_transformers import SentenceTransformer
from similarities.utils.util import cos_sim, semantic_search, dot_score
class Similarity:
@ -316,20 +25,21 @@ class Similarity:
The index supports adding new documents dynamically.
"""
def __init__(self, sentence_model: Union[str, SentenceModel], corpus: List[str] = None):
def __init__(self, model_name_or_path="shibing624/text2vec-base-chinese", corpus: List[str] = None):
"""
Initialize the similarity object.
:param sentence_model: Model to use for sentence embeddings.
:param model_name_or_path: The name of the model or the path to the matching model.
:param corpus: Corpus of documents to use for similarity queries.
"""
if isinstance(sentence_model, SentenceModel):
self.sentence_model = sentence_model
elif isinstance(sentence_model, str):
self.sentence_model = SentenceModel(sentence_model)
if isinstance(model_name_or_path, str):
self.sentence_model = SentenceTransformer(model_name_or_path)
elif hasattr(model_name_or_path, "encode"):
self.sentence_model = model_name_or_path
else:
raise ValueError("sentence_model must be either a SentenceModel or a model name of SentenceTransformer.")
raise ValueError("model_name_or_path is model name of SentenceTransformer or transformers")
self.score_functions = {'cos_sim': cos_sim, 'dot': dot_score}
self.corpus = []
self.corpus_embeddings = np.array([])
self.corpus_embeddings = []
if corpus is not None:
self.add_corpus(corpus)
@ -352,14 +62,14 @@ class Similarity:
corpus : list of str
"""
self.corpus += corpus
docs_embeddings = self.get_vector(corpus)
if self.corpus_embeddings.size > 0:
self.corpus_embeddings = np.vstack((self.corpus_embeddings, docs_embeddings))
docs_embeddings = self._get_vector(corpus).tolist()
if self.corpus_embeddings:
self.corpus_embeddings += docs_embeddings
else:
self.corpus_embeddings = docs_embeddings
logger.info(f"Add docs size: {len(corpus)}, total size: {len(self.corpus)}")
def get_vector(self, text: Union[str, List[str]]):
def _get_vector(self, text: Union[str, List[str]]) -> np.ndarray:
"""
Returns the embeddings for a batch of sentences.
:param text:
@ -367,7 +77,7 @@ class Similarity:
"""
return self.sentence_model.encode(text)
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]], score_function=cos_sim):
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]], score_function: str = "cos_sim"):
"""
Compute similarity between two texts.
:param text1: list of str or str
@ -375,8 +85,12 @@ class Similarity:
:param score_function: function to compute similarity, default cos_sim
:return: similarity score, torch.Tensor, Matrix with res[i][j] = cos_sim(a[i], b[j])
"""
text_emb1 = self.get_vector(text1)
text_emb2 = self.get_vector(text2)
if score_function not in self.score_functions:
raise ValueError(f"score function: {score_function} must be either (cos_sim) for cosine similarity"
" or (dot) for dot product")
score_function = self.score_functions[score_function]
text_emb1 = self._get_vector(text1)
text_emb2 = self._get_vector(text2)
return score_function(text_emb1, text_emb2)
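The score function is now selected by name and looked up in `self.score_functions`; a short sketch of the new call pattern, using the default model name from this diff:

```python
from similarities.similarity import Similarity

m = Similarity("shibing624/text2vec-base-chinese")
print(m.similarity('如何更换花呗绑定银行卡', '花呗更改绑定银行卡'))                        # cosine similarity (default)
print(m.similarity('如何更换花呗绑定银行卡', '花呗更改绑定银行卡', score_function='dot'))  # dot product
```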
def distance(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
@ -391,8 +105,8 @@ class Similarity:
:return:
"""
result = []
query_embeddings = self.get_vector(query)
hits = semantic_search(query_embeddings, self.corpus_embeddings, top_k=topn)
query_embeddings = self._get_vector(query)
hits = semantic_search(query_embeddings, np.array(self.corpus_embeddings, dtype=np.float32), top_k=topn)
hits = hits[0] # Get the first query result when query is string
for hit in hits[0:topn]:

View File

@ -3,4 +3,11 @@
@author:XuMing(xuming624@qq.com)
@description:
"""
from .util import *
from .distance import *
from .get_file import *
from .imagehash import *
from .ngram_util import *
from .rank_bm25 import *
from .tfidf import *
from .tokenizer import *

View File

@ -21,14 +21,14 @@ def try_divide(x, y, val=0.0):
def cosine_distance(v1, v2, normalize=False):
"""
Cosine distance.
normalize: True, the cosine value range [-1, +1] is normalized to [0, 1]
Compute the cosine distance between two vectors.
normalize: False is [-1, +1], True is [0, 1]
return cos score
"""
if isinstance(v1, list):
v1 = np.array(v1)
v1 = np.array(v1, dtype=np.float32)
if isinstance(v2, list):
v2 = np.array(v2)
v2 = np.array(v2, dtype=np.float32)
up = np.dot(v1, v2)
down = np.linalg.norm(v1) * np.linalg.norm(v2)
score = try_divide(up, down)
@ -37,9 +37,29 @@ def cosine_distance(v1, v2, normalize=False):
return score
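Despite the name, `cosine_distance` returns the cosine score itself; per the docstring, `normalize=True` only rescales it into [0, 1]. A tiny worked example (the exact rescaling formula is not shown above, so only the raw score is annotated):

```python
from similarities.utils.distance import cosine_distance

v1, v2 = [1.0, 0.0, 1.0], [1.0, 1.0, 0.0]
print(cosine_distance(v1, v2))                  # dot=1, norms=sqrt(2)*sqrt(2) -> 0.5, range [-1, 1]
print(cosine_distance(v1, v2, normalize=True))  # same score rescaled into [0, 1]
```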
def hamming_distance(v1, v2):  # Hamming distance
n = int(v1, 2) ^ int(v2, 2)
return bin(n & 0xffffffff).count('1')
def hamming_distance(seq1, seq2, normalize=False):
"""Compute the Hamming distance between the two sequences `seq1` and `seq2`.
The Hamming distance is the number of differing items in two ordered
sequences of the same length. If the sequences submitted do not have the
same length, an error will be raised.
If `normalize` is `False`, the return value will be an integer
between 0 and the length of the sequences provided, edge values included;
otherwise, it will be a float between 0 and 1 included, where 0 means
equal, and 1 totally different. Normalized hamming distance is computed as:
0.0 if len(seq1) == 0
hamming_dist / len(seq1) otherwise
"""
L = len(seq1)
if L != len(seq2):
raise ValueError("expected two strings of the same length")
if L == 0:
return 0.0 if normalize else 0 # equal
dist = sum(c1 != c2 for c1, c2 in zip(seq1, seq2))
if normalize:
return dist / float(L)
return dist
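A worked example of the new sequence-based Hamming distance (import path as used in tests/test_litsim.py below):

```python
from similarities.utils.distance import hamming_distance

seq1 = '10101010'
seq2 = '10011010'
print(hamming_distance(seq1, seq2))                  # 2 positions differ
print(hamming_distance(seq1, seq2, normalize=True))  # 2 / 8 = 0.25
```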
def euclidean_distance(v1, v2, normalize=False):  # Euclidean distance
@ -158,33 +178,6 @@ def string_hash(source):
return str(x)
def sim_hash(text):
import jieba
import jieba.analyse
seg = jieba.cut(text)
key_word = jieba.analyse.extract_tags('|'.join(seg), topK=None, withWeight=True, allowPOS=())
# sort by weight first, then by word
key_list = []
for feature, weight in key_word:
weight = int(weight * 20)
temp = []
for f in string_hash(feature):
if f == '1':
temp.append(weight)
else:
temp.append(-weight)
key_list.append(temp)
content_list = np.sum(np.array(key_list), axis=0)
# 编码读不出来
if len(key_list) == 0:
return '00'
hash_code = ''
for c in content_list:
if c > 0:
hash_code = hash_code + '1'
else:
hash_code = hash_code + '0'
return hash_code
def max_min_normalize(x):
"""

View File

@ -0,0 +1,595 @@
# -*- coding: utf-8 -*-
"""
Image hashing library
======================
Example:
>>> from PIL import Image
>>> import imagehash
>>> hash = imagehash.average_hash(Image.open('test.png'))
>>> print(hash)
d879f8f89b1bbf
>>> otherhash = imagehash.average_hash(Image.open('other.bmp'))
>>> print(otherhash)
ffff3720200ffff
>>> print(hash == otherhash)
False
>>> print(hash - otherhash)
36
>>> for r in range(1, 30, 5):
... rothash = imagehash.average_hash(Image.open('test.png').rotate(r))
... print('Rotation by %d: %d Hamming difference' % (r, hash - rothash))
...
Rotation by 1: 2 Hamming difference
Rotation by 6: 11 Hamming difference
Rotation by 11: 13 Hamming difference
Rotation by 16: 17 Hamming difference
Rotation by 21: 19 Hamming difference
Rotation by 26: 21 Hamming difference
>>>
"""
from __future__ import (absolute_import, division, print_function)
from PIL import Image, ImageFilter
import numpy
__version__ = "4.2.1"
"""
You may copy this file, if you keep the copyright information below:
Copyright (c) 2013-2020, Johannes Buchner
https://github.com/JohannesBuchner/imagehash
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
def _binary_array_to_hex(arr):
"""
internal function to make a hex string out of a binary array.
"""
bit_string = ''.join(str(b) for b in 1 * arr.flatten())
width = int(numpy.ceil(len(bit_string) / 4))
return '{:0>{width}x}'.format(int(bit_string, 2), width=width)
class ImageHash(object):
"""
Hash encapsulation. Can be used for dictionary keys and comparisons.
"""
def __init__(self, binary_array):
self.hash = binary_array
def __str__(self):
return _binary_array_to_hex(self.hash.flatten())
def __repr__(self):
return repr(self.hash)
def __sub__(self, other):
if other is None:
raise TypeError('Other hash must not be None.')
if self.hash.size != other.hash.size:
raise TypeError('ImageHashes must be of the same shape.', self.hash.shape, other.hash.shape)
return numpy.count_nonzero(self.hash.flatten() != other.hash.flatten())
def __eq__(self, other):
if other is None:
return False
return numpy.array_equal(self.hash.flatten(), other.hash.flatten())
def __ne__(self, other):
if other is None:
return False
return not numpy.array_equal(self.hash.flatten(), other.hash.flatten())
def __hash__(self):
# this returns an 8-bit integer, intentionally shortening the information
return sum([2 ** (i % 8) for i, v in enumerate(self.hash.flatten()) if v])
def __len__(self):
# Returns the bit length of the hash
return self.hash.size
def hex_to_hash(hexstr):
"""
Convert a stored hash (hex, as retrieved from str(ImageHash))
back to an ImageHash object.
Notes:
1. This algorithm assumes all hashes are either
bidimensional arrays with dimensions hash_size * hash_size,
or onedimensional arrays with dimensions binbits * 14.
2. This algorithm does not work for hash_size < 2.
"""
hash_size = int(numpy.sqrt(len(hexstr) * 4))
# assert hash_size == numpy.sqrt(len(hexstr)*4)
binary_array = '{:0>{width}b}'.format(int(hexstr, 16), width=hash_size * hash_size)
bit_rows = [binary_array[i:i + hash_size] for i in range(0, len(binary_array), hash_size)]
hash_array = numpy.array([[bool(int(d)) for d in row] for row in bit_rows])
return ImageHash(hash_array)
def hex_to_flathash(hexstr, hashsize):
hash_size = int(len(hexstr) * 4 / (hashsize))
binary_array = '{:0>{width}b}'.format(int(hexstr, 16), width=hash_size * hashsize)
hash_array = numpy.array([[bool(int(d)) for d in binary_array]])[-hash_size * hashsize:]
return ImageHash(hash_array)
def old_hex_to_hash(hexstr, hash_size=8):
"""
Convert a stored hash (hex, as retrieved from str(Imagehash))
back to a Imagehash object. This method should be used for
hashes generated by ImageHash up to version 3.7. For hashes
generated by newer versions of ImageHash, hex_to_hash should
be used instead.
"""
l = []
count = hash_size * (hash_size // 4)
if len(hexstr) != count:
emsg = 'Expected hex string size of {}.'
raise ValueError(emsg.format(count))
for i in range(count // 2):
h = hexstr[i * 2:i * 2 + 2]
v = int("0x" + h, 16)
l.append([v & 2 ** i > 0 for i in range(8)])
return ImageHash(numpy.array(l))
def average_hash(image, hash_size=8, mean=numpy.mean):
"""
Average Hash computation
Implementation follows http://www.hackerfactor.com/blog/index.php?/archives/432-Looks-Like-It.html
Step by step explanation: https://web.archive.org/web/20171112054354/https://www.safaribooksonline.com/blog/2013/11/26/image-hashing-with-python/
@image must be a PIL instance.
@mean how to determine the average luminance. Can try numpy.median instead.
"""
if hash_size < 2:
raise ValueError("Hash size must be greater than or equal to 2")
# reduce size and complexity, then convert to grayscale
image = image.convert("L").resize((hash_size, hash_size), Image.ANTIALIAS)
# find average pixel value; 'pixels' is an array of the pixel values, ranging from 0 (black) to 255 (white)
pixels = numpy.asarray(image)
avg = mean(pixels)
# create string of bits
diff = pixels > avg
# make a hash
return ImageHash(diff)
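A minimal sketch of the hash arithmetic defined above: `average_hash` returns an ImageHash, subtraction gives the Hamming difference, and `str()`/`hex_to_hash()` round-trip the hash. The image paths are the sample files used in tests/test_imagesim.py; the <= 8 bit threshold is just an illustrative choice.

```python
from PIL import Image
from similarities.utils.imagehash import average_hash, hex_to_hash

h1 = average_hash(Image.open('examples/data/image1.png'))
h2 = average_hash(Image.open('examples/data/image8-like-image1.png'))
print(h1 - h2)                    # __sub__: number of differing bits (0..64 for hash_size=8)
print((h1 - h2) <= 8)             # illustrative near-duplicate threshold
restored = hex_to_hash(str(h1))   # hex string round-trip
print(restored == h1)             # True
```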
def phash(image, hash_size=8, highfreq_factor=4):
"""
Perceptual Hash computation.
Implementation follows http://www.hackerfactor.com/blog/index.php?/archives/432-Looks-Like-It.html
@image must be a PIL instance.
"""
if hash_size < 2:
raise ValueError("Hash size must be greater than or equal to 2")
import scipy.fftpack
img_size = hash_size * highfreq_factor
image = image.convert("L").resize((img_size, img_size), Image.ANTIALIAS)
pixels = numpy.asarray(image)
dct = scipy.fftpack.dct(scipy.fftpack.dct(pixels, axis=0), axis=1)
dctlowfreq = dct[:hash_size, :hash_size]
med = numpy.median(dctlowfreq)
diff = dctlowfreq > med
return ImageHash(diff)
def phash_simple(image, hash_size=8, highfreq_factor=4):
"""
Perceptual Hash computation.
Implementation follows http://www.hackerfactor.com/blog/index.php?/archives/432-Looks-Like-It.html
@image must be a PIL instance.
"""
import scipy.fftpack
img_size = hash_size * highfreq_factor
image = image.convert("L").resize((img_size, img_size), Image.ANTIALIAS)
pixels = numpy.asarray(image)
dct = scipy.fftpack.dct(pixels)
dctlowfreq = dct[:hash_size, 1:hash_size + 1]
avg = dctlowfreq.mean()
diff = dctlowfreq > avg
return ImageHash(diff)
def dhash(image, hash_size=8):
"""
Difference Hash computation.
following http://www.hackerfactor.com/blog/index.php?/archives/529-Kind-of-Like-That.html
computes differences horizontally
@image must be a PIL instance.
"""
# resize(w, h), but numpy.array((h, w))
if hash_size < 2:
raise ValueError("Hash size must be greater than or equal to 2")
image = image.convert("L").resize((hash_size + 1, hash_size), Image.ANTIALIAS)
pixels = numpy.asarray(image)
# compute differences between columns
diff = pixels[:, 1:] > pixels[:, :-1]
return ImageHash(diff)
def dhash_vertical(image, hash_size=8):
"""
Difference Hash computation.
following http://www.hackerfactor.com/blog/index.php?/archives/529-Kind-of-Like-That.html
computes differences vertically
@image must be a PIL instance.
"""
# resize(w, h), but numpy.array((h, w))
image = image.convert("L").resize((hash_size, hash_size + 1), Image.ANTIALIAS)
pixels = numpy.asarray(image)
# compute differences between rows
diff = pixels[1:, :] > pixels[:-1, :]
return ImageHash(diff)
def whash(image, hash_size=8, image_scale=None, mode='haar', remove_max_haar_ll=True):
"""
Wavelet Hash computation.
based on https://www.kaggle.com/c/avito-duplicate-ads-detection/
@image must be a PIL instance.
@hash_size must be a power of 2 and less than @image_scale.
@image_scale must be a power of 2 and less than the image size. By default it is equal to the maximum
power of 2 for the input image.
@mode (see modes in pywt library):
'haar' - Haar wavelets, by default
'db4' - Daubechies wavelets
@remove_max_haar_ll - remove the lowest low level (LL) frequency using Haar wavelet.
"""
import pywt
if image_scale is not None:
assert image_scale & (image_scale - 1) == 0, "image_scale is not power of 2"
else:
image_natural_scale = 2 ** int(numpy.log2(min(image.size)))
image_scale = max(image_natural_scale, hash_size)
ll_max_level = int(numpy.log2(image_scale))
level = int(numpy.log2(hash_size))
assert hash_size & (hash_size - 1) == 0, "hash_size is not power of 2"
assert level <= ll_max_level, "hash_size in a wrong range"
dwt_level = ll_max_level - level
image = image.convert("L").resize((image_scale, image_scale), Image.ANTIALIAS)
pixels = numpy.asarray(image) / 255.
# Remove low level frequency LL(max_ll) if @remove_max_haar_ll using haar filter
if remove_max_haar_ll:
coeffs = pywt.wavedec2(pixels, 'haar', level=ll_max_level)
coeffs = list(coeffs)
coeffs[0] *= 0
pixels = pywt.waverec2(coeffs, 'haar')
# Use LL(K) as freq, where K is log2(@hash_size)
coeffs = pywt.wavedec2(pixels, mode, level=dwt_level)
dwt_low = coeffs[0]
# Subtract the median and compute the hash
med = numpy.median(dwt_low)
diff = dwt_low > med
return ImageHash(diff)
def colorhash(image, binbits=3):
"""
Color Hash computation.
Computes fractions of image in intensity, hue and saturation bins:
* the first binbits encode the black fraction of the image
* the next binbits encode the gray fraction of the remaining image (low saturation)
* the next 6*binbits encode the fraction in 6 bins of saturation, for highly saturated parts of the remaining image
* the next 6*binbits encode the fraction in 6 bins of saturation, for mildly saturated parts of the remaining image
@binbits number of bits to use to encode each pixel fractions
"""
# bin in hsv space:
intensity = numpy.asarray(image.convert("L")).flatten()
h, s, v = [numpy.asarray(v).flatten() for v in image.convert("HSV").split()]
# black bin
mask_black = intensity < 256 // 8
frac_black = mask_black.mean()
# gray bin (low saturation, but not black)
mask_gray = s < 256 // 3
frac_gray = numpy.logical_and(~mask_black, mask_gray).mean()
# two color bins (medium and high saturation, not in the two above)
mask_colors = numpy.logical_and(~mask_black, ~mask_gray)
mask_faint_colors = numpy.logical_and(mask_colors, s < 256 * 2 // 3)
mask_bright_colors = numpy.logical_and(mask_colors, s > 256 * 2 // 3)
c = max(1, mask_colors.sum())
# in the color bins, make sub-bins by hue
hue_bins = numpy.linspace(0, 255, 6 + 1)
if mask_faint_colors.any():
h_faint_counts, _ = numpy.histogram(h[mask_faint_colors], bins=hue_bins)
else:
h_faint_counts = numpy.zeros(len(hue_bins) - 1)
if mask_bright_colors.any():
h_bright_counts, _ = numpy.histogram(h[mask_bright_colors], bins=hue_bins)
else:
h_bright_counts = numpy.zeros(len(hue_bins) - 1)
# now we have fractions in each category (6*2 + 2 = 14 bins)
# convert to hash and discretize:
maxvalue = 2 ** binbits
values = [min(maxvalue - 1, int(frac_black * maxvalue)), min(maxvalue - 1, int(frac_gray * maxvalue))]
for counts in list(h_faint_counts) + list(h_bright_counts):
values.append(min(maxvalue - 1, int(counts * maxvalue * 1. / c)))
# print(values)
bitarray = []
for v in values:
bitarray += [v // (2 ** (binbits - i - 1)) % 2 ** (binbits - i) > 0 for i in range(binbits)]
return ImageHash(numpy.asarray(bitarray).reshape((-1, binbits)))
class ImageMultiHash(object):
"""
This is an image hash containing a list of individual hashes for segments of the image.
The matching logic is implemented as described in Efficient Cropping-Resistant Robust Image Hashing
"""
def __init__(self, hashes):
self.segment_hashes = hashes
def __eq__(self, other):
if other is None:
return False
return self.matches(other)
def __ne__(self, other):
return not self.matches(other)
def __sub__(self, other, hamming_cutoff=None, bit_error_rate=None):
matches, sum_distance = self.hash_diff(other, hamming_cutoff, bit_error_rate)
max_difference = len(self.segment_hashes)
if matches == 0:
return max_difference
max_distance = matches * len(self.segment_hashes[0])
tie_breaker = 0 - (float(sum_distance) / max_distance)
match_score = matches + tie_breaker
return max_difference - match_score
def __hash__(self):
return hash(tuple(hash(segment) for segment in self.segment_hashes))
def __str__(self):
return ",".join(str(x) for x in self.segment_hashes)
def __repr__(self):
return repr(self.segment_hashes)
def hash_diff(self, other_hash, hamming_cutoff=None, bit_error_rate=None):
"""
Gets the difference between two multi-hashes, as a tuple. The first element of the tuple is the number of
matching segments, and the second element is the sum of the hamming distances of matching hashes.
NOTE: Do not order directly by this tuple, as higher is better for matches, and worse for hamming cutoff.
:param other_hash: The image multi hash to compare against
:param hamming_cutoff: The maximum hamming distance to a region hash in the target hash
:param bit_error_rate: Percentage of bits which can be incorrect, an alternative to the hamming cutoff. The
default of 0.25 means that the segment hashes can be up to 25% different
"""
# Set default hamming cutoff if it's not set.
if hamming_cutoff is None and bit_error_rate is None:
bit_error_rate = 0.25
if hamming_cutoff is None:
hamming_cutoff = len(self.segment_hashes[0]) * bit_error_rate
# Get the hash distance for each region hash within cutoff
distances = []
for segment_hash in self.segment_hashes:
lowest_distance = min(
segment_hash - other_segment_hash
for other_segment_hash in other_hash.segment_hashes
)
if lowest_distance > hamming_cutoff:
continue
distances.append(lowest_distance)
return len(distances), sum(distances)
def matches(self, other_hash, region_cutoff=1, hamming_cutoff=None, bit_error_rate=None):
"""
Checks whether this hash matches another crop resistant hash, `other_hash`.
:param other_hash: The image multi hash to compare against
:param region_cutoff: The minimum number of regions which must have a matching hash
:param hamming_cutoff: The maximum hamming distance to a region hash in the target hash
:param bit_error_rate: Percentage of bits which can be incorrect, an alternative to the hamming cutoff. The
default of 0.25 means that the segment hashes can be up to 25% different
"""
matches, _ = self.hash_diff(other_hash, hamming_cutoff, bit_error_rate)
return matches >= region_cutoff
def best_match(self, other_hashes, hamming_cutoff=None, bit_error_rate=None):
"""
Returns the hash in a list which is the best match to the current hash
:param other_hashes: A list of image multi hashes to compare against
:param hamming_cutoff: The maximum hamming distance to a region hash in the target hash
:param bit_error_rate: Percentage of bits which can be incorrect, an alternative to the hamming cutoff.
Defaults to 0.25 if unset, which means the hash can be 25% different
"""
return min(
other_hashes,
key=lambda other_hash: self.__sub__(other_hash, hamming_cutoff, bit_error_rate)
)
def _find_region(remaining_pixels, segmented_pixels):
"""
Finds a region and returns a set of pixel coordinates for it.
:param remaining_pixels: A numpy bool array, with True meaning the pixels are remaining to segment
:param segmented_pixels: A set of pixel coordinates which have already been assigned to segment. This will be
updated with the new pixels added to the returned segment.
"""
in_region = set()
not_in_region = set()
# Find the first pixel in remaining_pixels with a value of True
available_pixels = numpy.transpose(numpy.nonzero(remaining_pixels))
start = tuple(available_pixels[0])
in_region.add(start)
new_pixels = in_region.copy()
while True:
try_next = set()
# Find surrounding pixels
for pixel in new_pixels:
x, y = pixel
neighbours = [
(x - 1, y),
(x + 1, y),
(x, y - 1),
(x, y + 1)
]
try_next.update(neighbours)
# Remove pixels we have already seen
try_next.difference_update(segmented_pixels, not_in_region)
# If there's no more pixels to try, the region is complete
if not try_next:
break
# Empty the new-pixels set, so we know whose neighbours to check next time
new_pixels = set()
# Check new pixels
for pixel in try_next:
if remaining_pixels[pixel]:
in_region.add(pixel)
new_pixels.add(pixel)
segmented_pixels.add(pixel)
else:
not_in_region.add(pixel)
return in_region
def _find_all_segments(pixels, segment_threshold, min_segment_size):
"""
Finds all the regions within an image pixel array, and returns a list of the regions.
Note: Slightly different segmentations are produced when using pillow version 6 vs. >=7, due to a change in
rounding in the greyscale conversion.
:param pixels: A numpy array of the pixel brightnesses.
:param segment_threshold: The brightness threshold to use when differentiating between hills and valleys.
:param min_segment_size: The minimum number of pixels for a segment.
"""
img_width, img_height = pixels.shape
# threshold pixels
threshold_pixels = pixels > segment_threshold
unassigned_pixels = numpy.full(pixels.shape, True, dtype=bool)
segments = []
already_segmented = set()
# Add all the pixels around the border outside the image:
already_segmented.update([(-1, z) for z in range(img_height)])
already_segmented.update([(z, -1) for z in range(img_width)])
already_segmented.update([(img_width, z) for z in range(img_height)])
already_segmented.update([(z, img_height) for z in range(img_width)])
# Find all the "hill" regions
while numpy.bitwise_and(threshold_pixels, unassigned_pixels).any():
remaining_pixels = numpy.bitwise_and(threshold_pixels, unassigned_pixels)
segment = _find_region(remaining_pixels, already_segmented)
# Apply segment
if len(segment) > min_segment_size:
segments.append(segment)
for pix in segment:
unassigned_pixels[pix] = False
# Invert the threshold matrix, and find "valleys"
threshold_pixels_i = numpy.invert(threshold_pixels)
while len(already_segmented) < img_width * img_height:
remaining_pixels = numpy.bitwise_and(threshold_pixels_i, unassigned_pixels)
segment = _find_region(remaining_pixels, already_segmented)
# Apply segment
if len(segment) > min_segment_size:
segments.append(segment)
for pix in segment:
unassigned_pixels[pix] = False
return segments
def crop_resistant_hash(
image,
hash_func=None,
limit_segments=None,
segment_threshold=128,
min_segment_size=500,
segmentation_image_size=300
):
"""
Creates a CropResistantHash object, by the algorithm described in the paper "Efficient Cropping-Resistant Robust
Image Hashing". DOI 10.1109/ARES.2014.85
This algorithm partitions the image into bright and dark segments, using a watershed-like algorithm, and then does
an image hash on each segment. This makes the image much more resistant to cropping than other algorithms, with
the paper claiming resistance to up to 50% cropping, while most other algorithms stop at about 5% cropping.
Note: Slightly different segmentations are produced when using pillow version 6 vs. >=7, due to a change in
rounding in the greyscale conversion. This leads to a slightly different result.
:param image: The image to hash
:param hash_func: The hashing function to use
:param limit_segments: If you have storage requirements, you can limit to hashing only the M largest segments
:param segment_threshold: Brightness threshold between hills and valleys. This should be static, putting it between
peak and trough dynamically breaks the matching
:param min_segment_size: Minimum number of pixels for a hashable segment
:param segmentation_image_size: Size which the image is resized to before segmentation
"""
if hash_func is None:
hash_func = dhash
orig_image = image.copy()
# Convert to gray scale and resize
image = image.convert("L").resize((segmentation_image_size, segmentation_image_size), Image.ANTIALIAS)
# Add filters
image = image.filter(ImageFilter.GaussianBlur()).filter(ImageFilter.MedianFilter())
pixels = numpy.array(image).astype(numpy.float32)
segments = _find_all_segments(pixels, segment_threshold, min_segment_size)
# If there are no segments, have 1 segment including the whole image
if not segments:
full_image_segment = {(0, 0), (segmentation_image_size - 1, segmentation_image_size - 1)}
segments.append(full_image_segment)
# If segment limit is set, discard the smaller segments
if limit_segments:
segments = sorted(segments, key=lambda s: len(s), reverse=True)[:limit_segments]
# Create bounding box for each segment
hashes = []
for segment in segments:
orig_w, orig_h = orig_image.size
scale_w = float(orig_w) / segmentation_image_size
scale_h = float(orig_h) / segmentation_image_size
min_y = min(coord[0] for coord in segment) * scale_h
min_x = min(coord[1] for coord in segment) * scale_w
max_y = (max(coord[0] for coord in segment) + 1) * scale_h
max_x = (max(coord[1] for coord in segment) + 1) * scale_w
# Compute robust hash for each bounding box
bounding_box = orig_image.crop((min_x, min_y, max_x, max_y))
hashes.append(hash_func(bounding_box))
# Show bounding box
# im_segment = image.copy()
# for pix in segment:
# im_segment.putpixel(pix[::-1], 255)
# im_segment.show()
# bounding_box.show()
return ImageMultiHash(hashes)
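A sketch of crop-resistant hashing: each bright/dark segment gets its own sub-hash (dhash by default), so a cropped copy can still match. The crop box is arbitrary and only for illustration; the image path is the sample file from tests/test_imagesim.py.

```python
from PIL import Image
from similarities.utils.imagehash import crop_resistant_hash

img = Image.open('examples/data/image1.png')
h1 = crop_resistant_hash(img)                            # hash_func defaults to dhash per segment
h2 = crop_resistant_hash(img.crop((50, 50, 400, 300)))   # illustrative crop of the same image
print(h1.matches(h2))   # True when at least region_cutoff segment hashes stay within the bit error rate
print(h1 - h2)          # ImageMultiHash.__sub__: lower means a better match
```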

View File

@ -25,7 +25,6 @@ def load_stopwords(file_path):
class IDFLoader(object):
def __init__(self, idf_path=None):
self.path = ""
self.idf_freq = {}
@ -49,7 +48,6 @@ class IDFLoader(object):
class TFIDF:
def __init__(self, idf_path=None, stopwords=None):
self.stopwords = stopwords if stopwords else load_stopwords(default_stopwords_file)
self.idf_loader = IDFLoader(idf_path or DEFAULT_IDF)

306
similarities/utils/util.py Normal file
View File

@ -0,0 +1,306 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import queue
from typing import List, Union
import numpy as np
import torch
import torch.nn.functional
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def cos_sim(a: Union[torch.Tensor, np.ndarray], b: Union[torch.Tensor, np.ndarray]):
"""
Computes the cosine similarity cos_sim(a[i], b[j]) for all i and j.
:return: Matrix with res[i][j] = cos_sim(a[i], b[j])
"""
if not isinstance(a, torch.Tensor):
a = torch.tensor(a)
if not isinstance(b, torch.Tensor):
b = torch.tensor(b)
if len(a.shape) == 1:
a = a.unsqueeze(0)
if len(b.shape) == 1:
b = b.unsqueeze(0)
a_norm = normalize_embeddings(a)
b_norm = normalize_embeddings(b)
return torch.mm(a_norm, b_norm.transpose(0, 1))
def dot_score(a: Union[torch.Tensor, np.ndarray], b: Union[torch.Tensor, np.ndarray]):
"""
Computes the dot-product dot_prod(a[i], b[j]) for all i and j.
:return: Matrix with res[i][j] = dot_prod(a[i], b[j])
"""
if not isinstance(a, torch.Tensor):
a = torch.tensor(a)
if not isinstance(b, torch.Tensor):
b = torch.tensor(b)
if len(a.shape) == 1:
a = a.unsqueeze(0)
if len(b.shape) == 1:
b = b.unsqueeze(0)
return torch.mm(a, b.transpose(0, 1))
def pairwise_dot_score(a: Union[torch.Tensor, np.ndarray], b: Union[torch.Tensor, np.ndarray]):
"""
Computes the pairwise dot-product dot_prod(a[i], b[i])
:return: Vector with res[i] = dot_prod(a[i], b[i])
"""
if not isinstance(a, torch.Tensor):
a = torch.tensor(a)
if not isinstance(b, torch.Tensor):
b = torch.tensor(b)
return (a * b).sum(dim=-1)
def pairwise_cos_sim(a: Union[torch.Tensor, np.ndarray], b: Union[torch.Tensor, np.ndarray]):
"""
Computes the pairwise cossim cos_sim(a[i], b[i])
:return: Vector with res[i] = cos_sim(a[i], b[i])
"""
if not isinstance(a, torch.Tensor):
a = torch.tensor(a)
if not isinstance(b, torch.Tensor):
b = torch.tensor(b)
return pairwise_dot_score(normalize_embeddings(a), normalize_embeddings(b))
def normalize_embeddings(embeddings: torch.Tensor):
"""
Normalizes the embeddings matrix, so that each sentence embedding has unit length
"""
return torch.nn.functional.normalize(embeddings, p=2, dim=1)
def semantic_search(
query_embeddings: Union[torch.Tensor, np.ndarray],
corpus_embeddings: Union[torch.Tensor, np.ndarray],
query_chunk_size: int = 100,
corpus_chunk_size: int = 500000,
top_k: int = 10,
score_function=cos_sim
):
"""
This function performs a cosine similarity search between a list of query embeddings and a list of corpus embeddings.
It can be used for Information Retrieval / Semantic Search for corpora up to about 1 Million entries.
:param query_embeddings: A 2 dimensional tensor with the query embeddings.
:param corpus_embeddings: A 2 dimensional tensor with the corpus embeddings.
:param query_chunk_size: Process 100 queries simultaneously. Increasing that value increases the speed, but
requires more memory.
:param corpus_chunk_size: Scans the corpus 100k entries at a time. Increasing that value increases the speed,
but requires more memory.
:param top_k: Retrieve top k matching entries.
:param score_function: Function for computing scores. By default, cosine similarity.
:return: Returns a sorted list with decreasing cosine similarity scores. Entries are dictionaries with the
keys 'corpus_id' and 'score'
"""
if isinstance(query_embeddings, (np.ndarray, np.generic)):
query_embeddings = torch.from_numpy(query_embeddings)
elif isinstance(query_embeddings, list):
query_embeddings = torch.stack(query_embeddings)
if len(query_embeddings.shape) == 1:
query_embeddings = query_embeddings.unsqueeze(0)
if isinstance(corpus_embeddings, (np.ndarray, np.generic)):
corpus_embeddings = torch.from_numpy(corpus_embeddings)
elif isinstance(corpus_embeddings, list):
corpus_embeddings = torch.stack(corpus_embeddings)
# Check that corpus and queries are on the same device
query_embeddings = query_embeddings.to(device)
corpus_embeddings = corpus_embeddings.to(device)
queries_result_list = [[] for _ in range(len(query_embeddings))]
for query_start_idx in range(0, len(query_embeddings), query_chunk_size):
# Iterate over chunks of the corpus
for corpus_start_idx in range(0, len(corpus_embeddings), corpus_chunk_size):
# Compute cosine similarity
cos_scores = score_function(query_embeddings[query_start_idx:query_start_idx + query_chunk_size],
corpus_embeddings[corpus_start_idx:corpus_start_idx + corpus_chunk_size])
# Get top-k scores
cos_scores_top_k_values, cos_scores_top_k_idx = torch.topk(cos_scores, min(top_k, len(cos_scores[0])),
dim=1, largest=True, sorted=False)
cos_scores_top_k_values = cos_scores_top_k_values.cpu().tolist()
cos_scores_top_k_idx = cos_scores_top_k_idx.cpu().tolist()
for query_itr in range(len(cos_scores)):
for sub_corpus_id, score in zip(cos_scores_top_k_idx[query_itr],
cos_scores_top_k_values[query_itr]):
corpus_id = corpus_start_idx + sub_corpus_id
query_id = query_start_idx + query_itr
queries_result_list[query_id].append({'corpus_id': corpus_id, 'score': score})
# Sort and strip to top_k results
for idx in range(len(queries_result_list)):
queries_result_list[idx] = sorted(queries_result_list[idx], key=lambda x: x['score'], reverse=True)
queries_result_list[idx] = queries_result_list[idx][0:top_k]
return queries_result_list
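A toy run of `semantic_search` with random embeddings; real usage passes sentence embeddings from the model, but the shapes and the returned structure are the point here:

```python
import numpy as np
from similarities.utils.util import semantic_search

corpus_emb = np.random.rand(1000, 128).astype(np.float32)  # 1000 corpus vectors
query_emb = np.random.rand(2, 128).astype(np.float32)      # 2 query vectors
hits = semantic_search(query_emb, corpus_emb, top_k=5)
# hits[i] is a list of {'corpus_id': ..., 'score': ...} dicts, sorted by decreasing cosine similarity
print(hits[0][0])
```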
def paraphrase_mining_embeddings(
embeddings: Union[torch.Tensor, np.ndarray],
query_chunk_size: int = 5000,
corpus_chunk_size: int = 100000,
max_pairs: int = 500000,
top_k: int = 100,
score_function=cos_sim
):
"""
Given a list of sentences / texts, this function performs paraphrase mining. It compares all sentences against all
other sentences and returns a list with the pairs that have the highest cosine similarity score.
:param embeddings: A tensor with the embeddings
:param query_chunk_size: Search for most similar pairs for #query_chunk_size at the same time. Decrease, to lower
memory footprint (increases run-time).
:param corpus_chunk_size: Compare a sentence simultaneously against #corpus_chunk_size other sentences. Decrease,
to lower memory footprint (increases run-time).
:param max_pairs: Maximal number of text pairs returned.
:param top_k: For each sentence, we retrieve up to top_k other sentences
:param score_function: Function for computing scores. By default, cosine similarity.
:return: Returns a list of triplets with the format [score, id1, id2]
"""
if isinstance(embeddings, (np.ndarray, np.generic)):
embeddings = torch.from_numpy(embeddings)
elif isinstance(embeddings, list):
embeddings = torch.stack(embeddings)
if len(embeddings.shape) == 1:
embeddings = embeddings.unsqueeze(0)
embeddings = embeddings.to(device)
top_k += 1  # A sentence has the highest similarity to itself. Increase +1 as we are interested in distinct pairs
# Mine for duplicates
pairs = queue.PriorityQueue()
min_score = -1
num_added = 0
for corpus_start_idx in range(0, len(embeddings), corpus_chunk_size):
for query_start_idx in range(0, len(embeddings), query_chunk_size):
scores = score_function(embeddings[query_start_idx: query_start_idx + query_chunk_size],
embeddings[corpus_start_idx: corpus_start_idx + corpus_chunk_size])
scores_top_k_values, scores_top_k_idx = torch.topk(scores, min(top_k, len(scores[0])), dim=1, largest=True,
sorted=False)
scores_top_k_values = scores_top_k_values.cpu().tolist()
scores_top_k_idx = scores_top_k_idx.cpu().tolist()
for query_itr in range(len(scores)):
for top_k_idx, corpus_itr in enumerate(scores_top_k_idx[query_itr]):
i = query_start_idx + query_itr
j = corpus_start_idx + corpus_itr
if i != j and scores_top_k_values[query_itr][top_k_idx] > min_score:
pairs.put((scores_top_k_values[query_itr][top_k_idx], i, j))
num_added += 1
if num_added >= max_pairs:
entry = pairs.get()
min_score = entry[0]
# Get the pairs
added_pairs = set() # Used for duplicate detection
pairs_list = []
while not pairs.empty():
score, i, j = pairs.get()
sorted_i, sorted_j = sorted([i, j])
if sorted_i != sorted_j and (sorted_i, sorted_j) not in added_pairs:
added_pairs.add((sorted_i, sorted_j))
pairs_list.append([score, i, j])
# Highest scores first
pairs_list = sorted(pairs_list, key=lambda x: x[0], reverse=True)
return pairs_list
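A toy run of `paraphrase_mining_embeddings`; with real sentence embeddings the top triplets are the near-duplicate pairs:

```python
import numpy as np
from similarities.utils.util import paraphrase_mining_embeddings

emb = np.random.rand(200, 64).astype(np.float32)
pairs = paraphrase_mining_embeddings(emb, top_k=5, max_pairs=100)
for score, i, j in pairs[:3]:
    print(f"{i} <-> {j}: cos_sim={score:.4f}")  # highest-scoring distinct pairs first
```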
def community_detection(embeddings, threshold=0.75, min_community_size=10, init_max_size=1000):
"""
Function for Fast Community Detection
Finds in the embeddings all communities, i.e. embeddings that are close (closer than threshold).
Returns only communities that are larger than min_community_size. The communities are returned
in decreasing order. The first element in each list is the central point in the community.
"""
# Maximum size for community
init_max_size = min(init_max_size, len(embeddings))
# Compute cosine similarity scores
cos_scores = cos_sim(embeddings, embeddings)
# Minimum size for a community
top_k_values, _ = cos_scores.topk(k=min_community_size, largest=True)
# Filter for rows >= min_threshold
extracted_communities = []
for i in range(len(top_k_values)):
if top_k_values[i][-1] >= threshold:
new_cluster = []
# Only check top k most similar entries
top_val_large, top_idx_large = cos_scores[i].topk(k=init_max_size, largest=True)
top_idx_large = top_idx_large.tolist()
top_val_large = top_val_large.tolist()
if top_val_large[-1] < threshold:
for idx, val in zip(top_idx_large, top_val_large):
if val < threshold:
break
new_cluster.append(idx)
else:
# Iterate over all entries (slow)
for idx, val in enumerate(cos_scores[i].tolist()):
if val >= threshold:
new_cluster.append(idx)
extracted_communities.append(new_cluster)
# Largest cluster first
extracted_communities = sorted(extracted_communities, key=lambda x: len(x), reverse=True)
# Step 2) Remove overlapping communities
unique_communities = []
extracted_ids = set()
for community in extracted_communities:
add_cluster = True
for idx in community:
if idx in extracted_ids:
add_cluster = False
break
if add_cluster:
unique_communities.append(community)
for idx in community:
extracted_ids.add(idx)
return unique_communities
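A toy run of `community_detection`, using repeated vectors so the communities are obvious; with real embeddings a threshold around the default 0.75 is more typical:

```python
import numpy as np
from similarities.utils.util import community_detection

# 20 distinct vectors, each repeated 25 times -> 20 obvious communities of size 25
emb = np.repeat(np.random.rand(20, 64).astype(np.float32), 25, axis=0)
communities = community_detection(emb, threshold=0.95, min_community_size=10)
print(len(communities))     # typically 20 with this toy data
print(communities[0][:5])   # row indices; the first index is the community's central point
```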

View File

@ -9,7 +9,6 @@ import unittest
sys.path.append('..')
from text2vec import SentenceModel
from similarities.similarity import Similarity
from similarities.fastsim import AnnoySimilarity
from similarities.fastsim import HnswlibSimilarity
@ -21,10 +20,6 @@ class FastTestCase(unittest.TestCase):
def test_sim_diff(self):
a = '研究团队面向国家重大战略需求追踪国际前沿发展借鉴国际人工智能研究领域的科研模式有效整合创新资源解决复'
b = '英汉互译比较语言学'
m = Similarity(sm)
r = m.similarity(a, b)
print(a, b, r)
self.assertTrue(abs(r - 0.1733) < 0.001)
m = HnswlibSimilarity(sm)
r = m.similarity(a, b)
print(a, b, r)
@ -33,20 +28,25 @@ class FastTestCase(unittest.TestCase):
r = m.similarity(a, b)
print(a, b, r)
self.assertTrue(abs(r - 0.1733) < 0.001)
def test_empty(self):
m = HnswlibSimilarity(sm, embedding_size=384, corpus=[])
v = m.get_vector("This is test1")
v = m._get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test4"))
m = AnnoySimilarity(sm)
m.similarity("This is a test1", "that is a test5")
m.most_similar("This is a test4")
def test_hnsw_score(self):
list_of_docs = ["This is a test1", "This is a test2", "This is a test3", '刘若英是个演员', '他唱歌很好听', 'women喜欢这首歌']
list_of_docs2 = ["that is test4", "that is a test5", "that is a test6", '刘若英个演员', '唱歌很好听', 'men喜欢这首歌']
m = HnswlibSimilarity(sm, embedding_size=384, corpus=list_of_docs)
v = m.get_vector("This is test1")
v = m._get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
@ -80,7 +80,7 @@ class FastTestCase(unittest.TestCase):
m = AnnoySimilarity(sm, embedding_size=384, corpus=list_of_docs * 10)
print(m)
v = m.get_vector("This is test1")
v = m._get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))

83
tests/test_imagesim.py Normal file
View File

@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description:
"""
import glob
import os
import sys
import unittest
sys.path.append('..')
from similarities.imagesim import ClipSimilarity, ImageHashSimilarity, SiftSimilarity
pwd_path = os.path.abspath(os.path.dirname(__file__))
image_fp1 = os.path.join(pwd_path, '../examples/data/image1.png')
image_fp2 = os.path.join(pwd_path, '../examples/data/image8-like-image1.png')
image_dir = os.path.join(pwd_path, '../examples/data/')
class ImageSimCase(unittest.TestCase):
def test_clip(self):
m = ClipSimilarity(glob.glob(f'{image_dir}/*.jpg'))
print(m)
print(m.similarity(image_fp1, image_fp2))
r = m.most_similar(image_fp1)
self.assertTrue(len(r) == 0)
# no corpus
m.add_corpus(glob.glob(f'{image_dir}/*.jpg'))
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
r = m.most_similar(image_fp1)
print(r)
self.assertTrue(len(r) > 0)
def test_sift(self):
m = SiftSimilarity(corpus=glob.glob(f'{image_dir}/*.jpg'))
print(m)
print(m.similarity(image_fp1, image_fp2))
r = m.most_similar(image_fp1)
self.assertTrue(len(r) == 0)
# no corpus
m.add_corpus(glob.glob(f'{image_dir}/*.jpg'))
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
r = m.most_similar(image_fp1)
print(r)
self.assertTrue(len(r) > 0)
def test_phash(self):
m = ImageHashSimilarity(hash_function='phash', corpus=glob.glob(f'{image_dir}/*.jpg'))
print(m)
print(m.similarity(image_fp1, image_fp2))
m.most_similar(image_fp1)
# no corpus
m.add_corpus(glob.glob(f'{image_dir}/*.jpg') + glob.glob(f'{image_dir}/*.png'))
r = m.most_similar(image_fp1)
print(r)
m = ImageHashSimilarity(hash_function='average_hash', corpus=glob.glob(f'{image_dir}/*.jpg'))
print(m)
print(m.similarity(image_fp1, image_fp2))
m.most_similar(image_fp1)
# no corpus
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
r = m.most_similar(image_fp1)
print(r)
self.assertTrue(len(r) > 0)
def test_hamming_distance(self):
m = ImageHashSimilarity(hash_function='phash', hash_size=128)
print(m.similarity(image_fp1, image_fp2))
image_fp3 = os.path.join(pwd_path, '../examples/data/image3.png')
s = m.similarity(image_fp1, image_fp3)
print(s)
self.assertTrue(s > 0)
if __name__ == '__main__':
unittest.main()

View File

@ -10,17 +10,70 @@ import unittest
sys.path.append('..')
from similarities.literalsim import SimhashSimilarity, TfidfSimilarity, BM25Similarity, WordEmbeddingSimilarity, \
from similarities.literalsim import SimHashSimilarity, TfidfSimilarity, BM25Similarity, WordEmbeddingSimilarity, \
CilinSimilarity, HownetSimilarity
from text2vec import Word2Vec
from similarities.utils.distance import string_hash, hamming_distance, cosine_distance
class LiteralCase(unittest.TestCase):
def test_hamming_distance(self):
text1 = '刘若英是个演员'
text2 = '他唱歌很好听'
m = SimHashSimilarity()
seq1 = m.simhash(text1)
seq2 = m.simhash(text2)
print(seq1)
print(seq2)
r = 1.0 - hamming_distance(seq1, seq2) / 64
print(hamming_distance(seq1, seq2))
print(r)
print(m.similarity(text1, text2))
text1 = '刘若英是个演员'
text2 = ''
m = SimHashSimilarity()
seq1 = m.simhash(text1)
seq2 = m.simhash(text2)
print(seq1)
print(seq2)
print(m.similarity(text1, text2))
text1 = '刘若'
text2 = ''
m = SimHashSimilarity()
seq1 = m.simhash(text1)
seq2 = m.simhash(text2)
print(seq1)
print(seq2)
print(m.similarity(text1, text2))
text1 = '刘若'
text2 = '他他唱歌很好听他唱歌很好听他唱歌很好听他唱歌很好听他唱歌很好听他唱歌很好听,他唱歌很好听?他唱歌很好听?他唱歌很好听。。'
m = SimHashSimilarity()
seq1 = m.simhash(text1)
seq2 = m.simhash(text2)
print(seq1)
print(seq2)
print(m.similarity(text1, text2))
text1 = '刘若 他唱歌很好听他唱歌很好听他唱歌很好听他唱歌很好听他唱歌很好听他唱歌很好听,他唱歌很好听?他唱歌很好听?他唱歌很好'
text2 = '他他唱歌很好听他唱歌很好听他唱歌很好听他唱歌很好听他唱歌很好听他唱歌很好听,他唱歌很好听?他唱歌很好听?他唱歌很好听。。'
m = SimHashSimilarity()
seq1 = m.simhash(text1)
seq2 = m.simhash(text2)
print(seq1)
print(seq2)
s = m.similarity(text1, text2)
print(s)
self.assertTrue(s > 0)
def test_simhash(self):
"""test_simhash"""
text1 = '刘若英是个演员'
text2 = '他唱歌很好听'
m = SimhashSimilarity()
m = SimHashSimilarity()
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
print(m.most_similar('刘若英是演员'))
@ -29,7 +82,6 @@ class LiteralCase(unittest.TestCase):
m.add_corpus(zh_list)
r = m.most_similar('刘若英是演员', topn=2)
print(r)
self.assertAlmostEqual(m.similarity(text1, text2), 0.734375, places=4)
self.assertEqual(len(r), 2)
def test_tfidf(self):
@ -65,8 +117,8 @@ class LiteralCase(unittest.TestCase):
m = WordEmbeddingSimilarity(wm, list_of_corpus)
print(m.similarity(text1, text2))
print(m.distance(text1, text2))
m.add_corpus(list_of_corpus2+zh_list)
v = m.get_vector("This is a test1")
m.add_corpus(list_of_corpus2 + zh_list)
v = m._get_vector("This is a test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))

View File

@ -11,18 +11,25 @@ from text2vec import SentenceModel
from similarities.similarity import Similarity
sm = SentenceModel()
bert_model = Similarity(sm)
m = Similarity(sm)
class IssueTestCase(unittest.TestCase):
class SimScoreTestCase(unittest.TestCase):
def test_sim_diff(self):
a = '研究团队面向国家重大战略需求追踪国际前沿发展借鉴国际人工智能研究领域的科研模式有效整合创新资源解决复'
b = '英汉互译比较语言学'
r = bert_model.similarity(a, b)
r = m.similarity(a, b)
print(a, b, r)
self.assertTrue(abs(r - 0.1733) < 0.001)
def test_empty(self):
v = m._get_vector("This is test1")
print(v[:10], v.shape)
print(m.similarity("This is a test1", "that is a test5"))
print(m.distance("This is a test1", "that is a test5"))
print(m.most_similar("This is a test4"))
if __name__ == '__main__':
unittest.main()