From 537ab9b4ef9de4e3c616e72463053c5abbfa9c0f Mon Sep 17 00:00:00 2001
From: Versus Void
Date: Tue, 15 Aug 2017 22:06:20 +0000
Subject: [PATCH] Handle broken pipe and keyboard interrupts

---
 WikiExtractor.py | 222 +++++++++++++++++++++++++++--------------------
 1 file changed, 127 insertions(+), 95 deletions(-)

diff --git a/WikiExtractor.py b/WikiExtractor.py
index 2ab55d9..de8d50d 100755
--- a/WikiExtractor.py
+++ b/WikiExtractor.py
@@ -67,8 +67,9 @@ import os.path
 import re  # TODO use regex when it will be standard
 import time
 import json
+import queue
 from io import StringIO
-from multiprocessing import Queue, Process, Value, cpu_count
+from multiprocessing import Event, Queue, Process, Value, cpu_count
 from timeit import default_timer
 
 
@@ -81,7 +82,7 @@ if PY2:
     range = xrange  # Use Python 3 equivalent
     chr = unichr  # Use Python 3 equivalent
     text_type = unicode
-    
+
     class SimpleNamespace(object):
         def __init__ (self, **kwargs):
             self.__dict__.update(kwargs)
@@ -138,11 +139,11 @@ options = SimpleNamespace(
     ##
     # Filter disambiguation pages
     filter_disambig_pages = False,
-    
+
     ##
     # Drop tables from the article
     keep_tables = False,
-    
+
     ##
     # Whether to preserve links in output
     keepLinks = False,
@@ -162,7 +163,7 @@ options = SimpleNamespace(
     ##
     # Whether to write json instead of the xml-like default output format
     write_json = False,
-    
+
     ##
     # Whether to expand templates
     expand_templates = True,
@@ -178,18 +179,18 @@ options = SimpleNamespace(
     ##
     # Minimum expanded text length required to print document
     min_text_length = 0,
-    
+
     # Shared objects holding templates, redirects and cache
     templates = {},
     redirects = {},
     # cache of parser templates
     # FIXME: sharing this with a Manager slows down.
     templateCache = {},
-    
+
     # Elements to ignore/discard
-    
+
     ignored_tag_patterns = [],
-    
+
     discardElements = [
         'gallery', 'timeline', 'noinclude', 'pre',
         'table', 'tr', 'td', 'th', 'caption', 'div',
@@ -582,7 +583,7 @@ class Extractor(object):
         :param out: a memory file.
         """
         logging.info('%s\t%s', self.id, self.title)
-        
+
         # Separate header from text with a newline.
         if options.toHTML:
             title_str = '<h1>' + self.title + '</h1>'
@@ -630,12 +631,12 @@ class Extractor(object):
         text = self.wiki2text(text)
         text = compact(self.clean(text))
         text = [title_str] + text
-        
+
         if sum(len(line) for line in text) < options.min_text_length:
             return
-        
+
         self.write_output(out, text)
-        
+
         errs = (self.template_title_errs,
                 self.recursion_exceeded_1_errs,
                 self.recursion_exceeded_2_errs,
@@ -2826,6 +2827,12 @@ def pages_from(input):
             title = None
             page = []
 
+def try_put_until(event, q, value):
+    while not event.is_set():
+        try:
+            return q.put(value, False, 1)
+        except queue.Full:
+            pass
 
 def process_dump(input_file, template_file, out_file, file_size, file_compress,
                  process_count):
@@ -2916,10 +2923,12 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
     max_spool_length = 10000
     spool_length = Value('i', 0, lock=False)
 
+    broken_pipe_event = Event()
+
     # reduce job that sorts and prints output
     reduce = Process(target=reduce_process,
-                     args=(options, output_queue, spool_length,
-                           out_file, file_size, file_compress))
+                     args=(options, output_queue, spool_length, out_file,
+                           file_size, file_compress, broken_pipe_event))
     reduce.start()
 
     # initialize jobs queue
@@ -2930,43 +2939,48 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
     workers = []
     for i in range(worker_count):
         extractor = Process(target=extract_process,
-                            args=(options, i, jobs_queue, output_queue))
+                            args=(options, i, jobs_queue, output_queue, broken_pipe_event))
         extractor.daemon = True  # only live while parent process lives
         extractor.start()
         workers.append(extractor)
 
     # Mapper process
-    page_num = 0
-    for page_data in pages_from(input):
-        id, revid, title, ns, page = page_data
-        if keepPage(ns, page):
-            # slow down
-            delay = 0
-            if spool_length.value > max_spool_length:
-                # reduce to 10%
-                while spool_length.value > max_spool_length/10:
-                    time.sleep(10)
-                    delay += 10
-            if delay:
-                logging.info('Delay %ds', delay)
-            job = (id, revid, title, page, page_num)
-            jobs_queue.put(job)  # goes to any available extract_process
-            page_num += 1
-        page = None  # free memory
+    try:
+        page_num = 0
+        for page_data in pages_from(input):
+            if broken_pipe_event.is_set():
+                break
+            id, revid, title, ns, page = page_data
+            if keepPage(ns, page):
+                # slow down
+                delay = 0
+                if spool_length.value > max_spool_length:
+                    # reduce to 10%
+                    while spool_length.value > max_spool_length/10:
+                        time.sleep(10)
+                        delay += 10
+                if delay:
+                    logging.info('Delay %ds', delay)
+                job = (id, revid, title, page, page_num)
+                # TODO if pipe is closed
+                try_put_until(broken_pipe_event, jobs_queue, job)  # goes to any available extract_process
+                page_num += 1
+            page = None  # free memory
 
-    input.close()
+        input.close()
+
+    except KeyboardInterrupt:
+        logging.warn("Exiting due to interrupt")
 
-    # signal termination
-    for _ in workers:
-        jobs_queue.put(None)
     # wait for workers to terminate
     for w in workers:
         w.join()
 
-    # signal end of work to reduce process
-    output_queue.put(None)
-    # wait for it to finish
-    reduce.join()
+    if reduce.is_alive():
+        # signal end of work to reduce process
+        output_queue.put(None)
+        # wait for reduce process to finish
+        reduce.join()
 
     extract_duration = default_timer() - extract_start
     extract_rate = page_num / extract_duration
@@ -2978,7 +2992,7 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
 # Multiprocess support
 
 
-def extract_process(opts, i, jobs_queue, output_queue):
+def extract_process(opts, i, jobs_queue, output_queue, broken_pipe_event):
     """Pull tuples of raw page content, do CPU/regex-heavy fixup, push finished text
     :param i: process id.
     :param jobs_queue: where to get jobs.
@@ -2991,33 +3005,40 @@ def extract_process(opts, i, jobs_queue, output_queue):
     createLogger(options.quiet, options.debug)
 
     out = StringIO()  # memory buffer
-
-
-    while True:
-        job = jobs_queue.get()  # job is (id, title, page, page_num)
-        if job:
-            id, revid, title, page, page_num = job
-            try:
-                e = Extractor(*job[:4])  # (id, revid, title, page)
-                page = None  # free memory
-                e.extract(out)
-                text = out.getvalue()
-            except:
-                text = ''
-                logging.exception('Processing page: %s %s', id, title)
-            output_queue.put((page_num, text))
-            out.truncate(0)
-            out.seek(0)
-        else:
-            logging.debug('Quit extractor')
-            break
+    try:
+        while not broken_pipe_event.is_set():
+            job = jobs_queue.get()  # job is (id, title, page, page_num)
+            if job:
+                id, revid, title, page, page_num = job
+                try:
+                    e = Extractor(*job[:4])  # (id, revid, title, page)
+                    page = None  # free memory
+                    e.extract(out)
+                    text = out.getvalue()
+                except Exception:
+                    text = ''
+                    logging.exception('Processing page: %s %s', id, title)
+
+                try_put_until(broken_pipe_event, output_queue, (page_num, text))
+                out.truncate(0)
+                out.seek(0)
+            else:
+                logging.debug('Quit extractor')
+                break
+    except KeyboardInterrupt:
+        logging.info('Aborting worker %d', i)
+        output_queue.cancel_join_thread()
+        jobs_queue.cancel_join_thread()
+
+    if broken_pipe_event.is_set():
+        output_queue.cancel_join_thread()
+
     out.close()
 
-
 report_period = 10000  # progress report period
 
-def reduce_process(opts, output_queue, spool_length,
-                   out_file=None, file_size=0, file_compress=True):
+def reduce_process(opts, output_queue, spool_length, out_file,
+                   file_size, file_compress, broken_pipe_event):
     """Pull finished article text, write series of files (or stdout)
     :param opts: global parameters.
     :param output_queue: text to be output.
@@ -3029,9 +3050,9 @@ def reduce_process(opts, output_queue, spool_length,
 
     global options
     options = opts
-    
+
     createLogger(options.quiet, options.debug)
-    
+
     if out_file:
         nextFile = NextFile(out_file)
         output = OutputSplitter(nextFile, file_size, file_compress)
@@ -3044,33 +3065,44 @@ def reduce_process(opts, output_queue, spool_length,
     # FIXME: use a heap
     spool = {}  # collected pages
     next_page = 0  # sequence numbering of page
-    while True:
-        if next_page in spool:
-            output.write(spool.pop(next_page).encode('utf-8'))
-            next_page += 1
-            # tell mapper our load:
-            spool_length.value = len(spool)
-            # progress report
-            if next_page % report_period == 0:
-                interval_rate = report_period / (default_timer() - interval_start)
-                logging.info("Extracted %d articles (%.1f art/s)",
-                             next_page, interval_rate)
-                interval_start = default_timer()
-        else:
-            # mapper puts None to signal finish
-            pair = output_queue.get()
-            if not pair:
-                break
-            page_num, text = pair
-            spool[page_num] = text
-            # tell mapper our load:
-            spool_length.value = len(spool)
-            # FIXME: if an extractor dies, process stalls; the other processes
-            # continue to produce pairs, filling up memory.
-            if len(spool) > 200:
-                logging.debug('Collected %d, waiting: %d, %d', len(spool),
-                              next_page, next_page == page_num)
-    if output != sys.stdout:
+    try:
+        while True:
+            if next_page in spool:
+                try:
+                    output.write(spool.pop(next_page).encode('utf-8'))
+                except BrokenPipeError:
+                    # other side of pipe (like `head` or `grep`) is closed
+                    # we can simply exit
+                    broken_pipe_event.set()
+                    break
+
+                next_page += 1
+                # tell mapper our load:
+                spool_length.value = len(spool)
+                # progress report
+                if next_page % report_period == 0:
+                    interval_rate = report_period / (default_timer() - interval_start)
+                    logging.info("Extracted %d articles (%.1f art/s)",
+                                 next_page, interval_rate)
+                    interval_start = default_timer()
+            else:
+                # mapper puts None to signal finish
+                pair = output_queue.get()
+                if not pair:
+                    break
+                page_num, text = pair
+                spool[page_num] = text
+                # tell mapper our load:
+                spool_length.value = len(spool)
+                # FIXME: if an extractor dies, process stalls; the other processes
+                # continue to produce pairs, filling up memory.
+                if len(spool) > 200:
+                    logging.debug('Collected %d, waiting: %d, %d', len(spool),
+                                  next_page, next_page == page_num)
+    except KeyboardInterrupt:
+        pass
+
+    if output != sys.stdout and not broken_pipe_event.is_set():
         output.close()
 
 
@@ -3191,7 +3223,7 @@ def main():
 
     options.quiet = args.quiet
     options.debug = args.debug
-    
+
     createLogger(options.quiet, options.debug)
 
     input_file = args.input
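Note from the editor, not part of the patch: the design here is that the reduce process is the only one writing to the output (possibly a pipe such as `head` or `grep`), so it is the only place a BrokenPipeError can surface. It records the failure in a shared multiprocessing.Event and exits; the mapper and the extract workers then notice the event, either by checking it directly or through try_put_until, which retries a queue put only while the event is unset, so no process blocks forever on a queue whose consumer has gone away. The standalone sketch below reproduces that pattern outside WikiExtractor; demo_writer and the __main__ driver are illustrative names that do not exist in WikiExtractor.py, and unlike the patch it calls q.put(value, True, 1), so the retry loop waits up to a second between attempts rather than spinning (with block=False, as in the patch, the timeout is ignored and put returns immediately).

# Reviewer sketch only, not part of WikiExtractor.py or of this patch.
# A writer process turns BrokenPipeError into a shared Event, and the
# producer uses a timed queue put so it notices the Event instead of
# blocking forever on a full queue.
import sys
import queue
from multiprocessing import Event, Process, Queue


def try_put_until(event, q, value):
    # Retry with a 1-second timeout so the loop re-checks the event
    # roughly once a second.
    while not event.is_set():
        try:
            return q.put(value, True, 1)
        except queue.Full:
            pass


def demo_writer(q, broken_pipe_event):
    # Consume items and write them to stdout; on a closed pipe, record
    # the fact in the shared event and exit quietly.
    while True:
        item = q.get()
        if item is None:          # normal end-of-work signal
            break
        try:
            sys.stdout.write(item)
            sys.stdout.flush()
        except BrokenPipeError:   # e.g. piped into `head`, which exited
            broken_pipe_event.set()
            break


if __name__ == '__main__':
    broken_pipe_event = Event()
    q = Queue(maxsize=10)
    writer = Process(target=demo_writer, args=(q, broken_pipe_event))
    writer.start()
    for n in range(100000):
        if broken_pipe_event.is_set():
            break
        try_put_until(broken_pipe_event, q, '%d\n' % n)
    if broken_pipe_event.is_set():
        # Mirror the patch: don't let the queue's feeder thread block exit.
        q.cancel_join_thread()
    else:
        q.put(None)
    writer.join()

Saved as, say, sketch.py (a hypothetical filename), running `python3 sketch.py | head -n 3` should exit promptly with the event set, instead of hanging on a full queue the way the pre-patch code could.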