See ChangeLog.

attardi 2016-02-20 10:45:58 +01:00
parent ca2a34ccce
commit 730cfc07f9
2 changed files with 34 additions and 15 deletions

ChangeLog

@@ -1,3 +1,13 @@
2016-02-20 Giuseppe Attardi <attardi@di.unipi.it>
* WikiExtractor.py (reduce_process): tell mapper to wait when
spool gets too long.
2016-02-19 Giuseppe Attardi <attardi@di.unipi.it>
* WikiExtractor.py (extract_process): use out.truncate() instead
of out.close()
2016-02-15 Giuseppe Attardi <attardi@di.unipi.it>
* WikiExtractor.py (Extractor.clean): turned into method.
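
The 2016-02-20 entry describes a simple backpressure scheme: the reducer publishes its backlog through a shared counter and the mapper sleeps while that backlog exceeds a threshold. A minimal sketch of the idea, distilled from the diff below; the helper name wait_for_reducer is hypothetical (in WikiExtractor.py the loop is inlined in process_dump):

    import logging
    import time
    from multiprocessing import Value

    max_spool_length = 10000                  # same threshold as process_dump()
    spool_length = Value('i', 0, lock=False)  # reducer keeps this at len(spool)

    def wait_for_reducer(spool_length):
        # Poll the shared counter before queueing more work; back off in
        # 10-second steps while the reducer is behind.
        delay = 0
        while spool_length.value > max_spool_length:
            time.sleep(10)
            delay += 10
        if delay:
            logging.info('Delay %ds', delay)

    # usage in the mapper loop, just before jobs_queue.put(job):
    #     wait_for_reducer(spool_length)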

WikiExtractor.py

@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# =============================================================================
# Version: 2.50 (February 15, 2016)
# Version: 2.51 (February 20, 2016)
# Author: Giuseppe Attardi (attardi@di.unipi.it), University of Pisa
#
# Contributors:
@@ -60,13 +60,13 @@ import urllib
from cStringIO import StringIO
from htmlentitydefs import name2codepoint
from itertools import izip, izip_longest
from multiprocessing import Queue, Process, Array, Value, cpu_count
from multiprocessing import Queue, Process, Value, cpu_count
from timeit import default_timer
# ===========================================================================
# Program version
version = '2.50'
version = '2.51'
## PARAMS ####################################################################
@@ -2374,7 +2374,7 @@ def pages_from(input):
if tag == 'page':
page = []
redirect = False
elif tag == 'id' and not id:
elif tag == 'id' and not id: # skip nested <id>
id = m.group(3)
elif tag == 'title':
title = m.group(3)
@@ -2484,7 +2484,7 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
# load balancing
max_spool_length = 10000
spool_length = Value('i', 0)
spool_length = Value('i', 0, lock=False)
# reduce job that sorts and prints output
reduce = Process(target=reduce_process,
@@ -2500,7 +2500,7 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
workers = []
for i in xrange(worker_count):
extractor = Process(target=extract_process,
args=(jobs_queue, output_queue))
args=(i, jobs_queue, output_queue))
extractor.daemon = True # only live while parent process lives
extractor.start()
workers.append(extractor)
@@ -2511,12 +2511,16 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
id, title, ns, page = page_data
if ns not in templateKeys:
# slow down
delay = 0
while spool_length.value > max_spool_length:
time.sleep(10)
delay += 10
if delay:
logging.info('Delay %ds', delay)
job = (id, title, page, page_num)
# logging.info('Put: %s %s', id, page_num) # DEBUG
jobs_queue.put(job) # goes to any available extract_process
page_num += 1
page = None # free memory
input.close()
@@ -2542,28 +2546,31 @@ def process_dump(input_file, template_file, out_file, file_size, file_compress,
# Multiprocess support
def extract_process(jobs_queue, output_queue):
def extract_process(i, jobs_queue, output_queue):
"""Pull tuples of raw page content, do CPU/regex-heavy fixup, push finished text
:param i: process id.
:param jobs_queue: where to get jobs.
:param output_queue: where to queue extracted text for output.
"""
out = StringIO() # memory buffer
while True:
job = jobs_queue.get() # job is (id, title, page, page_num)
if job:
id, title, page, page_num = job
# logging.info('Got: %s %s', id, page_num) # DEBUG
out = StringIO() # memory buffer
try:
Extractor(*job[:3]).extract(out) # (id, title, page)
e = Extractor(*job[:3]) # (id, title, page)
page = None # free memory
e.extract(out)
text = out.getvalue()
except:
text = ''
logging.error('Processing page: %s %s', id, title)
# logging.info('Done: %s %s', id, page_num) # DEBUG
output_queue.put((page_num, text))
out.close()
out.truncate(0)
else:
logging.debug('Quit extractor')
break
out.close()
report_period = 10000 # progress report period
@@ -2593,6 +2600,8 @@ def reduce_process(output_queue, spool_length,
if next_page in spool:
output.write(spool.pop(next_page))
next_page += 1
# tell mapper our load:
spool_length.value = len(spool)
# progress report
if next_page % report_period == 0:
interval_rate = report_period / (default_timer() - interval_start)
@@ -2610,8 +2619,8 @@ def reduce_process(output_queue, spool_length,
spool_length.value = len(spool)
# FIXME: if an extractor dies, process stalls; the other processes
# continue to produce pairs, filling up memory.
if len(spool) > 200: # DEBUG
logging.debug('Collected %d, wait: %d, %d', len(spool),
if len(spool) > 200:
logging.debug('Collected %d, waiting: %d, %d', len(spool),
next_page, next_page == page_num)
if output != sys.stdout:
output.close()
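
For context, the last two hunks adjust the reducer's ordering spool: extractors finish pages out of order, so reduce_process parks early arrivals in a dict keyed by page_num, flushes them as soon as the next expected page is present, and mirrors the dict's size into spool_length so the mapper can throttle itself. A rough sketch of that bookkeeping, assuming (page_num, text) tuples and a None shutdown sentinel on output_queue (the sentinel and the name drain_in_order are assumptions, not part of the diff):

    def drain_in_order(output_queue, spool_length, output):
        next_page = 0
        spool = {}                             # pages that arrived early
        while True:
            pair = output_queue.get()
            if pair is None:                   # assumed shutdown sentinel
                break
            page_num, text = pair
            spool[page_num] = text
            while next_page in spool:          # flush everything now in order
                output.write(spool.pop(next_page))
                next_page += 1
            spool_length.value = len(spool)    # "tell mapper our load"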