update sim type.

This commit is contained in:
shibing624 2022-03-11 21:36:30 +08:00
parent 20e9de9a44
commit 85a043a450
8 changed files with 297 additions and 184 deletions

View File

@ -53,9 +53,8 @@ data_path = get_scifact()
#### Loading test queries and corpus in SciFact
corpus, queries, qrels = SearchDataLoader(data_path).load(split="test")
corpus_ids, query_ids = list(corpus), list(queries)
print(len(corpus))
print(len(queries))
print(len(qrels))
logger.info(f"corpus: {len(corpus)}, queries: {len(queries)}")
#### Randomly sample 1M pairs from Original Corpus (4.63M pairs)
#### First include all relevant documents (i.e. present in qrels)
corpus_set = set()
@ -66,7 +65,7 @@ corpus_new = {corpus_id: corpus[corpus_id] for corpus_id in corpus_set}
#### Remove already seen k relevant documents and sample (1M - k) docs randomly
remaining_corpus = list(set(corpus_ids) - corpus_set)
sample = min(1000000 - len(corpus_set), len(remaining_corpus))
sample = 10
# sample = 10
for corpus_id in random.sample(remaining_corpus, sample):
corpus_new[corpus_id] = corpus[corpus_id]
@ -110,4 +109,4 @@ logger.info(f"Results size: {len(results)}")
#### Evaluate your retrieval using NDCG@k, MAP@K ...
ndcg, _map, recall, precision = evaluate(qrels, results)
print(ndcg, _map, recall, precision)
logger.info(f"MAP: {_map}")
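A note on the sampling guard in the hunk above: sample = min(1000000 - len(corpus_set), len(remaining_corpus)) clamps the request so random.sample never asks for more items than the population holds. A minimal, self-contained sketch of the same pattern (the toy names are illustrative, not from the repo):

import random

corpus_ids = [f"doc{i}" for i in range(50)]  # toy corpus
relevant_ids = {"doc3", "doc7"}              # docs from qrels that must be kept

remaining_corpus = list(set(corpus_ids) - relevant_ids)
# Clamp k so random.sample never raises ValueError on a small corpus.
sample = min(1000000 - len(relevant_ids), len(remaining_corpus))
sampled = relevant_ids | set(random.sample(remaining_corpus, sample))
print(len(sampled))  # all relevant docs plus up to (1M - k) random ones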

View File

@ -54,12 +54,13 @@ data_path = get_scifact()
#### Loading test queries and corpus in SciFact
corpus, queries, qrels = SearchDataLoader(data_path).load(split="test")
corpus_ids, query_ids = list(corpus), list(queries)
print(len(corpus))
print(len(queries))
query_keys = list(queries.keys())[:10]
queries = {key: queries[key] for key in query_keys}
print(len(queries))
print(len(qrels))
logger.info(f"corpus: {len(corpus)}, queries: {len(queries)}")
# query_keys = list(queries.keys())[:10]
# queries = {key: queries[key] for key in query_keys}
# print(len(queries))
# print(len(qrels))
#### Randomly sample 1M pairs from Original Corpus (4.63M pairs)
#### First include all relevant documents (i.e. present in qrels)
corpus_set = set()
@ -70,7 +71,6 @@ corpus_new = {corpus_id: corpus[corpus_id] for corpus_id in corpus_set}
#### Remove already seen k relevant documents and sample (1M - k) docs randomly
remaining_corpus = list(set(corpus_ids) - corpus_set)
sample = min(1000000 - len(corpus_set), len(remaining_corpus))
sample = 10
for corpus_id in random.sample(remaining_corpus, sample):
corpus_new[corpus_id] = corpus[corpus_id]
@ -78,7 +78,7 @@ for corpus_id in random.sample(remaining_corpus, sample):
corpus_docs = {corpus_id: doc['title'] + doc['text'] for corpus_id, doc in corpus_new.items()}
#### Index 1M passages into the index (separately)
model = Similarity(corpus=corpus_docs)
model = Similarity(corpus=corpus_docs, model_name_or_path="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
logger.debug(model)
#### Saving benchmark times with batch
# queries = [queries[query_id] for query_id in query_ids]
@ -94,7 +94,7 @@ logger.info(f"Results size: {len(results)}")
#### Evaluate your retrieval using NDCG@k, MAP@K ...
ndcg, _map, recall, precision = evaluate(qrels, results)
print(ndcg, _map, recall, precision)
logger.info(f"MAP: {_map}")
#### Measuring Index size consumed by document embeddings
corpus_embs = model.corpus_embeddings
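To make the "Measuring Index size" step concrete: once corpus_embeddings is materialized as a float32 array, nbytes reports the memory the index holds. A hedged sketch, assuming 384-dimensional vectors (the output width of paraphrase-multilingual-MiniLM-L12-v2); the array here is a stand-in for model.corpus_embeddings:

import numpy as np

# Stand-in for model.corpus_embeddings: 10k docs x 384-dim float32 vectors.
corpus_embs = np.random.rand(10000, 384).astype(np.float32)
print(f"Index size: {corpus_embs.nbytes / 1024 ** 2:.1f} MB")  # ~14.6 MB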

View File

@ -5,6 +5,7 @@
"""
import sys
import glob
from PIL import Image
sys.path.append('..')
from similarities.imagesim import ImageHashSimilarity, SiftSimilarity, ClipSimilarity
@ -13,29 +14,51 @@ from similarities.imagesim import ImageHashSimilarity, SiftSimilarity, ClipSimil
def sim_and_search(m):
print(m)
# similarity
sim_scores = m.similarity(image_fps1, image_fps2)
sim_scores = m.similarity(imgs1, imgs2)
print('sim scores: ', sim_scores)
for (idx, i), j in zip(enumerate(image_fps1), image_fps2):
s = sim_scores[idx] if isinstance(sim_scores, list) else sim_scores[idx][idx]
print(f"{i} vs {j}, score: {s:.4f}")
# search
m.add_corpus(corpus)
queries = image_fps1
m.add_corpus(corpus_imgs)
queries = imgs1
res = m.most_similar(queries, topn=3)
print('sim search: ', res)
for q_id, c in res.items():
print('query:', queries[q_id])
print('query:', image_fps1[q_id])
print("search top 3:")
for corpus_id, s in c.items():
print(f'\t{m.corpus[corpus_id]}: {s:.4f}')
print(f'\t{m.corpus[corpus_id].filename}: {s:.4f}')
print('-' * 50 + '\n')
def clip_demo():
m = ClipSimilarity()
print(m)
# similarity score between text and image
image_fps = ['data/image3.png', # yellow flower image
'data/image1.png'] # tiger image
texts = ['a yellow flower', 'a tiger']
imgs = [Image.open(i) for i in image_fps]
sim_scores = m.similarity(imgs, texts)
print('sim scores: ', sim_scores)
for (idx, i), j in zip(enumerate(image_fps), texts):
s = sim_scores[idx][idx]
print(f"{i} vs {j}, score: {s:.4f}")
print('-' * 50 + '\n')
if __name__ == "__main__":
image_fps1 = ['data/image1.png', 'data/image3.png']
image_fps2 = ['data/image12-like-image1.png', 'data/image10.png']
corpus = glob.glob('data/*.jpg') + glob.glob('data/*.png')
imgs1 = [Image.open(i) for i in image_fps1]
imgs2 = [Image.open(i) for i in image_fps2]
corpus_fps = glob.glob('data/*.jpg') + glob.glob('data/*.png')
corpus_imgs = [Image.open(i) for i in corpus_fps]
# 1. image and text similarity
clip_demo()
# 2. image and image similarity score
sim_and_search(ClipSimilarity()) # the best result
sim_and_search(ImageHashSimilarity(hash_function='phash'))
sim_and_search(SiftSimilarity())
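A note on the sim_scores[idx][idx] indexing in clip_demo above: per the similarity docstrings in this commit, the result is a matrix with res[i][j] = cos_sim(a[i], b[j]), so matched image-text pairs sit on the diagonal. A tiny sketch with a hand-made score matrix (the numbers are made up):

import torch

# Stand-in for the matrix returned by similarity(): res[i][j] = cos_sim(a[i], b[j]).
sim_scores = torch.tensor([[0.31, 0.05],
                           [0.02, 0.27]])
for idx in range(sim_scores.shape[0]):
    # Matched pairs (image_i vs text_i) are the diagonal entries.
    print(f"pair {idx} score: {sim_scores[idx][idx]:.4f}")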

View File

@ -91,21 +91,22 @@ class CLIPModel(nn.Module):
@staticmethod
def load(input_path: str):
return CLIPModel(model_name=input_path)
def _text_length(self, text: Union[List[int], List[List[int]]]):
def _text_length(self, text):
"""
Helper function to get the length of the input text. Text can be either
a list of ints (a single tokenized text) or a tuple of lists of ints
(several text inputs to the model).
"""
if isinstance(text, dict): #{key: value} case
if isinstance(text, dict): # {key: value} case
return len(next(iter(text.values())))
elif not hasattr(text, '__len__'): #Object has no len() method
elif not hasattr(text, '__len__'): # Object has no len() method
return 1
elif len(text) == 0 or isinstance(text[0], int): #Empty string or list of ints
elif len(text) == 0 or isinstance(text[0], int): # Empty string or list of ints
return len(text)
else:
return sum([len(t) for t in text]) #Sum of length of individual strings
return sum([len(t) for t in text]) # Sum of length of individual strings
@staticmethod
def batch_to_device(batch):
@ -117,7 +118,6 @@ class CLIPModel(nn.Module):
batch[key] = batch[key].to(device)
return batch
def encode(
self,
sentences: Union[str, List[str]],
@ -127,7 +127,7 @@ class CLIPModel(nn.Module):
normalize_embeddings: bool = False
):
"""
Computes sentence embeddings
Computes sentence and image embeddings
:param sentences: the sentences to embed
:param batch_size: the batch size used for the computation
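The _text_length branches above are easiest to see with concrete inputs. A standalone sketch that mirrors the method (reimplemented here for illustration, not imported from the library):

def _text_length(text):
    # Mirrors CLIPModel._text_length from the hunk above.
    if isinstance(text, dict):                        # {key: value} case
        return len(next(iter(text.values())))
    elif not hasattr(text, '__len__'):                # object has no len() method
        return 1
    elif len(text) == 0 or isinstance(text[0], int):  # empty, or one tokenized text
        return len(text)
    else:
        return sum(len(t) for t in text)              # total chars across strings

print(_text_length([1, 2, 3]))        # 3: a single tokenized text
print(_text_length(["ab", "cde"]))    # 5: character count summed over strings
print(_text_length({"ids": [1, 2]}))  # 2: length of the first dict value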

View File

@ -19,9 +19,10 @@ from similarities.similarity import SimilarityABC, Similarity
from similarities.utils.distance import hamming_distance
from similarities.utils.imagehash import phash, dhash, whash, average_hash
from similarities.clip_model import CLIPModel
from similarities.utils.util import cos_sim, semantic_search, dot_score
class ClipSimilarity(Similarity):
class ClipSimilarity(SimilarityABC):
"""
Compute CLIP similarity between two images and retrieve the most
similar images from a given image corpus.
@ -31,11 +32,20 @@ class ClipSimilarity(Similarity):
def __init__(
self,
corpus: Union[List[str], Dict[str, str]] = None,
corpus: Union[List[Image.Image], Dict[str, Image.Image]] = None,
model_name_or_path='openai/clip-vit-base-patch32'
):
self.clip_model = CLIPModel(model_name_or_path) # load the CLIP model
super().__init__(corpus, self.clip_model)
self.score_functions = {'cos_sim': cos_sim, 'dot': dot_score}
self.corpus = {}
self.corpus_ids_map = {}
self.corpus_embeddings = []
if corpus is not None:
self.add_corpus(corpus)
def __len__(self):
"""Get length of corpus."""
return len(self.corpus)
def __str__(self):
base = f"Similarity: {self.__class__.__name__}, matching_model: {self.clip_model.__class__.__name__}"
@ -49,17 +59,97 @@ class ClipSimilarity(Similarity):
img = img.convert('RGB')
return img
def _get_vector(self, img_paths: Union[str, List[str]], show_progress_bar: bool = False):
def _get_vector(self, text_or_img: Union[List[Image.Image], Image.Image, str, List[str]],
show_progress_bar: bool = False):
"""
Returns the embeddings for a batch of texts or images.
:param img_paths:
:return:
:param text_or_img: str, list of str, PIL.Image.Image, or list of images
:return: np.ndarray, embeddings for the given texts or images
"""
if isinstance(img_paths, str):
img_paths = [img_paths]
imgs = [Image.open(filepath) for filepath in img_paths]
imgs = [self._convert_to_rgb(img) for img in imgs]
return self.clip_model.encode(imgs, batch_size=128, show_progress_bar=show_progress_bar)
if isinstance(text_or_img, str):
text_or_img = [text_or_img]
if isinstance(text_or_img, Image.Image):
text_or_img = [text_or_img]
if isinstance(text_or_img, list) and isinstance(text_or_img[0], Image.Image):
text_or_img = [self._convert_to_rgb(i) for i in text_or_img]
return self.clip_model.encode(text_or_img, batch_size=128, show_progress_bar=show_progress_bar)
def add_corpus(self, corpus: Union[List[Image.Image], Dict[str, Image.Image]]):
"""
Extend the corpus with new documents.
Parameters
----------
corpus : list of PIL.Image.Image or dict of {id: PIL.Image.Image}
"""
corpus_new = {}
start_id = len(self.corpus) if self.corpus else 0
if isinstance(corpus, list):
for id, doc in enumerate(corpus):
if doc not in list(self.corpus.values()):
corpus_new[start_id + id] = doc
else:
for id, doc in corpus.items():
if doc not in list(self.corpus.values()):
corpus_new[id] = doc
self.corpus.update(corpus_new)
self.corpus_ids_map = {i: id for i, id in enumerate(list(self.corpus.keys()))}
logger.info(f"Start computing corpus embeddings, new docs: {len(corpus_new)}")
corpus_embeddings = self._get_vector(list(corpus_new.values()), show_progress_bar=True).tolist()
if self.corpus_embeddings:
self.corpus_embeddings += corpus_embeddings
else:
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add {len(corpus)} docs, total: {len(self.corpus)}, emb size: {len(self.corpus_embeddings)}")
def similarity(
self,
a: Union[List[Image.Image], Image.Image, str, List[str]],
b: Union[List[Image.Image], Image.Image, str, List[str]],
score_function: str = "cos_sim"
):
"""
Compute similarity between two batches of inputs (texts or images).
:param a: str, list of str, PIL.Image.Image, or list of images
:param b: str, list of str, PIL.Image.Image, or list of images
:param score_function: function to compute similarity, default cos_sim
:return: similarity score, torch.Tensor, Matrix with res[i][j] = cos_sim(a[i], b[j])
"""
if score_function not in self.score_functions:
raise ValueError(f"score function: {score_function} must be either (cos_sim) for cosine similarity"
" or (dot) for dot product")
score_function = self.score_functions[score_function]
text_emb1 = self._get_vector(a)
text_emb2 = self._get_vector(b)
return score_function(text_emb1, text_emb2)
def distance(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""Compute cosine distance between two texts."""
return 1 - self.similarity(a, b)
def most_similar(self, queries, topn: int = 10):
"""
Find the topn most similar texts to the queries against the corpus.
:param queries: text or image
:param topn: int
:return: Dict[str, Dict[str, float]], {query_id: {corpus_id: similarity_score}, ...}
"""
if isinstance(queries, str) or not hasattr(queries, '__len__'):
queries = [queries]
if isinstance(queries, list):
queries = {id: query for id, query in enumerate(queries)}
result = {qid: {} for qid, query in queries.items()}
queries_ids_map = {i: id for i, id in enumerate(list(queries.keys()))}
queries_texts = list(queries.values())
queries_embeddings = self._get_vector(queries_texts)
corpus_embeddings = np.array(self.corpus_embeddings, dtype=np.float32)
all_hits = semantic_search(queries_embeddings, corpus_embeddings, top_k=topn)
for idx, hits in enumerate(all_hits):
for hit in hits[0:topn]:
result[queries_ids_map[idx]][self.corpus_ids_map[hit['corpus_id']]] = hit['score']
return result
class ImageHashSimilarity(SimilarityABC):
@ -70,7 +160,7 @@ class ImageHashSimilarity(SimilarityABC):
perceptual hash (pHash), which acts as an image fingerprint.
"""
def __init__(self, corpus: Union[List[str], Dict[str, str]] = None,
def __init__(self, corpus: Union[List[Image.Image], Dict[str, Image.Image]] = None,
hash_function: str = "phash", hash_size: int = 16):
self.hash_functions = {'phash': phash, 'dhash': dhash, 'whash': whash, 'average_hash': average_hash}
if hash_function not in self.hash_functions:
@ -92,7 +182,7 @@ class ImageHashSimilarity(SimilarityABC):
base += f", corpus size: {len(self.corpus)}"
return base
def add_corpus(self, corpus: Union[List[str], Dict[str, str]]):
def add_corpus(self, corpus: Union[List[Image.Image], Dict[str, Image.Image]]):
"""
Extend the corpus with new documents.
@ -103,7 +193,6 @@ class ImageHashSimilarity(SimilarityABC):
corpus_new = {}
start_id = len(self.corpus) if self.corpus else 0
if isinstance(corpus, list):
corpus = list(set(corpus))
for id, doc in enumerate(corpus):
if doc not in list(self.corpus.values()):
corpus_new[start_id + id] = doc
@ -116,7 +205,7 @@ class ImageHashSimilarity(SimilarityABC):
logger.info(f"Start computing corpus embeddings, new docs: {len(corpus_new)}")
corpus_embeddings = []
for doc_fp in tqdm(list(corpus_new.values()), desc="Calculating corpus image hash"):
doc_seq = str(self.hash_function(Image.open(doc_fp), self.hash_size))
doc_seq = str(self.hash_function(doc_fp, self.hash_size))
corpus_embeddings.append(doc_seq)
if self.corpus_embeddings:
self.corpus_embeddings += corpus_embeddings
@ -126,33 +215,33 @@ class ImageHashSimilarity(SimilarityABC):
def _sim_score(self, seq1, seq2):
"""Compute hamming similarity between two seqs."""
return 1.0 - hamming_distance(seq1, seq2) / len(seq1)
return 1 - hamming_distance(seq1, seq2) / len(seq1)
def similarity(self, img_paths1: Union[str, List[str]], img_paths2: Union[str, List[str]]):
def similarity(self, a: Union[List[Image.Image], Image.Image], b: Union[List[Image.Image], Image.Image]):
"""
Compute similarity between two images.
:param img_paths1: image file paths 1
:param img_paths2: image file paths 2
:param a: first image or list of images
:param b: second image or list of images
:return: list of float, similarity score
"""
if isinstance(img_paths1, str):
img_paths1 = [img_paths1]
if isinstance(img_paths2, str):
img_paths2 = [img_paths2]
if len(img_paths1) != len(img_paths2):
if isinstance(a, Image.Image):
a = [a]
if isinstance(b, Image.Image):
b = [b]
if len(a) != len(b):
raise ValueError("expected two inputs of the same length")
seqs1 = [str(self.hash_function(Image.open(i), self.hash_size)) for i in img_paths1]
seqs2 = [str(self.hash_function(Image.open(i), self.hash_size)) for i in img_paths2]
seqs1 = [str(self.hash_function(i, self.hash_size)) for i in a]
seqs2 = [str(self.hash_function(i, self.hash_size)) for i in b]
scores = [self._sim_score(seq1, seq2) for seq1, seq2 in zip(seqs1, seqs2)]
return scores
def distance(self, img_paths1: Union[str, List[str]], img_paths2: Union[str, List[str]]):
def distance(self, a: Union[List[Image.Image], Image.Image], b: Union[List[Image.Image], Image.Image]):
"""Compute distance between two images."""
sim_scores = self.similarity(img_paths1, img_paths2)
return [1.0 - score for score in sim_scores]
sim_scores = self.similarity(a, b)
return [1 - score for score in sim_scores]
def most_similar(self, queries: Union[str, List[str], Dict[str, str]], topn: int = 10):
def most_similar(self, queries: Union[Image.Image, List[Image.Image], Dict[str, Image.Image]], topn: int = 10):
"""
Find the topn most similar images to the query against the corpus.
:param queries: PIL image, list of PIL images, or dict of {id: PIL image}
@ -167,7 +256,7 @@ class ImageHashSimilarity(SimilarityABC):
for qid, query in queries.items():
q_res = []
q_seq = str(self.hash_function(Image.open(query), self.hash_size))
q_seq = str(self.hash_function(query, self.hash_size))
for (corpus_id, doc), doc_seq in zip(self.corpus.items(), self.corpus_embeddings):
score = self._sim_score(q_seq, doc_seq)
q_res.append((corpus_id, score))
@ -187,7 +276,7 @@ class SiftSimilarity(SimilarityABC):
https://blog.csdn.net/zddblog/article/details/7521424
"""
def __init__(self, corpus: Union[List[str], Dict[str, str]] = None, nfeatures: int = 500):
def __init__(self, corpus: Union[List[Image.Image], Dict[str, Image.Image]] = None, nfeatures: int = 500):
try:
import cv2
except ImportError:
@ -209,7 +298,7 @@ class SiftSimilarity(SimilarityABC):
base += f", corpus size: {len(self.corpus)}"
return base
def add_corpus(self, corpus: Union[List[str], Dict[str, str]]):
def add_corpus(self, corpus: Union[List[Image.Image], Dict[str, Image.Image]]):
"""
Extend the corpus with new documents.
@ -220,7 +309,6 @@ class SiftSimilarity(SimilarityABC):
corpus_new = {}
start_id = len(self.corpus) if self.corpus else 0
if isinstance(corpus, list):
corpus = list(set(corpus))
for id, doc in enumerate(corpus):
if doc not in list(self.corpus.values()):
corpus_new[start_id + id] = doc
@ -232,8 +320,7 @@ class SiftSimilarity(SimilarityABC):
self.corpus_ids_map = {i: id for i, id in enumerate(list(self.corpus.keys()))}
logger.info(f"Start computing corpus embeddings, new docs: {len(corpus_new)}")
corpus_embeddings = []
for doc_fp in tqdm(list(corpus_new.values()), desc="Calculating corpus image SIFT"):
img = Image.open(doc_fp)
for img in tqdm(list(corpus_new.values()), desc="Calculating corpus image SIFT"):
_, descriptors = self.calculate_descr(img)
if len(descriptors.shape) > 0 and descriptors.shape[0] > 0:
corpus_embeddings.append(descriptors.tolist())
@ -290,40 +377,40 @@ class SiftSimilarity(SimilarityABC):
score = (topBestNSum / bestN) * good_matches_sum / len(good_matches)
return score
def similarity(self, img_paths1: Union[str, List[str]], img_paths2: Union[str, List[str]]):
def similarity(self, a: Union[List[Image.Image], Image.Image], b: Union[List[Image.Image], Image.Image]):
"""
Compute similarity between two images.
:param img_paths1: image file paths 1
:param img_paths2: image file paths 2
:param a: first image or list of images
:param b: second image or list of images
:return: list of float, similarity score
"""
if isinstance(img_paths1, str):
img_paths1 = [img_paths1]
if isinstance(img_paths2, str):
img_paths2 = [img_paths2]
if len(img_paths1) != len(img_paths2):
if isinstance(a, Image.Image):
a = [a]
if isinstance(b, Image.Image):
b = [b]
if len(a) != len(b):
raise ValueError("expected two inputs of the same length")
scores = []
for fp1, fp2 in zip(img_paths1, img_paths2):
for img1, img2 in zip(a, b):
score = 0.0
_, desc1 = self.calculate_descr(Image.open(fp1))
_, desc2 = self.calculate_descr(Image.open(fp2))
_, desc1 = self.calculate_descr(img1)
_, desc2 = self.calculate_descr(img2)
if desc1.size > 0 and desc2.size > 0:
score = self._sim_score(desc1, desc2)
scores.append(score)
return scores
def distance(self, img_paths1: Union[str, List[str]], img_paths2: Union[str, List[str]]):
def distance(self, a: Union[List[Image.Image], Image.Image], b: Union[List[Image.Image], Image.Image]):
"""Compute distance between two images."""
sim_scores = self.similarity(img_paths1, img_paths2)
return [1.0 - score for score in sim_scores]
sim_scores = self.similarity(a, b)
return [1 - score for score in sim_scores]
def most_similar(self, queries: Union[str, List[str], Dict[str, str]], topn: int = 10):
def most_similar(self, queries: Union[Image.Image, List[Image.Image], Dict[str, Image.Image]], topn: int = 10):
"""
Find the topn most similar images to the query against the corpus.
:param queries: str of list of str, image file paths
:param queries: PIL images
:param topn: int
:return: list of tuples (id, image, similarity)
"""
@ -335,7 +422,7 @@ class SiftSimilarity(SimilarityABC):
for qid, query in queries.items():
q_res = []
_, q_desc = self.calculate_descr(Image.open(query))
_, q_desc = self.calculate_descr(query)
for (corpus_id, doc), doc_desc in zip(enumerate(self.corpus), self.corpus_embeddings):
score = self._sim_score(q_desc, doc_desc)
q_res.append((corpus_id, score))
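Taken together, these imagesim.py changes move the public API from file paths to PIL images across all three backends. A hedged usage sketch of the new ClipSimilarity flow (the image paths are placeholders):

from PIL import Image
from similarities.imagesim import ClipSimilarity

m = ClipSimilarity()  # loads openai/clip-vit-base-patch32 by default
corpus = [Image.open(p) for p in ['data/image1.png', 'data/image3.png']]
m.add_corpus(corpus)                     # embeds the PIL images, not paths
res = m.most_similar(corpus[0], topn=1)  # {query_id: {corpus_id: score}}
print(res)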

View File

@ -83,13 +83,13 @@ class SimHashSimilarity(SimilarityABC):
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add {len(corpus)} docs, total: {len(self.corpus)}, emb size: {len(self.corpus_embeddings)}")
def simhash(self, text: str):
def simhash(self, sentence: str):
"""
Compute SimHash for a given text.
:param text: str
:param sentence: str
:return: hash code
"""
seg = jieba.cut(text)
seg = jieba.cut(sentence)
key_word = jieba.analyse.extract_tags('|'.join(seg), topK=None, withWeight=True, allowPOS=())
# Sort by weight first, then by word
key_list = []
@ -119,48 +119,48 @@ class SimHashSimilarity(SimilarityABC):
# Convert the distance to a similarity score
score = 0.0
if len(seq1) > 2 and len(seq2) > 2:
score = 1.0 - hamming_distance(seq1, seq2) / len(seq1)
score = 1 - hamming_distance(seq1, seq2) / len(seq1)
return score
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def similarity(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""
Compute hamming similarity between two sentences.
Parameters
----------
text1 : str or list of str
text2 : str or list of str
a : str or list of str
b : str or list of str
Returns
-------
list of float
"""
if isinstance(text1, str):
text1 = [text1]
if isinstance(text2, str):
text2 = [text2]
if len(text1) != len(text2):
if isinstance(a, str):
a = [a]
if isinstance(b, str):
b = [b]
if len(a) != len(b):
raise ValueError("expected two inputs of the same length")
seqs1 = [self.simhash(text) for text in text1]
seqs2 = [self.simhash(text) for text in text2]
seqs1 = [self.simhash(text) for text in a]
seqs2 = [self.simhash(text) for text in b]
scores = [self._sim_score(seq1, seq2) for seq1, seq2 in zip(seqs1, seqs2)]
return scores
def distance(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def distance(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""
Compute hamming distance between two sentences.
Parameters
----------
text1 : str or list of str
text2 : str or list of str
a : str or list of str
b : str or list of str
Returns
-------
list of float
"""
sim_scores = self.similarity(text1, text2)
return [1.0 - score for score in sim_scores]
sim_scores = self.similarity(a, b)
return [1 - score for score in sim_scores]
def most_similar(self, queries: Union[str, List[str], Dict[str, str]], topn: int = 10):
"""
@ -246,24 +246,24 @@ class TfidfSimilarity(SimilarityABC):
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add {len(corpus)} docs, total: {len(self.corpus)}, emb size: {len(self.corpus_embeddings)}")
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def similarity(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""
Compute cosine similarity score between two sentences.
:param text1:
:param text2:
:param a:
:param b:
:return:
"""
if isinstance(text1, str):
text1 = [text1]
if isinstance(text2, str):
text2 = [text2]
features1 = [self.tfidf.get_tfidf(text) for text in text1]
features2 = [self.tfidf.get_tfidf(text) for text in text2]
if isinstance(a, str):
a = [a]
if isinstance(b, str):
b = [b]
features1 = [self.tfidf.get_tfidf(text) for text in a]
features2 = [self.tfidf.get_tfidf(text) for text in b]
return cos_sim(np.array(features1), np.array(features2))
def distance(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def distance(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""Compute cosine distance between two texts."""
return 1.0 - self.similarity(text1, text2)
return 1 - self.similarity(a, b)
def most_similar(self, queries: Union[str, List[str], Dict[str, str]], topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""
@ -432,15 +432,15 @@ class WordEmbeddingSimilarity(SimilarityABC):
def _get_vector(self, text, show_progress_bar: bool = False) -> np.ndarray:
return self.keyedvectors.encode(text, show_progress_bar=show_progress_bar)
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def similarity(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""Compute cosine similarity between two texts."""
v1 = self._get_vector(text1)
v2 = self._get_vector(text2)
v1 = self._get_vector(a)
v2 = self._get_vector(b)
return cos_sim(v1, v2)
def distance(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def distance(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""Compute cosine distance between two texts."""
return 1 - self.similarity(text1, text2)
return 1 - self.similarity(a, b)
def most_similar(self, queries: Union[str, List[str], Dict[str, str]], topn: int = 10):
"""
@ -570,18 +570,18 @@ class CilinSimilarity(SimilarityABC):
score += 1
return score / 10
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def similarity(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""
Compute Cilin similarity between two texts.
:param text1:
:param text2:
:param a:
:param b:
:return:
"""
if isinstance(text1, str):
text1 = [text1]
if isinstance(text2, str):
text2 = [text2]
if len(text1) != len(text2):
if isinstance(a, str):
a = [a]
if isinstance(b, str):
b = [b]
if len(a) != len(b):
raise ValueError("expected two inputs of the same length")
def calc_pair_sim(sentence1, sentence2):
@ -598,11 +598,11 @@ class CilinSimilarity(SimilarityABC):
similarity_score = max(sum(score_words1) / len(words1), sum(score_words2) / len(words2))
return similarity_score
return [calc_pair_sim(sentence1, sentence2) for sentence1, sentence2 in zip(text1, text2)]
return [calc_pair_sim(sentence1, sentence2) for sentence1, sentence2 in zip(a, b)]
def distance(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def distance(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""Compute cosine distance between two texts."""
return [1.0 - s for s in self.similarity(text1, text2)]
return [1 - s for s in self.similarity(a, b)]
def most_similar(self, queries: Union[str, List[str], Dict[str, str]], topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""
@ -700,18 +700,18 @@ class HownetSimilarity(SimilarityABC):
else:
return 0
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def similarity(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""
Compute Hownet similarity between two texts.
:param text1:
:param text2:
:param a:
:param b:
:return:
"""
if isinstance(text1, str):
text1 = [text1]
if isinstance(text2, str):
text2 = [text2]
if len(text1) != len(text2):
if isinstance(a, str):
a = [a]
if isinstance(b, str):
b = [b]
if len(a) != len(b):
raise ValueError("expected two inputs of the same length")
def calc_pair_sim(sentence1, sentence2):
@ -728,11 +728,11 @@ class HownetSimilarity(SimilarityABC):
similarity_score = max(sum(score_words1) / len(words1), sum(score_words2) / len(words2))
return similarity_score
return [calc_pair_sim(sentence1, sentence2) for sentence1, sentence2 in zip(text1, text2)]
return [calc_pair_sim(sentence1, sentence2) for sentence1, sentence2 in zip(a, b)]
def distance(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def distance(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""Compute Hownet distance between two texts."""
return [1.0 - s for s in self.similarity(text1, text2)]
return [1 - s for s in self.similarity(a, b)]
def most_similar(self, queries: Union[str, List[str], Dict[str, str]], topn: int = 10):
"""Find the topn most similar texts to the query against the corpus."""

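The recurring conversion in this file, score = 1 - hamming_distance(seq1, seq2) / len(seq1), maps a Hamming distance onto [0, 1]. A worked toy example; the local hamming_distance is a stand-in for similarities.utils.distance.hamming_distance, assuming it counts differing positions:

def hamming_distance(seq1, seq2):
    # Number of positions at which two equal-length sequences differ.
    return sum(c1 != c2 for c1, c2 in zip(seq1, seq2))

seq1, seq2 = "10110100", "10010110"
print(1 - hamming_distance(seq1, seq2) / len(seq1))  # 2 of 8 bits differ -> 0.75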
View File

@ -40,17 +40,17 @@ class SimilarityABC:
"""
raise NotImplementedError("cannot instantiate Abstract Base Class")
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def similarity(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""
Compute similarity between two texts.
:param text1: list of str or str
:param text2: list of str or str
:param a: list of str or str
:param b: list of str or str
:param score_function: function to compute similarity, default cos_sim
:return: similarity score, torch.Tensor, Matrix with res[i][j] = cos_sim(a[i], b[j])
"""
raise NotImplementedError("cannot instantiate Abstract Base Class")
def distance(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def distance(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""Compute cosine distance between two texts."""
raise NotImplementedError("cannot instantiate Abstract Base Class")
@ -136,19 +136,19 @@ class Similarity(SimilarityABC):
self.corpus_embeddings = corpus_embeddings
logger.info(f"Add {len(corpus)} docs, total: {len(self.corpus)}, emb size: {len(self.corpus_embeddings)}")
def _get_vector(self, text: Union[str, List[str]], show_progress_bar: bool = False) -> np.ndarray:
def _get_vector(self, sentences: Union[str, List[str]], show_progress_bar: bool = False) -> np.ndarray:
"""
Returns the embeddings for a batch of sentences.
:param text:
:param sentences:
:return:
"""
return self.sentence_model.encode(text, show_progress_bar=show_progress_bar)
return self.sentence_model.encode(sentences, show_progress_bar=show_progress_bar)
def similarity(self, text1: Union[str, List[str]], text2: Union[str, List[str]], score_function: str = "cos_sim"):
def similarity(self, a: Union[str, List[str]], b: Union[str, List[str]], score_function: str = "cos_sim"):
"""
Compute similarity between two texts.
:param text1: list of str or str
:param text2: list of str or str
:param a: list of str or str
:param b: list of str or str
:param score_function: function to compute similarity, default cos_sim
:return: similarity score, torch.Tensor, Matrix with res[i][j] = cos_sim(a[i], b[j])
"""
@ -156,14 +156,14 @@ class Similarity(SimilarityABC):
raise ValueError(f"score function: {score_function} must be either (cos_sim) for cosine similarity"
" or (dot) for dot product")
score_function = self.score_functions[score_function]
text_emb1 = self._get_vector(text1)
text_emb2 = self._get_vector(text2)
text_emb1 = self._get_vector(a)
text_emb2 = self._get_vector(b)
return score_function(text_emb1, text_emb2)
def distance(self, text1: Union[str, List[str]], text2: Union[str, List[str]]):
def distance(self, a: Union[str, List[str]], b: Union[str, List[str]]):
"""Compute cosine distance between two texts."""
return 1 - self.similarity(text1, text2)
return 1 - self.similarity(a, b)
def most_similar(self, queries: Union[str, List[str], Dict[str, str]], topn: int = 10):
"""

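On the score_function choice above: cos_sim normalizes vectors before taking the dot product, so it ignores magnitude, while dot does not. A minimal sketch of the difference; these helpers are stand-ins for similarities.utils.util.cos_sim and dot_score, written to match their documented behavior:

import numpy as np

def cos_sim(a, b):
    # Cosine similarity matrix: normalize rows, then dot product.
    a, b = np.atleast_2d(a), np.atleast_2d(b)
    a = a / np.linalg.norm(a, axis=1, keepdims=True)
    b = b / np.linalg.norm(b, axis=1, keepdims=True)
    return a @ b.T

def dot_score(a, b):
    # Raw dot product: sensitive to vector magnitude.
    return np.atleast_2d(a) @ np.atleast_2d(b).T

v1, v2 = np.array([1.0, 2.0]), np.array([2.0, 4.0])
print(cos_sim(v1, v2))    # [[1.]]  same direction -> cosine 1.0
print(dot_score(v1, v2))  # [[10.]] magnitude inflates the dot product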
View File

@ -7,6 +7,7 @@ import glob
import os
import sys
import unittest
from PIL import Image
sys.path.append('..')
@ -14,71 +15,74 @@ from similarities.imagesim import ClipSimilarity, ImageHashSimilarity, SiftSimilarity
pwd_path = os.path.abspath(os.path.dirname(__file__))
image_fp1 = os.path.join(pwd_path, '../examples/data/image1.png')
image_fp2 = os.path.join(pwd_path, '../examples/data/image8-like-image1.png')
img1 = Image.open(os.path.join(pwd_path, '../examples/data/image1.png'))
img2 = Image.open(os.path.join(pwd_path, '../examples/data/image8-like-image1.png'))
image_dir = os.path.join(pwd_path, '../examples/data/')
corpus_imgs = [Image.open(i) for i in glob.glob(os.path.join(image_dir, '*.png'))]
class ImageSimCase(unittest.TestCase):
def test_clip(self):
m = ClipSimilarity(glob.glob(f'{image_dir}/*.jpg'))
m = ClipSimilarity()
print(m)
s = m.similarity(image_fp1, image_fp2)
s = m.similarity(img1, img2)
print(s)
self.assertTrue(s > 0.5)
r = m.most_similar(image_fp1)
r = m.most_similar(img1)
print(r)
self.assertTrue(not r[0])
# no corpus
m.add_corpus(glob.glob(f'{image_dir}/*.jpg'))
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
m.add_corpus(corpus_imgs)
r = m.most_similar(image_fp1)
r = m.most_similar(img1)
print(r)
self.assertTrue(len(r) > 0)
def test_clip_dict(self):
m = ClipSimilarity()
print(m)
corpus_dict = {i.filename: i for i in corpus_imgs}
queries = {i.filename: i for i in corpus_imgs[:3]}
m.add_corpus(corpus_dict)
r = m.most_similar(queries)
print(r)
self.assertTrue(len(r) > 0)
def test_sift(self):
m = SiftSimilarity(corpus=glob.glob(f'{image_dir}/*.jpg'))
print(m)
print(m.similarity(image_fp1, image_fp2))
r = m.most_similar(image_fp1)
print(m.similarity(img1, img2))
r = m.most_similar(img1)
print(r)
self.assertTrue(not r[0])
# no corpus
m.add_corpus(glob.glob(f'{image_dir}/*.jpg'))
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
r = m.most_similar(image_fp1)
m.add_corpus(corpus_imgs)
m.add_corpus(corpus_imgs)
r = m.most_similar(img1)
print(r)
self.assertTrue(len(r) > 0)
def test_phash(self):
m = ImageHashSimilarity(hash_function='phash', corpus=glob.glob(f'{image_dir}/*.jpg'))
m = ImageHashSimilarity(hash_function='phash')
print(m)
print(m.similarity(image_fp1, image_fp2))
m.most_similar(image_fp1)
# no corpus
m.add_corpus(glob.glob(f'{image_dir}/*.jpg') + glob.glob(f'{image_dir}/*.png'))
r = m.most_similar(image_fp1)
print(m.similarity(img1, img2))
m.most_similar(img1)
m.add_corpus(corpus_imgs)
r = m.most_similar(img1)
print(r)
m = ImageHashSimilarity(hash_function='average_hash', corpus=glob.glob(f'{image_dir}/*.jpg'))
m = ImageHashSimilarity(hash_function='average_hash')
print(m)
print(m.similarity(image_fp1, image_fp2))
m.most_similar(image_fp1)
# no corpus
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
m.add_corpus(glob.glob(f'{image_dir}/*.png'))
r = m.most_similar(image_fp1)
print(m.similarity(img1, img2))
m.most_similar(img1)
m.add_corpus(corpus_imgs)
m.add_corpus(corpus_imgs)
r = m.most_similar(img1)
print(r)
self.assertTrue(len(r) > 0)
def test_hamming_distance(self):
m = ImageHashSimilarity(hash_function='phash', hash_size=128)
print(m.similarity(image_fp1, image_fp2))
image_fp3 = os.path.join(pwd_path, '../examples/data/image3.png')
s = m.similarity(image_fp1, image_fp3)
s = m.similarity(img1, img2)
print(s)
self.assertTrue(s[0] > 0)