messy 1st approach

This commit is contained in:
Gordon Mohr 2015-06-17 18:49:26 -07:00
parent 694cd5a7f4
commit 5d32701400

View File

@ -406,8 +406,8 @@ class Extractor(object):
self.magicWords['currenttime'] = time.strftime('%H:%M:%S') self.magicWords['currenttime'] = time.strftime('%H:%M:%S')
text = clean(self, text) text = clean(self, text)
footer = "\n</doc>\n" footer = "\n</doc>\n"
if out != sys.stdout: ### if out != sys.stdout:
out.reserve(len(header) + len(text) + len(footer)) ### out.reserve(len(header) + len(text) + len(footer))
out.write(header) out.write(header)
for line in compact(text): for line in compact(text):
out.write(line.encode('utf-8')) out.write(line.encode('utf-8'))
@ -1982,8 +1982,8 @@ def compact(text):
if not line: if not line:
continue continue
# Handle section titles # Handle section titles
m = section.match(line) ### m = section.match(line)
if m: if False: ### m:
title = m.group(2) title = m.group(2)
lev = len(m.group(1)) lev = len(m.group(1))
if Extractor.toHTML: if Extractor.toHTML:
@ -2198,7 +2198,7 @@ def load_templates(file, output_file=None):
if articles % 10000 == 0: if articles % 10000 == 0:
logging.info("Preprocessed %d pages", articles) logging.info("Preprocessed %d pages", articles)
def process_dump(input_file, template_file, outdir, file_size, file_compress, threads): def process_dump(input, template_file, outdir, file_size, file_compress, threads):
""" """
:param input_file: name of the wikipedia dump file. :param input_file: name of the wikipedia dump file.
:param template_file: optional file with template definitions. :param template_file: optional file with template definitions.
@ -2211,13 +2211,6 @@ def process_dump(input_file, template_file, outdir, file_size, file_compress, th
global templateNamespace global templateNamespace
global expand_templates global expand_templates
if input_file.lower().endswith("bz2"):
opener = bz2.BZ2File
else:
opener = open
input = opener(input_file)
# collect siteinfo # collect siteinfo
for line in input: for line in input:
line = line.decode('utf-8') line = line.decode('utf-8')
@ -2250,21 +2243,27 @@ def process_dump(input_file, template_file, outdir, file_size, file_compress, th
input = opener(input_file) input = opener(input_file)
# process pages # process pages
logging.info("Starting processing pages from %s.", input_file) ### logging.info("Starting processing pages from %s.", input_file)
logging.info("Starting processing pages from %s.", 'input')
# initialize jobs queue # initialize jobs queue
#threads = multiprocessing.cpu_count() #threads = multiprocessing.cpu_count()
logging.info("Using %d CPUs.", threads) logging.info("Using %d CPUs.", threads)
queue = Queue.Queue(maxsize=2 * threads) ### queue = Queue.Queue(maxsize=2 * threads)
queue = multiprocessing.JoinableQueue(maxsize=10 * threads)
lock = threading.Lock() # for protecting shared state. lock = threading.Lock() # for protecting shared state.
nextFile = NextFile(lock, outdir) ### nextFile = NextFile(lock, outdir)
# start worker threads # start worker threads
workers = [] workers = []
for _ in xrange(max(1, threads - 1)): # keep one for master for _ in xrange(max(1, threads - 1)): # keep one for master
output_splitter = OutputSplitter(nextFile, file_size, file_compress) ### output_splitter = OutputSplitter(nextFile, file_size, file_compress)
extractor = ExtractorThread(queue, output_splitter) ### extractor = ExtractorThread(queue, output_splitter)
fname = outdir +'/'+ str(_)
extractor = multiprocessing.Process(target=worker_process,args=(queue,fname))
extractor.daemon = False # ensure worker process gets to finish
extractor.start()
workers.append(extractor) workers.append(extractor)
# we collect individual lines, since str.join() is significantly faster than # we collect individual lines, since str.join() is significantly faster than
@ -2308,28 +2307,47 @@ def process_dump(input_file, template_file, outdir, file_size, file_compress, th
colon = title.find(':') colon = title.find(':')
if (colon < 0 or title[:colon] in acceptedNamespaces) and \ if (colon < 0 or title[:colon] in acceptedNamespaces) and \
not redirect and not title.startswith(templateNamespace): not redirect and not title.startswith(templateNamespace):
queue.put(Extractor(id, title, page), True) # block if full ### queue.put(Extractor(id, title, page), True) # block if full
item = (id, title, page)
### print(id)
queue.put(item, True) # block if full
id = None id = None
page = [] page = []
for _ in xrange(max(1, threads - 1)):
queue.put(None) # let each thread finish
# wait for empty queue # wait for empty queue
queue.join() queue.join()
input.close()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
# Multithread version # Multithread version
class ExtractorThread(threading.Thread): def worker_process(queue, fname):
output = bz2.BZ2File(fname + '.bz2', 'w')
while True:
job = queue.get()
if job:
Extractor(*job).extract(output)
queue.task_done() # notify of previous job done
else:
break
output.close()
queue.task_done() # notify of final job done only after file close
###class ExtractorThread(threading.Thread):
class ExtractorThread(multiprocessing.Process):
""" """
Extractor thread. Extractor thread.
""" """
def __init__(self, queue, splitter): def __init__(self, queue, splitter):
self._queue = queue self._queue = queue
self._splitter = splitter self._splitter = splitter
threading.Thread.__init__(self) #### threading.Thread.__init__(self)
self.setDaemon(True) # let the process die when main thread is killed multiprocessing.Process.__init__(self)
self.daemon = True # let the process die when main thread is killed
self.start() self.start()
def run(self): def run(self):
@ -2450,8 +2468,17 @@ def main():
logging.error('Could not create: %s', output_dir) logging.error('Could not create: %s', output_dir)
return return
process_dump(input_file, args.templates, output_dir, file_size, if input_file == '-':
input = sys.stdin
elif input_file.lower().endswith("bz2"):
input = bz2.BZ2File(input_file)
else:
input = open(input_file)
process_dump(input, args.templates, output_dir, file_size,
args.compress, args.threads) args.compress, args.threads)
input.close()
if __name__ == '__main__': if __name__ == '__main__':
main() main()