#!/usr/bin/env python
# -*- coding: utf-8 -*-
#=========================================================================
#
# Copyright (c) 2017 <> All Rights Reserved
#
#
# File: /Users/hain/ai/Synonyms/synonyms/__init__.py
# Author: Hai Liang Wang
# Date: 2017-09-27
#
#=========================================================================
"""
Chinese Synonyms for Natural Language Processing and Understanding.
"""
from __future__ import print_function
from __future__ import division
__copyright__ = "Copyright (c) 2017 . All Rights Reserved"
__author__ = "Hu Ying Xi<>, Hai Liang Wang<hailiang.hl.wang@gmail.com>"
__date__ = "2017-09-27"
import os
import sys
import numpy as np
curdir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(curdir)
PLT = 2
if sys.version_info[0] < 3:
    default_stdout = sys.stdout
    default_stderr = sys.stderr
    reload(sys)
    sys.stdout = default_stdout
    sys.stderr = default_stderr
    sys.setdefaultencoding("utf-8")
    # raise "Must be using Python 3"
else:
    PLT = 3
# Get Environment variables
ENVIRON = os.environ.copy()
import json
import gzip
import shutil
from synonyms.word2vec import KeyedVectors
from synonyms.utils import any2utf8
from synonyms.utils import any2unicode
from synonyms.utils import sigmoid
from synonyms.utils import cosine
import jieba
import jieba.posseg as _tokenizer
'''
globals
'''
_vocab = dict()
_size = 0
_vectors = None
_stopwords = set()
_cache_nearby = dict()
'''
lambda fns
'''
# combine similarity scores
_similarity_smooth = lambda x, y, z: (x * y) + z
_flat_sum_array = lambda x: np.sum(x, axis=0)  # numerator
'''
tokenizer settings
'''
tokenizer_dict = os.path.join(curdir, 'data', 'vocab.txt')
if "SYNONYMS_WORDSEG_DICT" in ENVIRON:
if os.exist(ENVIRON["SYNONYMS_WORDSEG_DICT"]):
print("info: set wordseg dict with %s" % tokenizer_dict)
tokenizer_dict = ENVIRON["SYNONYMS_WORDSEG_DICT"]
else: print("warning: can not find dict at [%s]" % tokenizer_dict)
print(">> Synonyms load wordseg dict [%s] ... " % tokenizer_dict)
jieba.set_dictionary(tokenizer_dict)
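# A hedged example of pointing the segmenter at a custom dictionary via the
# environment variable above; the path is hypothetical and the file must follow
# jieba's "word frequency [POS]" one-entry-per-line format:
#
#   export SYNONYMS_WORDSEG_DICT=/path/to/my_vocab.txt
#   python -c "import synonyms; print(synonyms.seg(u'人脸识别'))"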
# stopwords
_fin_stopwords_path = os.path.join(curdir, 'data', 'stopwords.txt')
def _load_stopwords(file_path):
    '''
    load stop words
    '''
    global _stopwords
    if sys.version_info[0] < 3:
        words = open(file_path, 'r')
    else:
        words = open(file_path, 'r', encoding='utf-8')
    stopwords = words.readlines()
    for w in stopwords:
        _stopwords.add(any2unicode(w).strip())
    words.close()
print(">> Synonyms on loading stopwords [%s] ..." % _fin_stopwords_path)
_load_stopwords(_fin_stopwords_path)
def _segment_words(sen):
    '''
    segment words with jieba
    '''
    words, tags = [], []
    m = _tokenizer.cut(sen, HMM=True)  # HMM gives better recognition of new words
    for x in m:
        words.append(x.word)
        tags.append(x.flag)
    return words, tags
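# Illustrative use of the segmenter above; the exact tokens and POS flags depend
# on the jieba version and the loaded vocab.txt, so treat this as a sketch only:
#
#   words, tags = _segment_words(u"今天天气不错")
#   # words -> e.g. [u"今天", u"天气", u"不错"]
#   # tags  -> jieba POS flags such as u"t", u"n", u"a"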
'''
word embedding
'''
# vectors
_f_model = os.path.join(curdir, 'data', 'words.vector')
if "SYNONYMS_WORD2VEC_BIN_MODEL_ZH_CN" in ENVIRON:
    _f_model = ENVIRON["SYNONYMS_WORD2VEC_BIN_MODEL_ZH_CN"]
def _load_w2v(model_file=_f_model, binary=True):
    '''
    load word2vec model
    '''
    if not os.path.exists(model_file):
        print("os.path : ", os.path)
        raise Exception("Model file [%s] does not exist." % model_file)
    return KeyedVectors.load_word2vec_format(
        model_file, binary=binary, unicode_errors='ignore')
print(">> Synonyms on loading vectors [%s] ..." % _f_model)
_vectors = _load_w2v(model_file=_f_model)
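# A minimal sketch of swapping in custom vectors via the environment variable
# checked above; the path is hypothetical, any word2vec binary in the same
# format should load:
#
#   export SYNONYMS_WORD2VEC_BIN_MODEL_ZH_CN=/path/to/words.vector
#   python -c "import synonyms; print(synonyms.nearby(u'人脸'))"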
def _get_wv(sentence):
    '''
    get word2vec data by sentence
    sentence is segmented string.
    '''
    global _vectors
    vectors = []
    for y in sentence:
        y_ = any2unicode(y).strip()
        if y_ not in _stopwords:
            syns = nearby(y_)[0]
            # print("sentence %s word: %s" %(sentence, y_))
            # print("sentence %s word nearby: %s" %(sentence, " ".join(syns)))
            c = []
            try:
                c.append(_vectors.word_vec(y_))
            except KeyError as error:
                print("not exist in w2v model: %s" % y_)
                # c.append(np.zeros((100,), dtype=float))
                random_state = np.random.RandomState(seed=(hash(y_) % (2 ** 32 - 1)))
                c.append(random_state.uniform(low=-10.0, high=10.0, size=(100,)))
            for n in syns:
                if n is None: continue
                try:
                    v = _vectors.word_vec(any2unicode(n))
                except KeyError as error:
                    # v = np.zeros((100,), dtype=float)
                    random_state = np.random.RandomState(seed=(hash(n) % (2 ** 32 - 1)))
                    v = random_state.uniform(low=-10.0, high=10.0, size=(100,))
                c.append(v)
            r = np.average(c, axis=0)
            vectors.append(r)
    return vectors
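# Note on the OOV fallback in _get_wv: a word missing from the model gets a
# pseudo-random 100-d vector seeded by hash(word). On Python 3, str hashing is
# salted per process unless PYTHONHASHSEED is fixed, so OOV-heavy scores can
# vary between runs; pinning the seed is one way to make them repeatable:
#
#   PYTHONHASHSEED=0 python your_script.py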
'''
Distance
'''
# Levenshtein Distance
def _levenshtein_distance(sentence1, sentence2):
    '''
    Return the Levenshtein distance between two strings.
    Based on:
        http://rosettacode.org/wiki/Levenshtein_distance#Python
    '''
    first = any2utf8(sentence1).decode('utf-8', 'ignore')
    second = any2utf8(sentence2).decode('utf-8', 'ignore')
    sentence1_len, sentence2_len = len(first), len(second)
    maxlen = max(sentence1_len, sentence2_len)
    if sentence1_len > sentence2_len:
        first, second = second, first
    distances = range(len(first) + 1)
    for index2, char2 in enumerate(second):
        new_distances = [index2 + 1]
        for index1, char1 in enumerate(first):
            if char1 == char2:
                new_distances.append(distances[index1])
            else:
                new_distances.append(1 + min((distances[index1],
                                              distances[index1 + 1],
                                              new_distances[-1])))
        distances = new_distances
    levenshtein = distances[-1]
    d = float((maxlen - levenshtein) / maxlen)
    # smoothing
    s = (sigmoid(d * 6) - 0.5) * 2
    # print("smoothing[%s| %s]: %s -> %s" % (sentence1, sentence2, d, s))
    return s
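# Worked numbers for the smoothing above, assuming utils.sigmoid is the standard
# logistic 1 / (1 + e^-x):
#   identical strings        -> d = 1.0, s = (sigmoid(6.0) - 0.5) * 2 ≈ 0.995
#   no characters in common  -> d = 0.0, s = (sigmoid(0.0) - 0.5) * 2 = 0.0
# so s stays in [0, 1) and boosts partial matches relative to the raw ratio d.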
def _nearby_levenshtein_distance(s1, s2):
    '''
    Refine the edit-distance score with words that are close in the embedding space.
    '''
    s1_len, s2_len = len(s1), len(s2)
    maxlen = max(s1_len, s2_len)
    first, second = (s2, s1) if s1_len == maxlen else (s1, s2)
    ft = set()  # all related words with first sentence
    for x in first:
        ft.add(x)
        n, _ = nearby(x)
        for o in n[:5]:
            ft.add(o)
    scores = []
    if len(ft) == 0: return 0.0  # invalid length for first string
    for x in second:
        scores.append(max([_levenshtein_distance(x, y) for y in ft]))
    s = np.sum(scores) / maxlen
    return s
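# Sketch of the scoring above with s1 = [u"今天", u"天气"] and s2 = [u"天气", u"很", u"好"]
# (neighbour words vary with the model, so this is illustrative only): ft holds
# the shorter sentence's words plus up to 5 neighbours of each; every word of
# the longer sentence is matched against its best candidate in ft, and the
# per-word scores are averaged over maxlen (3 here), so unmatched extra words
# dilute the result.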
def _similarity_distance(s1, s2):
    '''
    compute similarity with distance measurement
    '''
    g = cosine(_flat_sum_array(_get_wv(s1)), _flat_sum_array(_get_wv(s2)))
    u = _nearby_levenshtein_distance(s1, s2)
    # print("g: %s, u: %s" % (g, u))
    if u > 0.8:
        r = _similarity_smooth(g, 0.005, u)
    elif u > 0.4:
        r = _similarity_smooth(g, 0.05, u)
    elif u > 0.2:
        r = _similarity_smooth(g, 0.5, u)
    else:
        r = _similarity_smooth(g, 1, u)
    if r < 0: r = abs(r)
    r = min(r, 1.0)
    return float("%.3f" % r)
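# The banding above lets the edit-distance score u dominate when it is strong
# and the embedding score g take over when u is weak; with g = 0.6, for example:
#   u = 0.9 -> r = 0.6 * 0.005 + 0.9 = 0.903
#   u = 0.1 -> r = 0.6 * 1     + 0.1 = 0.7
# before the abs / min(r, 1.0) clamps and the 3-digit rounding.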
'''
Public Methods
'''
seg = _segment_words # word segmenter
def nearby(word):
    '''
    Nearby word
    '''
    w = any2unicode(word)
    # read from cache
    if w in _cache_nearby: return _cache_nearby[w]
    words, scores = [], []
    try:
        for x in _vectors.neighbours(w):
            words.append(x[0])
            scores.append(x[1])
    except: pass  # ignore key error, OOV
    # put into cache
    _cache_nearby[w] = (words, scores)
    return words, scores
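# Usage sketch; the neighbours returned depend entirely on the bundled vectors,
# so the words shown are illustrative:
#
#   words, scores = nearby(u"人脸")
#   # words  -> e.g. [u"图片", u"图像", ...]
#   # scores -> matching similarity scores, typically in descending order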
def compare(s1, s2, seg=True):
    '''
    compare similarity
    s1 : sentence1
    s2 : sentence2
    seg : True  : The original sentences need jieba.cut
          False : The original sentences have been cut.
    '''
    if seg:
        s1 = [x for x in jieba.cut(s1)]
        s2 = [x for x in jieba.cut(s2)]
    else:
        s1 = s1.split()
        s2 = s2.split()
    assert len(s1) > 0 and len(s2) > 0, "The length of s1 and s2 should > 0."
    return _similarity_distance(s1, s2)
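# Usage sketch; absolute scores depend on the shipped model, so only the relative
# ordering below should be taken as indicative:
#
#   print(compare(u"发生历史性变革", u"发生历史性变革"))  # identical -> close to 1.0
#   print(compare(u"聊天机器人", u"天气预报"))            # unrelated -> much lower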
def display(word):
print("'%s'近义词:" % word)
o = nearby(word)
assert len(o) == 2, "should contain 2 list"
if len(o[0]) == 0:
print(" out of vocabulary")
for k, v in enumerate(o[0]):
print(" %d. %s:%s" % (k + 1, v, o[1][k]))
def main():
display("人脸")
display("NOT_EXIST")
if __name__ == '__main__':
main()
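# As a library, the public surface is seg, nearby(), compare() and display();
# a minimal end-to-end sketch (printed values depend on the bundled data):
#
#   import synonyms
#   synonyms.display(u"人脸")                  # pretty-print nearest neighbours
#   print(synonyms.seg(u"中文近义词工具包"))    # -> (words, POS tags)
#   print(synonyms.compare(u"旗帜", u"旗子"))   # -> similarity score in [0.0, 1.0]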