299 lines
8.4 KiB
Python
Executable File
299 lines
8.4 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#=========================================================================
|
|
#
|
|
# Copyright (c) 2017 <> All Rights Reserved
|
|
#
|
|
#
|
|
# File: /Users/hain/ai/Synonyms/synonyms/__init__.py
|
|
# Author: Hai Liang Wang
|
|
# Date: 2017-09-27
|
|
#
|
|
#=========================================================================
|
|
|
|
"""
|
|
Chinese Synonyms for Natural Language Processing and Understanding.
|
|
"""
|
|
from __future__ import print_function
|
|
from __future__ import division
|
|
|
|
__copyright__ = "Copyright (c) 2017 . All Rights Reserved"
|
|
__author__ = "Hu Ying Xi<>, Hai Liang Wang<hailiang.hl.wang@gmail.com>"
|
|
__date__ = "2017-09-27"
|
|
|
|
|
|
import os
|
|
import sys
|
|
import numpy as np
|
|
# Absolute directory that contains this module; it is appended to sys.path.
curdir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(curdir)

# Major Python version in use: 2 by default, switched to 3 in the else branch.
PLT = 2

if sys.version_info[0] < 3:
    # Python 2 only: reload sys, then set the default string encoding to
    # UTF-8. stdout/stderr are captured before the reload and reassigned
    # after it so both names keep referring to the same stream objects.
    default_stdout = sys.stdout
    default_stderr = sys.stderr
    reload(sys)
    sys.stdout = default_stdout
    sys.stderr = default_stderr
    sys.setdefaultencoding("utf-8")
    # raise "Must be using Python 3"
else:
    PLT = 3

# Snapshot copy of the process environment, taken once at import time.
ENVIRON = os.environ.copy()
|
|
|
|
import json
|
|
import gzip
|
|
import shutil
|
|
from synonyms.word2vec import KeyedVectors
|
|
from synonyms.utils import any2utf8
|
|
from synonyms.utils import any2unicode
|
|
from synonyms.utils import sigmoid
|
|
from synonyms.utils import cosine
|
|
import jieba
|
|
import jieba.posseg as _tokenizer
|
|
|
|
'''
|
|
globals
|
|
'''
|
|
_vocab = dict()
|
|
_size = 0
|
|
_vectors = None
|
|
_stopwords = set()
|
|
_cache_nearby = dict()
|
|
|
|
'''
|
|
lambda fns
|
|
'''
|
|
# combine similarity scores
|
|
_similarity_smooth = lambda x, y, z: (x * y) + z
|
|
_flat_sum_array = lambda x: np.sum(x, axis=0) # 分子
|
|
|
|
'''
|
|
tokenizer settings
|
|
'''
|
|
# Word-segmentation dictionary handed to jieba. Defaults to data/vocab.txt next
# to this module; the SYNONYMS_WORDSEG_DICT environment variable may name a
# replacement file, which is used only if that path exists.
tokenizer_dict = os.path.join(curdir, 'data', 'vocab.txt')
if "SYNONYMS_WORDSEG_DICT" in ENVIRON:
    if os.path.exists(ENVIRON["SYNONYMS_WORDSEG_DICT"]):
        # Report the override path itself, not the default being replaced.
        print("info: set wordseg dict with %s" % ENVIRON["SYNONYMS_WORDSEG_DICT"])
        tokenizer_dict = ENVIRON["SYNONYMS_WORDSEG_DICT"]
    else:
        # Override path is missing: keep the default dictionary.
        print("warning: can not find dict at [%s]" % ENVIRON["SYNONYMS_WORDSEG_DICT"])

print(">> Synonyms load wordseg dict [%s] ... " % tokenizer_dict)
jieba.set_dictionary(tokenizer_dict)
|
|
|
|
# stopwords
# Path of the stop-word list: data/stopwords.txt next to this module.
_fin_stopwords_path = os.path.join(curdir, 'data', 'stopwords.txt')
|
|
def _load_stopwords(file_path):
    '''
    Load stop words from a text file into the module-level _stopwords set.

    file_path : path of the text file to read; on Python 3 it is decoded as
                UTF-8.

    Every line is converted with any2unicode, stripped of surrounding
    whitespace and added to _stopwords. Returns None.
    '''
    global _stopwords
    if sys.version_info[0] < 3:
        # Python 2's built-in open() has no encoding parameter.
        words = open(file_path, 'r')
    else:
        words = open(file_path, 'r', encoding='utf-8')
    # The context manager closes the handle even if a line fails to convert.
    with words:
        for w in words:
            _stopwords.add(any2unicode(w).strip())
|
|
|
|
# Populate _stopwords at import time from the stop-word file.
print(">> Synonyms on loading stopwords [%s] ..." % _fin_stopwords_path)
_load_stopwords(_fin_stopwords_path)
|
|
|
|
def _segment_words(sen):
    '''
    Segment a sentence with jieba's part-of-speech tokenizer.

    sen : sentence to segment

    Returns a tuple (words, tags) of two parallel lists holding the `word`
    and the `flag` of every token produced by the tokenizer.
    '''
    # HMM=True: better at recognising new words.
    tokens = list(_tokenizer.cut(sen, HMM=True))
    return [tok.word for tok in tokens], [tok.flag for tok in tokens]
|
|
|
|
'''
|
|
word embedding
|
|
'''
|
|
# vectors
# Path of the word-vector model: data/words.vector next to this module, unless
# the SYNONYMS_WORD2VEC_BIN_MODEL_ZH_CN environment variable is set, in which
# case its value is used as-is (existence is not checked here).
_f_model = os.path.join(curdir, 'data', 'words.vector')
if "SYNONYMS_WORD2VEC_BIN_MODEL_ZH_CN" in ENVIRON:
    _f_model = ENVIRON["SYNONYMS_WORD2VEC_BIN_MODEL_ZH_CN"]
|
|
def _load_w2v(model_file=_f_model, binary=True):
    '''
    Load a word2vec model.

    model_file : path of the model file; the default is the value _f_model
                 had when this function was defined
    binary     : forwarded to KeyedVectors.load_word2vec_format

    Returns the object produced by KeyedVectors.load_word2vec_format, which
    is called with unicode_errors='ignore'.
    Raises IOError (an Exception subclass, so existing `except Exception`
    handlers still match) when model_file does not exist.
    '''
    if not os.path.exists(model_file):
        raise IOError("Model file [%s] does not exist." % model_file)
    return KeyedVectors.load_word2vec_format(
        model_file, binary=binary, unicode_errors='ignore')
|
|
# Load the word-vector model at import time; _load_w2v raises if the file
# named by _f_model does not exist.
print(">> Synonyms on loading vectors [%s] ..." % _f_model)
_vectors = _load_w2v(model_file=_f_model)
|
|
|
|
def _get_wv(sentence):
    '''
    Build one vector per non-stop-word token of a segmented sentence.

    sentence : iterable of tokens (an already segmented sentence)

    For every token that is not in _stopwords, the token's own vector and the
    vectors of the words returned by nearby() are averaged element-wise. A
    word the model does not know (KeyError from word_vec) is replaced by a
    pseudo-random vector, see _random_word_vector.

    Returns a list with one averaged vector per kept token; the list is empty
    when every token is a stop word.
    '''
    vectors = []
    for y in sentence:
        y_ = any2unicode(y).strip()
        if y_ in _stopwords:
            continue
        syns = nearby(y_)[0]
        c = []
        try:
            c.append(_vectors.word_vec(y_))
        except KeyError:
            print("not exist in w2v model: %s" % y_)
            c.append(_random_word_vector(y_))
        for n in syns:
            if n is None:
                continue
            try:
                v = _vectors.word_vec(any2unicode(n))
            except KeyError:
                v = _random_word_vector(n)
            c.append(v)
        vectors.append(np.average(c, axis=0))
    return vectors


def _random_word_vector(word):
    '''
    Return a pseudo-random stand-in vector for an out-of-vocabulary word.

    The generator is seeded with hash(word) and draws 100 values uniformly
    from [-10.0, 10.0).
    '''
    # NOTE(review): assumes the model's vectors have 100 dimensions - confirm.
    # NOTE(review): str hashes are salted per process on Python 3 unless
    # PYTHONHASHSEED is fixed, so the vector may differ between runs.
    random_state = np.random.RandomState(seed=(hash(word) % (2 ** 32 - 1)))
    return random_state.uniform(low=-10.0, high=10.0, size=(100,))
|
|
|
|
'''
|
|
Distance
|
|
'''
|
|
# Levenshtein Distance
def _levenshtein_distance(sentence1, sentence2):
    '''
    Return a smoothed similarity score derived from the Levenshtein distance
    between two strings.

    Based on:
    http://rosettacode.org/wiki/Levenshtein_distance#Python

    sentence1, sentence2 : strings to compare; both are normalised through
                           any2utf8 and decoded as UTF-8 (undecodable bytes
                           are ignored) so the distance counts characters.

    The edit distance is turned into a ratio d = (maxlen - distance) / maxlen
    and then smoothed with (sigmoid(d * 6) - 0.5) * 2. Two empty strings are
    treated as identical (d = 1.0) instead of dividing by zero.
    '''
    first = any2utf8(sentence1).decode('utf-8', 'ignore')
    second = any2utf8(sentence2).decode('utf-8', 'ignore')
    maxlen = max(len(first), len(second))
    # Keep the shorter string in `first` so the rolling row stays small.
    if len(first) > len(second):
        first, second = second, first

    if maxlen == 0:
        # Both inputs are empty: identical strings, and maxlen cannot be a
        # divisor.
        d = 1.0
    else:
        # Rolling single-row dynamic programme over the edit-distance table.
        distances = range(len(first) + 1)
        for index2, char2 in enumerate(second):
            new_distances = [index2 + 1]
            for index1, char1 in enumerate(first):
                if char1 == char2:
                    new_distances.append(distances[index1])
                else:
                    new_distances.append(1 + min((distances[index1],
                                                  distances[index1 + 1],
                                                  new_distances[-1])))
            distances = new_distances
        levenshtein = distances[-1]
        # float() first, so this is a true division even without
        # `from __future__ import division`.
        d = float(maxlen - levenshtein) / maxlen

    # smoothing
    s = (sigmoid(d * 6) - 0.5) * 2
    return s
|
|
|
|
def _nearby_levenshtein_distance(s1, s2):
    '''
    Use words that are close in vector space to refine the edit-distance
    computation.

    s1, s2 : token sequences

    The tokens of the shorter sequence are expanded with their nearby() words.
    Every token of the longer sequence is then scored by its best
    _levenshtein_distance against that expanded set, and the scores are summed
    and divided by the longer length. Returns 0.0 when the expanded set is
    empty.
    '''
    longest = max(len(s1), len(s2))
    if len(s1) == longest:
        shorter, longer = s2, s1
    else:
        shorter, longer = s1, s2

    # All words related to the shorter sentence.
    related = set()
    for token in shorter:
        related.add(token)
        neighbours, _ = nearby(token)
        related.update(neighbours)

    if not related:
        return 0.0  # invalid length for first string

    best_scores = [
        max([_levenshtein_distance(token, candidate) for candidate in related])
        for token in longer
    ]
    return np.sum(best_scores) / longest
|
|
|
|
def _similarity_distance(s1, s2):
    '''
    Compute the similarity of two token sequences from distance measurements.

    The cosine of the summed word vectors (g) is combined with the
    nearby-word edit-distance score (u) through _similarity_smooth; the larger
    u is, the smaller the weight given to g. A negative result is replaced by
    its absolute value, the result is capped at 1.0 and rounded to three
    decimals.
    '''
    vec1 = _flat_sum_array(_get_wv(s1))
    vec2 = _flat_sum_array(_get_wv(s2))
    g = cosine(vec1, vec2)
    u = _nearby_levenshtein_distance(s1, s2)

    # (lower bound on u, weight applied to g), checked from highest to lowest.
    weight = 1
    for threshold, factor in ((0.8, 0.1), (0.6, 0.25), (0.4, 0.5)):
        if u > threshold:
            weight = factor
            break
    r = _similarity_smooth(g, weight, u)

    if r < 0:
        r = abs(r)
    return float("%.3f" % min(r, 1.0))
|
|
|
|
'''
|
|
Public Methods
|
|
'''
|
|
# Public alias of _segment_words (word segmenter).
seg = _segment_words # word segmenter
|
|
|
|
def nearby(word):
    '''
    Return the words close to `word` in the vector model.

    word : word to look up; converted with any2unicode first

    Returns a tuple (words, scores) of two parallel lists built from the first
    and second element of every entry yielded by _vectors.neighbours. Both
    lists are empty when the word is out of vocabulary. Results are stored in
    _cache_nearby, so the returned lists are shared between calls.
    '''
    w = any2unicode(word)
    # read from cache
    if w in _cache_nearby:
        return _cache_nearby[w]

    words, scores = [], []
    try:
        for x in _vectors.neighbours(w):
            words.append(x[0])
            scores.append(x[1])
    except KeyError:
        # Out-of-vocabulary word: keep whatever was collected (normally
        # nothing). Other exceptions propagate instead of being silenced.
        pass
    # put into cache
    _cache_nearby[w] = (words, scores)
    return words, scores
|
|
|
|
def compare(s1, s2, seg=True):
    '''
    Compare the similarity of two sentences.

    s1  : sentence1
    s2  : sentence2
    seg : True  : the original sentences need jieba.cut
          False : the original sentences have been cut; they are split on
                  whitespace

    Returns the score computed by _similarity_distance.
    Raises AssertionError when either sentence yields no tokens.
    '''
    if seg:
        s1 = list(jieba.cut(s1))
        s2 = list(jieba.cut(s2))
    else:
        s1 = s1.split()
        s2 = s2.split()
    # Raised explicitly rather than with `assert` so the check is still
    # enforced when Python runs with -O.
    if not (s1 and s2):
        raise AssertionError("The length of s1 and s2 should > 0.")
    return _similarity_distance(s1, s2)
|
|
|
|
def display(word):
    '''
    Print the nearby words of `word`, one numbered line per word with its
    score, or a notice when nothing was found.
    '''
    print("'%s'近义词:" % word)
    o = nearby(word)
    assert len(o) == 2, "should contain 2 list"
    words, scores = o[0], o[1]
    if len(words) == 0:
        print(" out of vocabulary")
    # Ranks are printed starting at 1.
    for rank, candidate in enumerate(words, 1):
        print(" %d. %s:%s" % (rank, candidate, scores[rank - 1]))
|
|
|
|
def main():
    '''
    Demo entry point: display the nearby words of two sample inputs.
    '''
    for sample in ("人脸", "NOT_EXIST"):
        display(sample)
|
|
|
|
# Run the demo only when this file is executed as a script.
if __name__ == '__main__':
    main()
|