Handle broken pipe and keyboard interrupts

Versus Void 2017-08-15 22:06:20 +00:00
parent 2a5e6aebc0
commit 537ab9b4ef


@@ -67,8 +67,9 @@ import os.path
import re # TODO: use the regex module when it becomes standard
import time
import json
import queue
from io import StringIO
from multiprocessing import Queue, Process, Value, cpu_count
from multiprocessing import Event, Queue, Process, Value, cpu_count
from timeit import default_timer
@@ -81,7 +82,7 @@ if PY2:
range = xrange # Use Python 3 equivalent
chr = unichr # Use Python 3 equivalent
text_type = unicode
class SimpleNamespace(object):
def __init__ (self, **kwargs):
self.__dict__.update(kwargs)
@@ -138,11 +139,11 @@ options = SimpleNamespace(
##
# Filter disambiguation pages
filter_disambig_pages = False,
##
# Drop tables from the article
keep_tables = False,
##
# Whether to preserve links in output
keepLinks = False,
@@ -162,7 +163,7 @@ options = SimpleNamespace(
##
# Whether to write json instead of the xml-like default output format
write_json = False,
##
# Whether to expand templates
expand_templates = True,
@@ -178,18 +179,18 @@ options = SimpleNamespace(
##
# Minimum expanded text length required to print document
min_text_length = 0,
# Shared objects holding templates, redirects and cache
templates = {},
redirects = {},
# cache of parser templates
# FIXME: sharing this with a Manager slows down.
templateCache = {},
# Elements to ignore/discard
ignored_tag_patterns = [],
discardElements = [
'gallery', 'timeline', 'noinclude', 'pre',
'table', 'tr', 'td', 'th', 'caption', 'div',
@@ -582,7 +583,7 @@ class Extractor(object):
:param out: a memory file.
"""
logging.info('%s\t%s', self.id, self.title)
# Separate header from text with a newline.
if options.toHTML:
title_str = '<h1>' + self.title + '</h1>'
@@ -630,12 +631,12 @@ class Extractor(object):
text = self.wiki2text(text)
text = compact(self.clean(text))
text = [title_str] + text
if sum(len(line) for line in text) < options.min_text_length:
return
self.write_output(out, text)
errs = (self.template_title_errs,
self.recursion_exceeded_1_errs,
self.recursion_exceeded_2_errs,
@@ -2826,6 +2827,12 @@ def pages_from(input):
title = None
page = []
def try_put_until(event, q, value):
while not event.is_set():
try:
return q.put(value, False, 1)
except queue.Full:
pass
def process_dump(input_file, template_file, out_file, file_size, file_compress,
process_count):
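
The try_put_until helper added above retries a non-blocking put and gives up as soon as the shared event is set, so producers never block forever on a full queue once the consumer has stopped reading. Below is a minimal standalone sketch of the same pattern (illustrative names such as stop_event, not part of this commit); it retries with a short blocking timeout instead of a pure busy loop.

import queue
from multiprocessing import Event, Process, Queue

def try_put_until(event, q, value):
    # Keep offering the item; abandon it as soon as `event` is set.
    while not event.is_set():
        try:
            return q.put(value, True, 1)  # block up to 1s, then re-check the event
        except queue.Full:
            pass

def consumer(q, stop_event):
    # Take a few items, then simulate an early exit (e.g. a broken pipe downstream).
    for _ in range(3):
        print('got', q.get())
    stop_event.set()

if __name__ == '__main__':
    stop_event = Event()
    q = Queue(1)  # tiny queue so the producer actually hits queue.Full
    c = Process(target=consumer, args=(q, stop_event))
    c.start()
    for i in range(100):
        if stop_event.is_set():
            break
        try_put_until(stop_event, q, i)
    c.join()
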
@@ -2916,10 +2923,12 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
max_spool_length = 10000
spool_length = Value('i', 0, lock=False)
broken_pipe_event = Event()
# reduce job that sorts and prints output
reduce = Process(target=reduce_process,
args=(options, output_queue, spool_length,
out_file, file_size, file_compress))
args=(options, output_queue, spool_length, out_file,
file_size, file_compress, broken_pipe_event))
reduce.start()
# initialize jobs queue
@@ -2930,43 +2939,48 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
workers = []
for i in range(worker_count):
extractor = Process(target=extract_process,
args=(options, i, jobs_queue, output_queue))
args=(options, i, jobs_queue, output_queue, broken_pipe_event))
extractor.daemon = True # only live while parent process lives
extractor.start()
workers.append(extractor)
# Mapper process
page_num = 0
for page_data in pages_from(input):
id, revid, title, ns, page = page_data
if keepPage(ns, page):
# slow down
delay = 0
if spool_length.value > max_spool_length:
# reduce to 10%
while spool_length.value > max_spool_length/10:
time.sleep(10)
delay += 10
if delay:
logging.info('Delay %ds', delay)
job = (id, revid, title, page, page_num)
jobs_queue.put(job) # goes to any available extract_process
page_num += 1
page = None # free memory
try:
page_num = 0
for page_data in pages_from(input):
if broken_pipe_event.is_set():
break
id, revid, title, ns, page = page_data
if keepPage(ns, page):
# slow down
delay = 0
if spool_length.value > max_spool_length:
# reduce to 10%
while spool_length.value > max_spool_length/10:
time.sleep(10)
delay += 10
if delay:
logging.info('Delay %ds', delay)
job = (id, revid, title, page, page_num)
# TODO if pipe is closed
try_put_until(broken_pipe_event, jobs_queue, job) # goes to any available extract_process
page_num += 1
page = None # free memory
input.close()
input.close()
except KeyboardInterrupt:
logging.warn("Exiting due to interrupt")
# signal termination
for _ in workers:
jobs_queue.put(None)
# wait for workers to terminate
for w in workers:
w.join()
# signal end of work to reduce process
output_queue.put(None)
# wait for it to finish
reduce.join()
if reduce.is_alive():
# signal end of work to reduce process
output_queue.put(None)
# wait for reduce process to finish
reduce.join()
extract_duration = default_timer() - extract_start
extract_rate = page_num / extract_duration
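
The mapper now wraps its loop in try/except KeyboardInterrupt and then always runs the same shutdown sequence: one None sentinel per worker on the jobs queue, join the workers, and only then (if the reduce process is still alive) a final None on the output queue. The sketch below shows that sentinel-based shutdown on its own (illustrative, not the committed code):

from multiprocessing import Process, Queue

def worker(jobs, results):
    while True:
        job = jobs.get()
        if job is None:        # sentinel: no more work
            break
        results.put(job * job)

if __name__ == '__main__':
    jobs, results = Queue(), Queue()
    workers = [Process(target=worker, args=(jobs, results)) for _ in range(4)]
    for w in workers:
        w.start()
    for n in range(10):
        jobs.put(n)
    for _ in workers:          # exactly one sentinel per worker
        jobs.put(None)
    for w in workers:
        w.join()
    results.put(None)          # signal the consumer only after all workers are done
    squares = []
    while True:
        r = results.get()
        if r is None:
            break
        squares.append(r)
    print(sorted(squares))
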
@@ -2978,7 +2992,7 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
# Multiprocess support
def extract_process(opts, i, jobs_queue, output_queue):
def extract_process(opts, i, jobs_queue, output_queue, broken_pipe_event):
"""Pull tuples of raw page content, do CPU/regex-heavy fixup, push finished text
:param i: process id.
:param jobs_queue: where to get jobs.
@@ -2991,33 +3005,40 @@ def extract_process(opts, i, jobs_queue, output_queue):
createLogger(options.quiet, options.debug)
out = StringIO() # memory buffer
while True:
job = jobs_queue.get() # job is (id, revid, title, page, page_num)
if job:
id, revid, title, page, page_num = job
try:
e = Extractor(*job[:4]) # (id, revid, title, page)
page = None # free memory
e.extract(out)
text = out.getvalue()
except:
text = ''
logging.exception('Processing page: %s %s', id, title)
output_queue.put((page_num, text))
out.truncate(0)
out.seek(0)
else:
logging.debug('Quit extractor')
break
try:
while not broken_pipe_event.is_set():
job = jobs_queue.get() # job is (id, revid, title, page, page_num)
if job:
id, revid, title, page, page_num = job
try:
e = Extractor(*job[:4]) # (id, revid, title, page)
page = None # free memory
e.extract(out)
text = out.getvalue()
except Exception:
text = ''
logging.exception('Processing page: %s %s', id, title)
try_put_until(broken_pipe_event, output_queue, (page_num, text))
out.truncate(0)
out.seek(0)
else:
logging.debug('Quit extractor')
break
except KeyboardInterrupt:
logging.info('Aborting worker %d', i)
output_queue.cancel_join_thread()
jobs_queue.cancel_join_thread()
if broken_pipe_event.is_set():
output_queue.cancel_join_thread()
out.close()
report_period = 10000 # progress report period
def reduce_process(opts, output_queue, spool_length,
out_file=None, file_size=0, file_compress=True):
def reduce_process(opts, output_queue, spool_length, out_file,
file_size, file_compress, broken_pipe_event):
"""Pull finished article text, write series of files (or stdout)
:param opts: global parameters.
:param output_queue: text to be output.
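
extract_process now calls cancel_join_thread() on its queues when it is interrupted or when the broken-pipe event is set. Without that call, a process that has put data on a multiprocessing.Queue waits at exit for the queue's feeder thread to flush everything into the pipe, which can hang forever if nobody will read it; cancel_join_thread() skips that wait at the cost of dropping buffered items. A minimal sketch of the effect (illustrative names and sizes, not the committed code):

from multiprocessing import Process, Queue

def writer(q):
    # Enqueue far more data than the pipe can buffer, with no reader on the other end.
    for _ in range(10000):
        q.put('x' * 1024)
    # Without this call the process would block at exit, waiting for the feeder
    # thread to flush; with it, the process exits and the unread data is dropped.
    q.cancel_join_thread()

if __name__ == '__main__':
    q = Queue()
    p = Process(target=writer, args=(q,))
    p.start()
    p.join(5)
    print('writer still alive:', p.is_alive())  # expected: False
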
@@ -3029,9 +3050,9 @@ def reduce_process(opts, output_queue, spool_length,
global options
options = opts
createLogger(options.quiet, options.debug)
if out_file:
nextFile = NextFile(out_file)
output = OutputSplitter(nextFile, file_size, file_compress)
@@ -3044,33 +3065,44 @@ def reduce_process(opts, output_queue, spool_length,
# FIXME: use a heap
spool = {} # collected pages
next_page = 0 # sequence numbering of page
while True:
if next_page in spool:
output.write(spool.pop(next_page).encode('utf-8'))
next_page += 1
# tell mapper our load:
spool_length.value = len(spool)
# progress report
if next_page % report_period == 0:
interval_rate = report_period / (default_timer() - interval_start)
logging.info("Extracted %d articles (%.1f art/s)",
next_page, interval_rate)
interval_start = default_timer()
else:
# mapper puts None to signal finish
pair = output_queue.get()
if not pair:
break
page_num, text = pair
spool[page_num] = text
# tell mapper our load:
spool_length.value = len(spool)
# FIXME: if an extractor dies, process stalls; the other processes
# continue to produce pairs, filling up memory.
if len(spool) > 200:
logging.debug('Collected %d, waiting: %d, %d', len(spool),
next_page, next_page == page_num)
if output != sys.stdout:
try:
while True:
if next_page in spool:
try:
output.write(spool.pop(next_page).encode('utf-8'))
except BrokenPipeError:
# the other side of the pipe (like `head` or `grep`) was closed,
# so we can simply exit
broken_pipe_event.set()
break
next_page += 1
# tell mapper our load:
spool_length.value = len(spool)
# progress report
if next_page % report_period == 0:
interval_rate = report_period / (default_timer() - interval_start)
logging.info("Extracted %d articles (%.1f art/s)",
next_page, interval_rate)
interval_start = default_timer()
else:
# mapper puts None to signal finish
pair = output_queue.get()
if not pair:
break
page_num, text = pair
spool[page_num] = text
# tell mapper our load:
spool_length.value = len(spool)
# FIXME: if an extractor dies, process stalls; the other processes
# continue to produce pairs, filling up memory.
if len(spool) > 200:
logging.debug('Collected %d, waiting: %d, %d', len(spool),
next_page, next_page == page_num)
except KeyboardInterrupt:
pass
if output != sys.stdout and not broken_pipe_event.is_set():
output.close()
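
reduce_process is the only process that writes to stdout, so it is the one that sees BrokenPipeError when a downstream reader such as `head` exits; it sets broken_pipe_event so the mapper and workers wind down instead of crashing. The sketch below shows the same situation in a single script (Python 3; illustrative, not the committed code). Run it as `python sketch.py | head -n 3`:

import os
import sys

def main():
    try:
        for i in range(1000000):
            print('line', i)
    except BrokenPipeError:
        # The reader closed its end of the pipe; stop producing output.
        # Point stdout at devnull so the interpreter's final flush does not raise again.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())

if __name__ == '__main__':
    main()
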
@@ -3191,7 +3223,7 @@ def main():
options.quiet = args.quiet
options.debug = args.debug
createLogger(options.quiet, options.debug)
input_file = args.input