Rework extraction as a multiprocessing pipeline: read the dump from a file or stdin, extract pages in parallel worker processes, and write the result to a directory of files, a single file, or stdout.

Giuseppe Attardi 2015-08-30 21:18:17 +02:00
commit 70956025f1
2 changed files with 178 additions and 92 deletions


@@ -15,22 +15,23 @@ Each file will contain several documents in this [document format](http://media
usage: WikiExtractor.py [-h] [-o OUTPUT] [-b n[KMG]] [-c] [--html] [-l]
[-ns ns1,ns2] [-s] [--templates TEMPLATES]
[--no-templates] [--threads THREADS] [-q] [--debug]
[--no-templates] [--processes PROCESSES] [-q] [--debug]
[-a] [-v]
input
positional arguments:
input XML wiki dump file
input XML wiki dump file; use '-' to read from stdin
optional arguments:
-h, --help show this help message and exit
--threads THREADS Number of threads to use (default 2)
--processes PROCESSES number of processes to use (default number of CPU cores)
Output:
-o OUTPUT, --output OUTPUT
output directory
output path; a file if no max bytes per file set,
otherwise a directory to collect files. use '-' for stdout.
-b n[KMG], --bytes n[KMG]
put specified bytes per output file (default is 1M)
maximum bytes per output file (default is no limit: one file)
-c, --compress compress output files using bzip
Processing:
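As an illustration of how the reworked options combine (the dump file names below are hypothetical):

    # many files of at most 1 MB each under the directory 'extracted', 8 extraction processes
    WikiExtractor.py --processes 8 -b 1M -o extracted enwiki-pages-articles.xml.bz2

    # stream the dump from stdin and write a single stream to stdout (template expansion disabled)
    bzcat enwiki-pages-articles.xml.bz2 | WikiExtractor.py --no-templates -o - - > extracted.txt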


@@ -56,6 +56,9 @@ import bz2
import codecs
from htmlentitydefs import name2codepoint
import Queue, threading, multiprocessing
import StringIO
import fileinput
from timeit import default_timer
#===========================================================================
@@ -216,7 +219,7 @@ comment = re.compile(r'<!--.*?-->', re.DOTALL)
# Match ignored tags
ignored_tag_patterns = []
def ignoreTag(tag):
left = re.compile(r'<%s\b[^>/]*>' % tag, re.IGNORECASE) # both <ref> and <reference>
left = re.compile(r'<%s\b.*?>' % tag, re.IGNORECASE | re.DOTALL) # both <ref> and <reference>
right = re.compile(r'</\s*%s>' % tag, re.IGNORECASE)
ignored_tag_patterns.append((left, right))
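A small sketch of what the relaxed pattern buys, with a made-up sample tag: the old character class [^>/]* cannot cross a '/' inside an attribute value, while the new non-greedy .*? with re.DOTALL also spans newlines.

import re

tag = 'ref'
old_left = re.compile(r'<%s\b[^>/]*>' % tag, re.IGNORECASE)
new_left = re.compile(r'<%s\b.*?>' % tag, re.IGNORECASE | re.DOTALL)

sample = '<ref name="a/b"\n group="notes">'  # attribute containing '/' and a line break

print old_left.match(sample)  # None: the '/' in the attribute value stops [^>/]* before the '>'
print new_left.match(sample)  # matches the whole opening tag; re.DOTALL lets .*? cross the newline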
@@ -295,8 +298,7 @@ class Template(list):
logging.debug('subst tpl (%d, %d) %s', len(extractor.frame), depth, self)
if depth > extractor.maxParameterRecursionLevels:
logging.warn('Reachead maximum parameter recursions: %d',
extractor.maxParameterRecursionLevels)
extractor.recursion_exceeded_3_errs += 1
return ''
return ''.join([tpl.subst(params, extractor, depth) for tpl in self])
@@ -389,9 +391,14 @@ class Extractor(object):
self.page = page
self.magicWords = MagicWords()
self.frame = []
self.recursion_exceeded_1_errs = 0
self.recursion_exceeded_2_errs = 0
self.recursion_exceeded_3_errs = 0
self.untitled_template_errs = 0
def extract(self, out=sys.stdout):
logging.info("%s\t%s", self.id, self.title)
logging.debug("%s\t%s", self.id, self.title)
text = ''.join(self.page)
url = get_url(self.id)
header = '<doc id="%s" url="%s" title="%s">\n' % (self.id, url, self.title)
@@ -407,13 +414,19 @@ class Extractor(object):
self.magicWords['currenttime'] = time.strftime('%H:%M:%S')
text = clean(self, text)
footer = "\n</doc>\n"
if out != sys.stdout:
out.reserve(len(header) + len(text) + len(footer))
out.write(header)
for line in compact(text):
out.write(line.encode('utf-8'))
out.write('\n')
out.write(footer)
errs = (self.recursion_exceeded_1_errs,
self.recursion_exceeded_2_errs,
self.recursion_exceeded_3_errs,
self.untitled_template_errs)
if any(errs):
logging.warn("template errors '%s' (%s): untitled(%d) recursion(%d,%d,%d)", self.title, self.id, *errs)
#----------------------------------------------------------------------
# Expand templates
@@ -448,7 +461,7 @@ class Extractor(object):
res = ''
if len(self.frame) >= self.maxTemplateRecursionLevels:
logging.warn('Max template recursion exceeded!')
self.recursion_exceeded_1_errs += 1
return res
logging.debug('<expandTemplates ' + str(len(self.frame)))
@@ -579,8 +592,7 @@ class Extractor(object):
# equals sign are indexed 1, 2, .., given as attribute in the <name> tag.
if len(self.frame) >= self.maxTemplateRecursionLevels:
logging.warn('Reached max template recursion: %d',
self.maxTemplateRecursionLevels)
self.recursion_exceeded_2_errs += 1
logging.debug(' INVOCATION> %d %s', len(self.frame), body)
return ''
@@ -616,6 +628,9 @@ class Extractor(object):
return self.expandTemplates(ret)
title = fullyQualifiedTemplateTitle(title)
if not title:
self.untitled_template_errs += 1
return ''
redirected = redirects.get(title)
if redirected:
@@ -1099,8 +1114,7 @@ def fullyQualifiedTemplateTitle(templateTitle):
if templateTitle:
return templatePrefix + ucfirst(templateTitle)
else:
logging.warn("Skipping page with empty title")
return ''
return '' # caller may log as error
def normalizeNamespace(ns):
return ucfirst(ns)
@@ -1231,6 +1245,8 @@ def sharp_invoke(module, function, frame):
# find parameters in frame whose title is the one of the original
# template invocation
templateTitle = fullyQualifiedTemplateTitle(function)
if not templateTitle:
logging.warn("Template with empty title")
pair = next((x for x in frame if x[0] == templateTitle), None)
if pair:
params = pair[1]
@@ -1385,8 +1401,8 @@ def dropNested(text, openDelim, closeDelim):
"""
A matching function for nested expressions, e.g. namespaces and tables.
"""
openRE = re.compile(openDelim)
closeRE = re.compile(closeDelim)
openRE = re.compile(openDelim, re.IGNORECASE)
closeRE = re.compile(closeDelim, re.IGNORECASE)
# partition text in separate blocks { } { }
spans = [] # pairs (s, e) for each partition
nest = 0 # nesting level
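For readers unfamiliar with the helper, a simplified, self-contained sketch of the nesting-counter idea behind dropNested (the function name and sample text here are illustrative, not the script's actual implementation):

import re

def drop_nested(text, open_delim, close_delim):
    # Simplified illustration: remove balanced, possibly nested spans
    # delimited by open_delim/close_delim (both regexes).
    open_re = re.compile(open_delim, re.IGNORECASE)
    close_re = re.compile(close_delim, re.IGNORECASE)
    result = []
    pos = 0
    while True:
        start = open_re.search(text, pos)
        if not start:
            result.append(text[pos:])
            break
        nest = 1
        cur = start.end()
        while nest:
            next_close = close_re.search(text, cur)
            if not next_close:          # unbalanced: keep the rest as-is
                result.append(text[pos:])
                return ''.join(result)
            next_open = open_re.search(text, cur)
            if next_open and next_open.start() < next_close.start():
                nest += 1                # another opener before the closer
                cur = next_open.end()
            else:
                nest -= 1                # this closer matches the current level
                cur = next_close.end()
        result.append(text[pos:start.start()])
        pos = cur
    return ''.join(result)

print drop_nested('a {{outer {{inner}} still outer}} b', r'{{', r'}}')  # 'a  b'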
@@ -2083,21 +2099,19 @@ class NextFile(object):
filesPerDir = 100
def __init__(self, lock, path_name):
self.lock = lock
def __init__(self, path_name):
self.path_name = path_name
self.dir_index = -1
self.file_index = -1
def next(self):
with self.lock:
self.file_index = (self.file_index + 1) % NextFile.filesPerDir
if self.file_index == 0:
self.dir_index += 1
dirname = self._dirname()
if not os.path.isdir(dirname):
os.makedirs(dirname)
return self._filepath()
self.file_index = (self.file_index + 1) % NextFile.filesPerDir
if self.file_index == 0:
self.dir_index += 1
dirname = self._dirname()
if not os.path.isdir(dirname):
os.makedirs(dirname)
return self._filepath()
def _dirname(self):
char1 = self.dir_index % 26
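The rest of the naming scheme lies outside this hunk; a plausible stand-in, assuming two-letter directory names (AA, AB, ...) and zero-padded file names with 100 files per directory, would generate paths like this:

import os

class SimpleNextFile(object):
    # Illustrative stand-in for NextFile, assuming two-letter directory
    # names and 'wiki_NN' file names; the real scheme is defined outside this hunk.
    files_per_dir = 100

    def __init__(self, path_name):
        self.path_name = path_name
        self.dir_index = -1
        self.file_index = -1

    def next(self):
        self.file_index = (self.file_index + 1) % SimpleNextFile.files_per_dir
        if self.file_index == 0:
            self.dir_index += 1
        char1 = self.dir_index % 26
        char2 = self.dir_index / 26  # Python 2 integer division
        dirname = os.path.join(self.path_name,
                               '%c%c' % (ord('A') + char2, ord('A') + char1))
        return os.path.join(dirname, 'wiki_%02d' % self.file_index)

gen = SimpleNextFile('extracted')
print gen.next()  # extracted/AA/wiki_00
print gen.next()  # extracted/AA/wiki_01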
@@ -2130,6 +2144,7 @@ class OutputSplitter(object):
self.file = self.open(self.nextFile.next())
def write(self, data):
self.reserve(len(data))
self.file.write(data)
def close(self):
@@ -2206,34 +2221,36 @@ def load_templates(file, output_file=None):
output.write('</page>\n')
page = []
articles += 1
if articles % 10000 == 0:
if articles % 100000 == 0:
logging.info("Preprocessed %d pages", articles)
if output_file:
output.close()
logging.info("Saved %d templates to '%s'", len(templates), output_file)
def process_dump(input_file, template_file, outdir, file_size, file_compress, threads):
def process_dump(input_file, template_file, out_file, file_size, file_compress, process_count):
"""
:param input_file: name of the wikipedia dump file.
:param input_file: name of the wikipedia dump file; '-' to read from stdin
:param template_file: optional file with template definitions.
:param outdir: name of the directory where to store extracted files.
:param file_size: max size of each extracted file.
:param out_file: name of the file (if no file_size limit) or directory to store extracted data
:param file_size: max size of each extracted file, or None for no max (one file)
:param file_compress: whether to compress files with bzip.
:param process_count: number of extraction processes to spawn.
"""
global urlbase
global knownNamespaces
global templateNamespace
global expand_templates
if input_file.lower().endswith("bz2"):
opener = bz2.BZ2File
if input_file == '-':
input = sys.stdin
else:
opener = open
input = opener(input_file)
input = fileinput.FileInput(input_file, openhook=fileinput.hook_compressed)
# collect siteinfo
for line in input:
line = line.decode('utf-8')
m = tagRE.search(line)
if not m:
if not m:
continue
tag = m.group(2)
if tag == 'base':
@@ -2251,38 +2268,56 @@ def process_dump(input_file, template_file, outdir, file_size, file_compress, th
if expand_templates:
# preprocess
logging.info("Preprocessing dump to collect template definitions: this may take some time.")
template_load_start = default_timer()
if template_file and os.path.exists(template_file):
input.close()
with open(template_file) as file:
load_templates(file)
logging.info("Preprocessing '%s' to collect template definitions: this may take some time.", template_file)
file = fileinput.FileInput(template_file, openhook=fileinput.hook_compressed)
load_templates(file)
file.close()
else:
if input_file == '-':
# can't scan then reset stdin; must error w/ suggestion to specify template_file
raise ValueError("to use templates with stdin dump, must supply explicit template-file")
logging.info("Preprocessing '%s' to collect template definitions: this may take some time.", input_file)
load_templates(input, template_file)
input.close()
input = opener(input_file)
input = fileinput.FileInput(input_file, openhook=fileinput.hook_compressed)
template_load_elapsed = default_timer() - template_load_start
logging.info("Loaded %d templates in %.1fs", len(templates), template_load_elapsed)
# process pages
logging.info("Starting processing pages from %s.", input_file)
logging.info("Starting page extraction from %s.", input_file)
extract_start = default_timer()
# ordering/control queue
ordering_queue = multiprocessing.JoinableQueue(maxsize=10 * process_count)
# output queue & single dedicated process
out_queue = multiprocessing.Queue(maxsize=10 * process_count)
outputter = multiprocessing.Process(target=output_process,
args=(ordering_queue, out_queue, out_file, file_size, file_compress))
outputter.start()
# initialize jobs queue
#threads = multiprocessing.cpu_count()
logging.info("Using %d CPUs.", threads)
queue = Queue.Queue(maxsize=2 * threads)
lock = threading.Lock() # for protecting shared state.
logging.info("Using %d extract processes.", process_count)
queue = multiprocessing.Queue(maxsize=10 * process_count)
nextFile = NextFile(lock, outdir)
# start worker threads
# start worker processes
workers = []
for _ in xrange(max(1, threads - 1)): # keep one for master
output_splitter = OutputSplitter(nextFile, file_size, file_compress)
extractor = ExtractorThread(queue, output_splitter)
for _ in xrange(max(1, process_count)):
extractor = multiprocessing.Process(target=extract_process, args=(queue, out_queue))
extractor.daemon = True # only live while parent process lives
extractor.start()
workers.append(extractor)
# we collect indivual lines, since str.join() is significantly faster than
# we collect individual lines, since str.join() is significantly faster than
# concatenation
page = []
id = None
last_id = None
ordinal = 0
inText = False
redirect = False
for line in input:
@@ -2318,40 +2353,83 @@ def process_dump(input_file, template_file, outdir, file_size, file_compress, th
page.append(line)
elif tag == '/page':
colon = title.find(':')
if (colon < 0 or title[:colon] in acceptedNamespaces) and \
if (colon < 0 or title[:colon] in acceptedNamespaces) and id != last_id and \
not redirect and not title.startswith(templateNamespace):
queue.put(Extractor(id, title, page), True) # block if full
item = (id, title, page, ordinal)
queue.put(item, True) # goes to any available extract_process
ordering_queue.put(ordinal, True) # goes only to output_process
last_id = id
ordinal += 1
id = None
page = []
# wait for empty queue
queue.join()
input.close()
ordering_queue.put(None) # signal end of work
# wait for empty queue
ordering_queue.join()
extract_duration = default_timer() - extract_start
extract_rate = ordinal / extract_duration
logging.info("Finished %d-process extraction of %d articles in %.1fs (%.1f/s)", process_count, ordinal, extract_duration, extract_rate)
#----------------------------------------------------------------------
# Multithread version
# Multiprocess support
class ExtractorThread(threading.Thread):
"""
Extractor thread.
"""
def __init__(self, queue, splitter):
self._queue = queue
self._splitter = splitter
threading.Thread.__init__(self)
self.setDaemon(True) # let the process die when main thread is killed
self.start()
def extract_process(jobs_queue, done_queue):
"""Pull tuples of raw page content, do CPU/regex-heavy fixup, push finished text"""
while True:
job = jobs_queue.get() # job is (id, title, page, ordinal)
if job:
out = StringIO.StringIO()
Extractor(*job[:3]).extract(out)
done_queue.put((job[3],out.getvalue())) # (ordinal, finished_text)
out.close()
else:
break
def run(self):
def output_process(ordering_queue, docs_queue, out_file, file_size, file_compress):
"""Pull finished article text, write to one or series of files (or stdout)"""
if out_file == '-':
output = sys.stdout
if file_size > 0:
logging.warn("writing to stdout, so max file size %d ignored" % file_size)
if file_compress:
logging.warn("writing to stdout, so no output compression (use external tool)")
elif file_size > 0:
# max size in effect, so file is a directory to collect many files
nextFile = NextFile(out_file)
output = OutputSplitter(nextFile, file_size, file_compress)
else:
# plain single file
if file_compress:
output = bz2.BZ2File(out_file + '.bz2', 'w')
else:
output = open(out_file, 'w')
interval_start = default_timer()
interval_count = 0
ordering_buffer = {}
while True:
next_ordinal = ordering_queue.get()
if next_ordinal is None:
break
while True:
job = self._queue.get()
if job:
job.extract(self._splitter)
self._queue.task_done()
else:
if next_ordinal in ordering_buffer:
output.write(ordering_buffer.pop(next_ordinal))
ordering_queue.task_done()
count_done = next_ordinal + 1
if count_done % 100000 == 0:
interval_rate = (count_done - interval_count) / (default_timer() - interval_start)
logging.info("Extracted %d articles (%.1f/s)", count_done, interval_rate)
interval_start = default_timer()
interval_count = count_done
break
ordinal, text = docs_queue.get()
ordering_buffer[ordinal] = text
if output != sys.stdout:
output.close()
ordering_queue.task_done() # report last None as done
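Stripped of processes and queues, the reordering amounts to buffering out-of-order (ordinal, text) pairs in a dict and flushing each ordinal as soon as it is available; a tiny sketch with made-up data:

def reorder(results):
    # results: (ordinal, text) pairs in arbitrary arrival order;
    # yield the texts in ordinal order, holding early arrivals in a buffer
    buffer = {}
    next_ordinal = 0
    for ordinal, text in results:
        buffer[ordinal] = text
        while next_ordinal in buffer:
            yield buffer.pop(next_ordinal)
            next_ordinal += 1

arrivals = [(1, 'B'), (0, 'A'), (3, 'D'), (2, 'C')]
print list(reorder(arrivals))  # ['A', 'B', 'C', 'D']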
# ----------------------------------------------------------------------
@@ -2369,9 +2447,10 @@ def main():
help="XML wiki dump file")
groupO = parser.add_argument_group('Output')
groupO.add_argument("-o", "--output", default="text",
help="output directory")
groupO.add_argument("-b", "--bytes", default="1M",
help="put specified bytes per output file (default is %(default)s)", metavar="n[KMG]")
help="output filename (if no bytes limit) or directory for files (if bytes limit set)")
groupO.add_argument("-b", "--bytes", default="None",
help="maximum bytes per output file; default is no limit (%(default)s)",
metavar="n[KMG]")
groupO.add_argument("-c", "--compress", action="store_true",
help="compress output files using bzip")
@@ -2388,8 +2467,9 @@ def main():
help="use or create file containing templates")
groupP.add_argument("--no-templates", action="store_false",
help="Do not expand templates")
parser.add_argument("--threads", type=int, default=2,
help="Number of threads to use (default 2)")
default_process_count = multiprocessing.cpu_count()
parser.add_argument("--processes", type=int, default=default_process_count,
help="Number of extract processes; default is count of CPU cores (%d)"%default_process_count)
groupS = parser.add_argument_group('Special')
groupS.add_argument("-q", "--quiet", action="store_true",
@@ -2414,9 +2494,13 @@ def main():
expand_templates = args.no_templates
try:
power = 'kmg'.find(args.bytes[-1].lower()) + 1
file_size = int(args.bytes[:-1]) * 1024 ** power
if file_size < minFileSize: raise ValueError()
if args.bytes == "None":
file_size = -1
else:
power = 'kmg'.find(args.bytes[-1].lower()) + 1
file_size = int(args.bytes[:-1]) * 1024 ** power
if file_size != -1 and file_size < minFileSize:
raise ValueError()
except ValueError:
logging.error('Insufficient or invalid size: %s', args.bytes)
return
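Worked through, the size handling above maps a K/M/G suffix onto a power of 1024, with the literal string "None" meaning no per-file limit; a hypothetical helper that mirrors it:

def parse_size(arg):
    # mirrors the option handling above: "None" disables the limit,
    # otherwise n[KMG] becomes n * 1024**power
    if arg == "None":
        return -1
    power = 'kmg'.find(arg[-1].lower()) + 1
    return int(arg[:-1]) * 1024 ** power

print parse_size('1M')    # 1048576  ('m' -> power 2)
print parse_size('500K')  # 512000   ('k' -> power 1)
print parse_size('None')  # -1       (single output file, no size limit)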
@@ -2457,16 +2541,17 @@ def main():
Extractor(id, title, [page]).extract()
return
output_dir = args.output
if not os.path.isdir(output_dir):
output_path = args.output
if file_size > 0 and not os.path.isdir(output_path):
try:
os.makedirs(output_dir)
os.makedirs(output_path)
except:
logging.error('Could not create: %s', output_dir)
logging.error('Could not create: %s', output_path)
return
process_dump(input_file, args.templates, output_dir, file_size,
args.compress, args.threads)
process_dump(input_file, args.templates, output_path, file_size,
args.compress, args.processes)
if __name__ == '__main__':
main()