Add files via upload

missQian 2020-10-04 21:51:05 +08:00 committed by GitHub
parent 52f33edb64
commit ce53f4c43a
8 changed files with 481 additions and 0 deletions


@@ -0,0 +1,233 @@
#encoding=utf-8
import os
import argparse
import sys
import itertools
import xml.dom.minidom
import xml.etree.ElementTree as ET
import codecs
# dirty import from current dir
script_dirname = os.path.dirname(os.path.abspath(__file__))
sys.path.append(script_dirname)
from tokenizer import Tokenizer
def rsd2ltf(rsd_str, doc_id,
seg_option='linebreak',
tok_option='unitok',
re_segment=False):
tokenizer = Tokenizer(seg_option, tok_option)
if re_segment:
# running segmentation and tokenization, then re-segment the tokenized
# sentences (use space to concatenate tokens. this solves segmentation
# problem, e.g. How are you?I'm fine.).
sents = tokenizer.run_segmenter(rsd_str)
raw_tokens = tokenizer.run_tokenizer(sents)
# re-segment tokenized sentence
num_sent_reseg = 0
tokens = []
for i, t in enumerate(raw_tokens):
reseg = [item.split() for item in tokenizer.run_segmenter(' '.join(t))]
if len(reseg) > 1:
num_sent_reseg += 1
tokens += reseg
# compute offset for each token
indexer = 0
token_offset = []
for i, t in enumerate(itertools.chain(*tokens)):
while not rsd_str[indexer:].startswith(t) and \
indexer < len(rsd_str):
indexer += 1
if indexer < len(rsd_str):
t_start = indexer
t_end = t_start + len(t) - 1
assert rsd_str[t_start:t_end + 1] == t, \
"re_segment token offset not match %s-%d" % (doc_id, i)
token_offset.append((t_start, t_end))
indexer = t_end + 1
assert len(token_offset) == len(list(itertools.chain(*tokens))), \
"re_segment tokenization offset error in: %s" % doc_id
# recover sent using tokens
sents = []
prev_token_end = token_offset[0][0]-1
token_index = 0
for i, t in enumerate(tokens):
sent = ''
for j, item in enumerate(t):
if j == 0:
prev_token_end = token_offset[token_index][0] - 1
sent += ' ' * (token_offset[token_index][0] - prev_token_end - 1) + item
prev_token_end = token_offset[token_index][1]
token_index += 1
assert sent in rsd_str, \
're_segment sentence offset error.'
sents.append(sent)
else:
# running segmentation and tokenization
sents = tokenizer.run_segmenter(rsd_str)
tokens = tokenizer.run_tokenizer(sents)
# generate offset for sentences and tokens
indexer = 0
sent_offset = []
for i, s in enumerate(sents):
while not rsd_str[indexer:].startswith(s) and indexer < len(rsd_str):
indexer += 1
if indexer < len(rsd_str):
sent_start = indexer
sent_end = sent_start + len(s) - 1
assert rsd_str[sent_start:sent_end+1] == s, \
"sentence offset not match %s-%d" % (doc_id, i)
sent_offset.append((sent_start, sent_end))
indexer = sent_end + 1
assert len(sent_offset) == len(sents), \
"sentence segmentation offset error in: %s" % doc_id
token_offsets = []
for i, tok in enumerate(tokens):
sent_text = sents[i]
indexer = 0
t_offset = []
for j, t in enumerate(tok):
while not sent_text[indexer:].startswith(t) and \
indexer < len(sent_text):
indexer += 1
if indexer < len(sent_text):
t_start = indexer
t_end = t_start + len(t) - 1
assert sent_text[t_start:t_end+1] == t, \
"token offset not match %s-%d-%d" % (doc_id, i, j)
t_offset.append((t_start, t_end))
indexer = t_end + 1
token_offsets.append(t_offset)
assert len(t_offset) == len(tok), \
"tokenization offset error in: %s-%d" % (doc_id, i)
# convert seg/tok result to ltf
root = ET.Element('LCTL_TEXT')
doc_element = ET.Element('DOC', {'id': doc_id})
text_element = ET.Element('TEXT')
root.append(doc_element)
doc_element.append(text_element)
for i in range(len(sents)):
seg_text = sents[i]
seg_start_char = sent_offset[i][0]
seg_end_char = sent_offset[i][1]
seg_id = '%s-%s' % (doc_id, str(i))
seg_element = ET.Element('SEG', {'id': seg_id,
'start_char': str(seg_start_char),
'end_char': str(seg_end_char)})
original_text_element = ET.Element('ORIGINAL_TEXT')
original_text_element.text = seg_text
seg_element.append(original_text_element)
for j in range(len(tokens[i])):
token_id = 'token-%d-%d' % (i, j)
tok_text = tokens[i][j]
if not tok_text:
continue
tok_start_char = int(token_offsets[i][j][0]) + seg_start_char
tok_end_char = int(token_offsets[i][j][1]) + seg_start_char
assert rsd_str[tok_start_char:tok_end_char+1] == tok_text
token_element = ET.Element('TOKEN',
{'id': token_id,
'start_char': str(tok_start_char),
'end_char': str(tok_end_char)})
token_element.text = tok_text
seg_element.append(token_element)
text_element.append(seg_element)
return root
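# For reference, the returned ElementTree has the following LTF shape
# (illustrative doc_id and offsets; offsets are 0-based character positions
# into the original rsd string, end_char inclusive):
#
#   <LCTL_TEXT>
#     <DOC id="doc_001">
#       <TEXT>
#         <SEG id="doc_001-0" start_char="0" end_char="11">
#           <ORIGINAL_TEXT>How are you?</ORIGINAL_TEXT>
#           <TOKEN id="token-0-0" start_char="0" end_char="2">How</TOKEN>
#           <TOKEN id="token-0-1" start_char="4" end_char="6">are</TOKEN>
#           ...
#         </SEG>
#       </TEXT>
#     </DOC>
#   </LCTL_TEXT>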
def write2file(ltf_root, out_file):
# pretty print xml
root_str = ET.tostring(ltf_root, 'utf-8')
f_xml = xml.dom.minidom.parseString(root_str)
pretty_xml_as_string = f_xml.toprettyxml(encoding="utf-8")
f = open(out_file, 'wb')
f.write(pretty_xml_as_string)
f.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('rsd_input', type=str,
help='input rsd file path or directory.')
parser.add_argument('ltf_output', type=str,
help='output ltf file path or directory.')
t = Tokenizer()
parser.add_argument('--seg_option', default='linebreak',
help="segmentation options: %s (default is linebreak)" %
', '.join(t.segmenters.keys()))
parser.add_argument('--tok_option', default='unitok',
help="tokenization options: %s (default is unitok)" %
', '.join(t.tokenizers.keys()))
parser.add_argument('--extension', default='.rsd.txt',
help="extension of rsd file")
parser.add_argument('--re_segment', action='store_true', default=False,
                        help='run segmentation and tokenization, then re-segment the tokenized sentences.')
args = parser.parse_args()
input_rsd = args.rsd_input
output_ltf = args.ltf_output
seg_option = args.seg_option
tok_option = args.tok_option
extension = args.extension
re_segment = args.re_segment
rsd_files = []
output_files = []
if os.path.isdir(input_rsd):
assert os.path.isdir(output_ltf)
for fn in os.listdir(input_rsd):
if extension not in fn:
continue
rsd_files.append(os.path.join(input_rsd, fn))
output_files.append(os.path.join(output_ltf,
fn.replace(extension, '.ltf.xml')))
else:
rsd_files = [input_rsd]
output_files = [output_ltf]
for k, rsd_f in enumerate(rsd_files):
try:
rsd_str = codecs.open(rsd_f, 'r', 'utf-8').read()
doc_id = os.path.basename(rsd_f).replace(extension, '')
ltf_root = rsd2ltf(rsd_str, doc_id, seg_option, tok_option,
re_segment)
write2file(ltf_root, output_files[k])
except AssertionError as e:
print(e)
        sys.stdout.write('%d files processed.\r' % (k + 1))
sys.stdout.flush()
sys.stdout.write('%d files processed.' % len(rsd_files))
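A minimal usage sketch for the conversion script above (not part of the commit). The module name rsd2ltf and the example paths are assumptions for illustration, since the diff does not show file names.

# equivalent CLI (assuming the script is saved as rsd2ltf.py):
#   python rsd2ltf.py input_dir/ output_dir/ --seg_option linebreak --tok_option unitok
import codecs
from rsd2ltf import rsd2ltf, write2file  # hypothetical module name

rsd_text = codecs.open('example.rsd.txt', 'r', 'utf-8').read()
ltf_root = rsd2ltf(rsd_text, doc_id='example',
                   seg_option='linebreak', tok_option='unitok')

# every SEG and TOKEN carries character offsets back into the rsd string
for seg in ltf_root.iter('SEG'):
    print(seg.get('id'), seg.get('start_char'), seg.get('end_char'))

write2file(ltf_root, 'example.ltf.xml')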


@@ -0,0 +1,248 @@
#encoding=utf-8
import os
import jieba
import nltk
import re
import itertools
import unicodedata as ud
class Tokenizer(object):
def __init__(self, seg_option="linebreak", tok_option="unitok"):
self.segmenters = {'linebreak': self.seg_linebreak,
'nltk': self.seg_nltk,
'cmn': self.seg_cmn,
'edl_spanish': self.seg_edl_spanish,
'edl_cmn': self.seg_edl_cmn,
'nltk+linebreak': self.seg_nltk_linebreak,
'tigrinya': self.seg_tigrinya
}
self.tokenizers = {'unitok': self.tok_unitok,
'unitok_cut': self.tok_unitok_cut,
'regexp': self.tok_regexp,
'nltk_wordpunct': self.tok_nltk_wordpunct,
'space': self.tok_space,
'char': self.tok_char,
'jieba': self.tok_jieba,
}
self.root_dir = os.path.dirname(os.path.abspath(__file__))
self.seg_option = seg_option
self.tok_option = tok_option
# initialize jieba cn tok
if tok_option == 'jieba':
jieba.initialize()
def run_segmenter(self, plain_text):
# right strip plain text
plain_text = plain_text.rstrip()
# run segmenter
sents = self.segmenters[self.seg_option](plain_text)
sents = [s for s in sents if s.strip()]
return sents
def run_tokenizer(self, sents):
# right strip each sent
for i in range(len(sents)):
sents[i] = sents[i].rstrip()
# run tokenizer
tokenized_sents = self.tokenizers[self.tok_option](sents)
for i, s in enumerate(tokenized_sents):
s = [t for t in s if t.strip()]
tokenized_sents[i] = s
return tokenized_sents
#
# segmenters
#
def seg_linebreak(self, plain_text):
"""
use "\n" as delimiter
:param plain_text:
:return:
"""
result = [item.strip() for item in plain_text.split('\n') if item.strip()]
return result
def seg_nltk(self, plain_text):
"""
use nltk default segmenter
:param plain_text:
:return:
"""
result = [item.strip() for item in nltk.sent_tokenize(plain_text)]
return result
def seg_nltk_linebreak(self, plain_text):
"""
use nltk segmenter and then use "\n" as delimiter to re-segment.
:param plain_text:
:return:
"""
nltk_result = '\n'.join(self.seg_nltk(plain_text))
linebreak_result = self.seg_linebreak(nltk_result)
return linebreak_result
def seg_cmn(self, plain_text):
"""
use Chinese punctuation as delimiter
:param plain_text:
:return:
"""
res = []
        sent_end_char = [u'。', u'！', u'？']  # Chinese sentence-final punctuation
current_sent = ''
for i, char in enumerate(list(plain_text)):
if char in sent_end_char or i == len(list(plain_text)) - 1:
res.append(current_sent + char)
current_sent = ''
else:
current_sent += char
return [item.strip() for item in res]
def seg_edl(self, plain_text, seg_option):
# replace \n with ' ' because of the fix line length of edl data
# plain_text = plain_text.replace('\n', ' ')
# do sentence segmentation
if seg_option == 'edl_spanish':
# use nltk sent tokenization for spanish
tmp_seg = nltk.sent_tokenize(plain_text)
if seg_option == 'edl_cmn':
# use naive sent tokenization for chinese
tmp_seg = self.seg_cmn(plain_text)
# recover \n after xml tag
recovered_tmp_seg = []
for sent in tmp_seg:
sent = sent.replace('> ', '>\n').replace(' <', '\n<')
sent = sent.split('\n')
recovered_tmp_seg += [item.strip() for item in sent]
return recovered_tmp_seg
def seg_edl_spanish(self, plain_text):
return self.seg_edl(plain_text, 'edl_spanish')
def seg_edl_cmn(self, plain_text):
return self.seg_edl(plain_text, 'edl_cmn')
def seg_tigrinya(self, plain_text):
result = [item.strip() for item in plain_text.split('\n') if
item.strip()]
updated_result = []
for r in result:
            if '።' in r:  # Ethiopic full stop
sents = []
start = 0
for i, char in enumerate(r):
                    if char == '።':
sents.append(r[start:i+1])
start = i + 1
updated_result += sents
else:
updated_result.append(r)
return updated_result
#
# tokenizers
#
def tok_unitok(self, sents):
res = []
for s in sents:
s = unitok_tokenize(s).split()
res.append(s)
return res
def tok_unitok_cut(self, sents):
res = []
num_sent_cut = 0
for s in sents:
s = unitok_tokenize(s).split()
if len(s) > 80:
sub_sents = [item.split() for item in nltk.sent_tokenize(' '.join(s))]
assert sum([len(item) for item in sub_sents]) == len(s)
# sub_sent = [list(group) for k, group in
# itertools.groupby(s, lambda x: x == ".") if not k]
res += sub_sents
if len(sub_sents) > 1:
num_sent_cut += 1
else:
res.append(s)
        print('%d sentences longer than 80 tokens were re-split with nltk.sent_tokenize.' % num_sent_cut)
return res
def tok_regexp(self, sents):
result = []
for s in sents:
            tokenizer = nltk.tokenize.RegexpTokenizer(r'\w+|\$[\d\.]+|\S+')
tokenization_out = tokenizer.tokenize(s)
result.append(tokenization_out)
return result
def tok_nltk_wordpunct(self, sents):
result = []
for s in sents:
tokenizer = nltk.tokenize.WordPunctTokenizer()
tokenization_out = tokenizer.tokenize(s)
result.append(tokenization_out)
return result
def tok_space(self, sents):
result = []
for s in sents:
tokenization_out = s.split(' ')
result.append(tokenization_out)
return result
def tok_char(self, sents):
result = []
for s in sents:
tokenization_out = list(s)
result.append(tokenization_out)
return result
def tok_jieba(self, sents):
result = []
for s in sents:
raw_tokenization_out = list(jieba.cut(s))
result.append(raw_tokenization_out)
return result
# by Jon May
def unitok_tokenize(data):
toks = []
for offset, char in enumerate(data):
cc = ud.category(char)
# separate text by punctuation or symbol
if char in ['ʼ', '', '', '´', '', "'"]: # do not tokenize oromo apostrophe
toks.append(char)
elif cc.startswith("P") or cc.startswith("S") \
                or char in ['።', '፣']:  # Tigrinya period and comma
toks.append(' ')
toks.append(char)
toks.append(' ')
else:
toks.append(char)
toks = [item for item in ''.join(toks).split() if item]
return ' '.join(toks)
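A minimal sketch of the tokenizer on its own (not part of the commit). It assumes the second file is saved as tokenizer.py, which matches the "from tokenizer import Tokenizer" line in the conversion script above.

from tokenizer import Tokenizer, unitok_tokenize

t = Tokenizer(seg_option='linebreak', tok_option='unitok')
sents = t.run_segmenter('How are you?\nI am fine, thanks.')
tokens = t.run_tokenizer(sents)
print(sents)   # ['How are you?', 'I am fine, thanks.']
print(tokens)  # [['How', 'are', 'you', '?'], ['I', 'am', 'fine', ',', 'thanks', '.']]

# unitok_tokenize pads every punctuation or symbol character with spaces
print(unitok_tokenize('U.S. prices rose 2.5%!'))
# -> U . S . prices rose 2 . 5 % !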