QASystemOnMedicalKG/question_classifier.py

227 lines
10 KiB
Python
Raw Permalink Normal View History

2018-10-04 23:28:23 +08:00
#!/usr/bin/env python3
# coding: utf-8
# File: question_classifier.py
# Author: lhy<lhy_in_blcu@126.com,https://huangyong.github.io>
# Date: 18-10-4
import os
import ahocorasick
class QuestionClassifier:
def __init__(self):
cur_dir = '/'.join(os.path.abspath(__file__).split('/')[:-1])
# 特征词路径
self.disease_path = os.path.join(cur_dir, 'dict/disease.txt')
self.department_path = os.path.join(cur_dir, 'dict/department.txt')
self.check_path = os.path.join(cur_dir, 'dict/check.txt')
self.drug_path = os.path.join(cur_dir, 'dict/drug.txt')
self.food_path = os.path.join(cur_dir, 'dict/food.txt')
self.producer_path = os.path.join(cur_dir, 'dict/producer.txt')
self.symptom_path = os.path.join(cur_dir, 'dict/symptom.txt')
self.deny_path = os.path.join(cur_dir, 'dict/deny.txt')
# 加载特征词
self.disease_wds= [i.strip() for i in open(self.disease_path) if i.strip()]
self.department_wds= [i.strip() for i in open(self.department_path) if i.strip()]
self.check_wds= [i.strip() for i in open(self.check_path) if i.strip()]
self.drug_wds= [i.strip() for i in open(self.drug_path) if i.strip()]
self.food_wds= [i.strip() for i in open(self.food_path) if i.strip()]
self.producer_wds= [i.strip() for i in open(self.producer_path) if i.strip()]
self.symptom_wds= [i.strip() for i in open(self.symptom_path) if i.strip()]
self.region_words = set(self.department_wds + self.disease_wds + self.check_wds + self.drug_wds + self.food_wds + self.producer_wds + self.symptom_wds)
self.deny_words = [i.strip() for i in open(self.deny_path) if i.strip()]
# 构造领域actree
self.region_tree = self.build_actree(list(self.region_words))
# 构建词典
self.wdtype_dict = self.build_wdtype_dict()
# 问句疑问词
self.symptom_qwds = ['症状', '表征', '现象', '症候', '表现']
2018-10-05 17:20:22 +08:00
self.cause_qwds = ['原因','成因', '为什么', '怎么会', '怎样才', '咋样才', '怎样会', '如何会', '为啥', '为何', '如何才会', '怎么才会', '会导致', '会造成']
2018-10-05 12:12:19 +08:00
self.acompany_qwds = ['并发症', '并发', '一起发生', '一并发生', '一起出现', '一并出现', '一同发生', '一同出现', '伴随发生', '伴随', '共现']
self.food_qwds = ['饮食', '饮用', '', '', '伙食', '膳食', '', '' ,'忌口', '补品', '保健品', '食谱', '菜谱', '食用', '食物','补品']
self.drug_qwds = ['', '药品', '用药', '胶囊', '口服液', '炎片']
self.prevent_qwds = ['预防', '防范', '抵制', '抵御', '防止','躲避','逃避','避开','免得','逃开','避开','避掉','躲开','躲掉','绕开',
2018-10-04 23:28:23 +08:00
'怎样才能不', '怎么才能不', '咋样才能不','咋才能不', '如何才能不',
'怎样才不', '怎么才不', '咋样才不','咋才不', '如何才不',
'怎样才可以不', '怎么才可以不', '咋样才可以不', '咋才可以不', '如何可以不',
'怎样才可不', '怎么才可不', '咋样才可不', '咋才可不', '如何可不']
self.lasttime_qwds = ['周期', '多久', '多长时间', '多少时间', '几天', '几年', '多少天', '多少小时', '几个小时', '多少年']
2018-10-05 12:12:19 +08:00
self.cureway_qwds = ['怎么治疗', '如何医治', '怎么医治', '怎么治', '怎么医', '如何治', '医治方式', '疗法', '咋治', '怎么办', '咋办', '咋治']
self.cureprob_qwds = ['多大概率能治好', '多大几率能治好', '治好希望大么', '几率', '几成', '比例', '可能性', '能治', '可治', '可以治', '可以医']
self.easyget_qwds = ['易感人群', '容易感染', '易发人群', '什么人', '哪些人', '感染', '染上', '得上']
2018-10-05 17:20:22 +08:00
self.check_qwds = ['检查', '检查项目', '查出', '检查', '测出', '试出']
2018-10-05 12:12:19 +08:00
self.belong_qwds = ['属于什么科', '属于', '什么科', '科室']
self.cure_qwds = ['治疗什么', '治啥', '治疗啥', '医治啥', '治愈啥', '主治啥', '主治什么', '有什么用', '有何用', '用处', '用途',
'有什么好处', '有什么益处', '有何益处', '用来', '用来做啥', '用来作甚', '需要', '']
2018-10-04 23:28:23 +08:00
print('model init finished ......')
return
'''分类主函数'''
def classify(self, question):
data = {}
medical_dict = self.check_medical(question)
if not medical_dict:
return {}
data['args'] = medical_dict
#收集问句当中所涉及到的实体类型
types = []
for type_ in medical_dict.values():
types += type_
question_type = 'others'
2018-10-05 12:12:19 +08:00
question_types = []
2018-10-04 23:28:23 +08:00
# 症状
if self.check_words(self.symptom_qwds, question) and ('disease' in types):
question_type = 'disease_symptom'
2018-10-05 12:12:19 +08:00
question_types.append(question_type)
if self.check_words(self.symptom_qwds, question) and ('symptom' in types):
question_type = 'symptom_disease'
question_types.append(question_type)
2018-10-04 23:28:23 +08:00
# 原因
if self.check_words(self.cause_qwds, question) and ('disease' in types):
question_type = 'disease_cause'
2018-10-05 12:12:19 +08:00
question_types.append(question_type)
2018-10-04 23:28:23 +08:00
# 并发症
2018-10-05 12:12:19 +08:00
if self.check_words(self.acompany_qwds, question) and ('disease' in types):
2018-10-04 23:28:23 +08:00
question_type = 'disease_acompany'
2018-10-05 12:12:19 +08:00
question_types.append(question_type)
2018-10-04 23:28:23 +08:00
# 推荐食品
2018-10-05 12:12:19 +08:00
if self.check_words(self.food_qwds, question) and 'disease' in types:
2018-10-04 23:28:23 +08:00
deny_status = self.check_words(self.deny_words, question)
if deny_status:
question_type = 'disease_not_food'
else:
question_type = 'disease_do_food'
2018-10-05 12:12:19 +08:00
question_types.append(question_type)
#已知食物找疾病
if self.check_words(self.food_qwds+self.cure_qwds, question) and 'food' in types:
deny_status = self.check_words(self.deny_words, question)
if deny_status:
question_type = 'food_not_disease'
else:
question_type = 'food_do_disease'
question_types.append(question_type)
2018-10-04 23:28:23 +08:00
# 推荐药品
2018-10-05 12:12:19 +08:00
if self.check_words(self.drug_qwds, question) and 'disease' in types:
question_type = 'disease_drug'
question_types.append(question_type)
# 药品治啥病
if self.check_words(self.cure_qwds, question) and 'drug' in types:
question_type = 'drug_disease'
question_types.append(question_type)
# 疾病接受检查项目
if self.check_words(self.check_qwds, question) and 'disease' in types:
question_type = 'disease_check'
question_types.append(question_type)
# 已知检查项目查相应疾病
if self.check_words(self.check_qwds+self.cure_qwds, question) and 'check' in types:
question_type = 'check_disease'
question_types.append(question_type)
2018-10-04 23:28:23 +08:00
# 症状防御
2018-10-05 12:12:19 +08:00
if self.check_words(self.prevent_qwds, question) and 'disease' in types:
question_type = 'disease_prevent'
question_types.append(question_type)
2018-10-04 23:28:23 +08:00
# 疾病医疗周期
2018-10-05 12:12:19 +08:00
if self.check_words(self.lasttime_qwds, question) and 'disease' in types:
question_type = 'disease_lasttime'
question_types.append(question_type)
2018-10-04 23:28:23 +08:00
# 疾病治疗方式
2018-10-05 12:12:19 +08:00
if self.check_words(self.cureway_qwds, question) and 'disease' in types:
question_type = 'disease_cureway'
question_types.append(question_type)
2018-10-04 23:28:23 +08:00
# 疾病治愈可能性
2018-10-05 12:12:19 +08:00
if self.check_words(self.cureprob_qwds, question) and 'disease' in types:
question_type = 'disease_cureprob'
question_types.append(question_type)
2018-10-04 23:28:23 +08:00
# 疾病易感染人群
2018-10-05 12:12:19 +08:00
if self.check_words(self.easyget_qwds, question) and 'disease' in types :
question_type = 'disease_easyget'
question_types.append(question_type)
# 若没有查到相关的外部查询信息,那么则将该疾病的描述信息返回
if question_types == [] and 'disease' in types:
question_types = ['disease_desc']
# 若没有查到相关的外部查询信息,那么则将该疾病的描述信息返回
if question_types == [] and 'symptom' in types:
question_types = ['symptom_disease']
2018-10-04 23:28:23 +08:00
2018-10-05 12:12:19 +08:00
# 将多个分类结果进行合并处理,组装成一个字典
data['question_types'] = question_types
2018-10-04 23:28:23 +08:00
return data
'''构造词对应的类型'''
def build_wdtype_dict(self):
wd_dict = dict()
for wd in self.region_words:
wd_dict[wd] = []
if wd in self.disease_wds:
wd_dict[wd].append('disease')
if wd in self.department_wds:
wd_dict[wd].append('department')
if wd in self.check_wds:
wd_dict[wd].append('check')
if wd in self.drug_wds:
wd_dict[wd].append('drug')
if wd in self.food_wds:
wd_dict[wd].append('food')
if wd in self.symptom_wds:
wd_dict[wd].append('symptom')
if wd in self.producer_wds:
wd_dict[wd].append('producer')
return wd_dict
'''构造actree加速过滤'''
def build_actree(self, wordlist):
actree = ahocorasick.Automaton()
for index, word in enumerate(wordlist):
actree.add_word(word, (index, word))
actree.make_automaton()
return actree
'''问句过滤'''
def check_medical(self, question):
region_wds = []
for i in self.region_tree.iter(question):
wd = i[1][1]
region_wds.append(wd)
stop_wds = []
for wd1 in region_wds:
for wd2 in region_wds:
if wd1 in wd2 and wd1 != wd2:
stop_wds.append(wd1)
final_wds = [i for i in region_wds if i not in stop_wds]
final_dict = {i:self.wdtype_dict.get(i) for i in final_wds}
return final_dict
'''基于特征词进行分类'''
def check_words(self, wds, sent):
for wd in wds:
if wd in sent:
return True
return False
if __name__ == '__main__':
handler = QuestionClassifier()
while 1:
question = input('input an question:')
data = handler.classify(question)
print(data)