99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
||
# coding: utf-8
|
||
# File: insert_es.py
|
||
# Author: lhy<lhy_in_blcu@126.com,https://huangyong.github.io>
|
||
# Date: 18-10-10
|
||
|
||
import os
|
||
import time
|
||
|
||
import json
|
||
from elasticsearch import Elasticsearch
|
||
from elasticsearch.helpers import bulk
|
||
import pymongo
|
||
|
||
class ProcessIntoES:
|
||
def __init__(self):
|
||
self._index = "crime_data"
|
||
self.es = Elasticsearch([{"host": "127.0.0.1", "port": 9200}])
|
||
self.doc_type = "crime"
|
||
cur = '/'.join(os.path.abspath(__file__).split('/')[:-1])
|
||
self.music_file = os.path.join(cur, 'qa_corpus.json')
|
||
|
||
'''创建ES索引,确定分词类型'''
|
||
def create_mapping(self):
|
||
node_mappings = {
|
||
"mappings": {
|
||
self.doc_type: { # type
|
||
"properties": {
|
||
"question": { # field: 问题
|
||
"type": "text", # lxw NOTE: cannot be string
|
||
"analyzer": "ik_max_word",
|
||
"search_analyzer": "ik_smart",
|
||
"index": "true" # The index option controls whether field values are indexed.
|
||
},
|
||
"answers": { # field: 问题
|
||
"type": "text", # lxw NOTE: cannot be string
|
||
"analyzer": "ik_max_word",
|
||
"search_analyzer": "ik_smart",
|
||
"index": "true" # The index option controls whether field values are indexed.
|
||
},
|
||
}
|
||
}
|
||
}
|
||
}
|
||
if not self.es.indices.exists(index=self._index):
|
||
self.es.indices.create(index=self._index, body=node_mappings)
|
||
print("Create {} mapping successfully.".format(self._index))
|
||
else:
|
||
print("index({}) already exists.".format(self._index))
|
||
|
||
'''批量插入数据'''
|
||
def insert_data_bulk(self, action_list):
|
||
success, _ = bulk(self.es, action_list, index=self._index, raise_on_error=True)
|
||
print("Performed {0} actions. _: {1}".format(success, _))
|
||
|
||
|
||
'''初始化ES,将数据插入到ES数据库当中'''
|
||
def init_ES():
|
||
pie = ProcessIntoES()
|
||
# 创建ES的index
|
||
pie.create_mapping()
|
||
start_time = time.time()
|
||
index = 0
|
||
count = 0
|
||
action_list = []
|
||
BULK_COUNT = 1000 # 每BULK_COUNT个句子一起插入到ES中
|
||
|
||
for line in open(pie.music_file):
|
||
if not line:
|
||
continue
|
||
item = json.loads(line)
|
||
index += 1
|
||
action = {
|
||
"_index": pie._index,
|
||
"_type": pie.doc_type,
|
||
"_source": {
|
||
"question": item['question'],
|
||
"answers": '\n'.join(item['answers']),
|
||
}
|
||
}
|
||
action_list.append(action)
|
||
if index > BULK_COUNT:
|
||
pie.insert_data_bulk(action_list=action_list)
|
||
index = 0
|
||
count += 1
|
||
print(count)
|
||
action_list = []
|
||
end_time = time.time()
|
||
|
||
print("Time Cost:{0}".format(end_time - start_time))
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 将数据库插入到elasticsearch当中
|
||
# init_ES()
|
||
# 按照标题进行查询
|
||
question = '我老公要起诉离婚 我不想离婚怎么办'
|
||
|