QASystemOnMedicalKG/answer_search.py

136 lines
6.3 KiB
Python
Raw Normal View History

2018-10-05 12:12:19 +08:00
#!/usr/bin/env python3
# coding: utf-8
# File: answer_search.py
# Author: lhy<lhy_in_blcu@126.com,https://huangyong.github.io>
# Date: 18-10-5
2018-10-05 17:20:22 +08:00
from py2neo import Graph
2018-10-05 12:12:19 +08:00
class AnswerSearcher:
def __init__(self):
self.g = Graph(
host="127.0.0.1",
http_port=7474,
user="lhy",
password="lhy123")
2018-10-05 17:20:22 +08:00
self.num_limit = 20
2018-10-05 12:12:19 +08:00
'''执行cypher查询并返回相应结果'''
def search_main(self, sqls):
2018-10-05 17:20:22 +08:00
final_answers = []
for sql_ in sqls:
question_type = sql_['question_type']
queries = sql_['sql']
answers = []
for query in queries:
ress = self.g.run(query).data()
answers += ress
final_answer = self.answer_prettify(question_type, answers)
if final_answer:
final_answers.append(final_answer)
return final_answers
2018-10-05 12:12:19 +08:00
2018-10-05 17:20:22 +08:00
'''根据对应的qustion_type调用相应的回复模板'''
def answer_prettify(self, question_type, answers):
final_answer = []
if not answers:
return ''
if question_type == 'disease_symptom':
desc = [i['n.name'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}的症状包括:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
2018-10-05 12:12:19 +08:00
2018-10-05 17:20:22 +08:00
elif question_type == 'symptom_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '症状{0}可能染上的疾病有:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_cause':
desc = [i['m.cause'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}可能的成因有:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_prevent':
desc = [i['m.prevent'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}的预防措施包括:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_lasttime':
desc = [i['m.cure_lasttime'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}治疗可能持续的周期为:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_cureway':
2018-10-05 19:03:32 +08:00
desc = [';'.join(i['m.cure_way']) for i in answers]
2018-10-05 17:20:22 +08:00
subject = answers[0]['m.name']
final_answer = '{0}可以尝试如下治疗:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_cureprob':
desc = [i['m.cured_prob'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}治愈的概率为(仅供参考):{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_easyget':
desc = [i['m.easy_get'] for i in answers]
subject = answers[0]['m.name']
2018-10-05 19:03:32 +08:00
2018-10-05 17:20:22 +08:00
final_answer = '{0}的易感人群包括:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_desc':
desc = [i['m.desc'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0},熟悉一下:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_acompany':
desc1 = [i['n.name'] for i in answers]
desc2 = [i['m.name'] for i in answers]
subject = answers[0]['m.name']
desc = [i for i in desc1 + desc2 if i != subject]
final_answer = '{0}的症状包括:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_not_food':
desc = [i['n.name'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}忌食的食物包括有:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_do_food':
do_desc = [i['n.name'] for i in answers if i['r.name'] == '宜吃']
recommand_desc = [i['n.name'] for i in answers if i['r.name'] == '推荐食谱']
subject = answers[0]['m.name']
final_answer = '{0}宜食的食物包括有:{1}\n推荐食谱包括有:{2}'.format(subject, ';'.join(list(set(do_desc))[:self.num_limit]), ';'.join(list(set(recommand_desc))[:self.num_limit]))
elif question_type == 'food_not_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '患有{0}的人最好不要吃{1}'.format(''.join(list(set(desc))[:self.num_limit]), subject)
elif question_type == 'food_do_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '患有{0}的人建议多试试{1}'.format(''.join(list(set(desc))[:self.num_limit]), subject)
elif question_type == 'disease_drug':
desc = [i['n.name'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}通常的使用的药品包括:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'drug_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '{0}主治的疾病有{1},可以试试'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'disease_check':
desc = [i['n.name'] for i in answers]
subject = answers[0]['m.name']
final_answer = '{0}通常可以通过以下方式检查出来:{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
elif question_type == 'check_disease':
desc = [i['m.name'] for i in answers]
subject = answers[0]['n.name']
final_answer = '通常可以通过{0}检查出来的疾病有{1}'.format(subject, ''.join(list(set(desc))[:self.num_limit]))
return final_answer
2018-10-05 12:12:19 +08:00
if __name__ == '__main__':
2018-10-05 17:20:22 +08:00
searcher = AnswerSearcher()