#!/usr/bin/env python
# -*- coding: utf-8 -*-
# =============================================================================
# Version: 3.0 (July 22, 2020)
# Author: Giuseppe Attardi (attardi@di.unipi.it), University of Pisa
#
# Contributors:
# Antonio Fuschetto (fuschett@aol.com)
# Leonardo Souza (lsouza@amtera.com.br)
# Juan Manuel Caicedo (juan@cavorite.com)
# Humberto Pereira (begini@gmail.com)
# Siegfried-A. Gevatter (siegfried@gevatter.com)
# Pedro Assis (pedroh2306@gmail.com)
# Wim Muskee (wimmuskee@gmail.com)
# Radics Geza (radicsge@gmail.com)
# Nick Ulven (nulven@github)
#
# =============================================================================
# Copyright (c) 2009-2020. Giuseppe Attardi (attardi@di.unipi.it).
# =============================================================================
# This file is part of Tanl.
#
# Tanl is free software; you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License, version 3,
# as published by the Free Software Foundation.
#
# Tanl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# =============================================================================
"""Wikipedia Extractor:
Extracts and cleans text from a Wikipedia database dump and stores output in a
number of files of similar size in a given directory.
Each file will contain several documents in the format:
<doc id="" url="" title="">
...
</doc>
This version performs template expansion by preprocessing the whole dump and
collecting template definitions.
"""
import argparse
import bz2
import codecs
import fileinput
import logging
import os.path
import re  # TODO: switch to the regex module when it becomes part of the standard library
import sys
from io import StringIO
from multiprocessing import Queue, Process, cpu_count
from timeit import default_timer
from wikiextractor.extract import Extractor, ignoreTag, define_template, acceptedNamespaces
# ===========================================================================
# Program version
version = '3.0'
##
# Defined in <siteinfo>
# We include 'Template' by default, for use when loading an external template file.
knownNamespaces = set(['Template'])
##
# The namespace used for template definitions
# It is the name associated with namespace key=10 in the siteinfo header.
templateNamespace = ''
templatePrefix = ''
##
# The namespace used for module definitions
# It is the name associated with namespace key=828 in the siteinfo header.
moduleNamespace = ''
# This is obtained from <siteinfo>
urlbase = ''
# ----------------------------------------------------------------------
# Modules
# Only minimal support
# FIXME: import Lua modules.
modules = {
'convert': {
'convert': lambda x, u, *rest: x + ' ' + u, # no conversion
}
}
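# For example, the 'convert' stub above simply joins its first two arguments,
# so arguments ('100', 'km') yield '100 km' rather than a real unit conversion.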
# ----------------------------------------------------------------------
# Expand using WikiMedia API
# import json
# from urllib.parse import quote
# from urllib.request import urlopen
# def expandTemplates(text):
#     """Expand templates by invoking the MediaWiki API"""
#     text = quote(text.encode('utf-8'))
#     base = urlbase[:urlbase.rfind('/')]
#     url = base + "/w/api.php?action=expandtemplates&format=json&text=" + text
#     exp = json.loads(urlopen(url).read())
#     return exp['expandtemplates']['*']
# ------------------------------------------------------------------------------
# Output
class NextFile(object):
"""
Synchronous generation of next available file name.
"""
filesPerDir = 100
def __init__(self, path_name):
self.path_name = path_name
self.dir_index = -1
self.file_index = -1
def next(self):
self.file_index = (self.file_index + 1) % NextFile.filesPerDir
if self.file_index == 0:
self.dir_index += 1
dirname = self._dirname()
if not os.path.isdir(dirname):
os.makedirs(dirname)
return self._filepath()
def _dirname(self):
char1 = self.dir_index % 26
        char2 = self.dir_index // 26 % 26
return os.path.join(self.path_name, '%c%c' % (ord('A') + char2, ord('A') + char1))
def _filepath(self):
return '%s/wiki_%02d' % (self._dirname(), self.file_index)
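# Example layout (assuming path_name='text'): the first 100 files are
# text/AA/wiki_00 ... text/AA/wiki_99, then text/AB/wiki_00, and so on
# up to text/ZZ/wiki_99.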
class OutputSplitter(object):
"""
    File-like object that splits output into multiple files of a given max size.
"""
def __init__(self, nextFile, max_file_size=0, compress=True):
"""
:param nextFile: a NextFile object from which to obtain filenames
to use.
:param max_file_size: the maximum size of each file.
        :param compress: whether to write data with bzip compression.
"""
self.nextFile = nextFile
self.compress = compress
self.max_file_size = max_file_size
self.file = self.open(self.nextFile.next())
def reserve(self, size):
if self.file.tell() + size > self.max_file_size:
self.close()
self.file = self.open(self.nextFile.next())
    def write(self, data):
        self.reserve(len(data))
        if self.compress:
            self.file.write(data.encode('utf-8'))  # BZ2File expects bytes
        else:
            self.file.write(data)
def close(self):
self.file.close()
def open(self, filename):
if self.compress:
return bz2.BZ2File(filename + '.bz2', 'w')
else:
            return open(filename, 'w', encoding='utf-8')
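# Usage sketch (hypothetical directory name): with compress=True each file gets
# a '.bz2' suffix, and a new file is requested from NextFile whenever the current
# one would exceed max_file_size:
#
#   splitter = OutputSplitter(NextFile('extracted'), max_file_size=1024 ** 2, compress=True)
#   splitter.write('<doc id="12" url="..." title="Anarchism">\n...\n</doc>\n')
#   splitter.close()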
# ----------------------------------------------------------------------
# READER
tagRE = re.compile(r'(.*?)<(/?\w+)[^>]*>(?:([^<]*)(<.*?>)?)?')
#                    1     2               3      4
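# Example: on the line '  <title>Anarchism</title>' the groups are:
#   1: '  '          text preceding the tag
#   2: 'title'       the tag name (a leading '/' marks a closing tag)
#   3: 'Anarchism'   text following the tag, up to the next '<', if any
#   4: '</title>'    an immediately following tag, present only when the element
#                    opens and closes on the same line (then m.lastindex == 4)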
def load_templates(file, output_file=None):
"""
Load templates from :param file:.
:param output_file: file where to save templates and modules.
"""
global templateNamespace, templatePrefix
templatePrefix = templateNamespace + ':'
global moduleNamespace, modulePrefix
modulePrefix = moduleNamespace + ':'
    articles = 0
    templates = 0
page = []
inText = False
if output_file:
output = codecs.open(output_file, 'wb', 'utf-8')
    for line in file:
        if isinstance(line, bytes):
            line = line.decode('utf-8')
if '<' not in line: # faster than doing re.search()
if inText:
page.append(line)
continue
m = tagRE.search(line)
if not m:
continue
tag = m.group(2)
if tag == 'page':
page = []
elif tag == 'title':
title = m.group(3)
elif tag == 'text':
inText = True
line = line[m.start(3):m.end(3)]
page.append(line)
if m.lastindex == 4: # open-close
inText = False
elif tag == '/text':
if m.group(1):
page.append(m.group(1))
inText = False
elif inText:
page.append(line)
elif tag == '/page':
if not output_file and not templateNamespace: # do not know it yet
# we reconstruct it from the first title
colon = title.find(':')
if colon > 1:
templateNamespace = title[:colon]
templatePrefix = title[:colon + 1]
# FIXME: should reconstruct also moduleNamespace
            if title.startswith(templatePrefix):
                define_template(title, page)
                templates += 1
# save templates and modules to file
if output_file and (title.startswith(templatePrefix) or
title.startswith(modulePrefix)):
output.write('<page>\n')
output.write(' <title>%s</title>\n' % title)
output.write(' <ns>10</ns>\n')
output.write(' <text>')
for line in page:
output.write(line)
output.write(' </text>\n')
output.write('</page>\n')
page = []
articles += 1
if articles % 100000 == 0:
logging.info("Preprocessed %d pages", articles)
    if output_file:
        output.close()
        logging.info("Saved %d templates to '%s'", templates, output_file)
    return templates
def process_dump(input_file, template_file, out_file, file_size, file_compress,
process_count):
"""
:param input_file: name of the wikipedia dump file; '-' to read from stdin
:param template_file: optional file with template definitions.
:param out_file: directory where to store extracted data, or '-' for stdout
:param file_size: max size of each extracted file, or None for no max (one file)
:param file_compress: whether to compress files with bzip.
:param process_count: number of extraction processes to spawn.
"""
global urlbase
global knownNamespaces
global templateNamespace, templatePrefix
global moduleNamespace, modulePrefix
if input_file == '-':
input = sys.stdin
else:
input = fileinput.FileInput(input_file, openhook=fileinput.hook_compressed)
# collect siteinfo
for line in input:
        if isinstance(line, bytes):
            line = line.decode('utf-8')
m = tagRE.search(line)
if not m:
continue
tag = m.group(2)
if tag == 'base':
# discover urlbase from the xml dump file
# /mediawiki/siteinfo/base
base = m.group(3)
urlbase = base[:base.rfind("/")]
elif tag == 'namespace':
knownNamespaces.add(m.group(3))
if re.search('key="10"', line):
templateNamespace = m.group(3)
templatePrefix = templateNamespace + ':'
elif re.search('key="828"', line):
moduleNamespace = m.group(3)
modulePrefix = moduleNamespace + ':'
elif tag == '/siteinfo':
break
if expand_templates:
# preprocess
template_load_start = default_timer()
if template_file and os.path.exists(template_file):
logging.info("Preprocessing '%s' to collect template definitions: this may take some time.", template_file)
file = fileinput.FileInput(template_file, openhook=fileinput.hook_compressed)
            template_count = load_templates(file)
file.close()
else:
if input_file == '-':
                # we cannot scan stdin twice (once for templates, once for pages),
                # so an explicit template file is required
                raise ValueError("to expand templates with a dump read from stdin, "
                                 "an explicit --templates file must be supplied")
logging.info("Preprocessing '%s' to collect template definitions: this may take some time.", input_file)
            template_count = load_templates(input, template_file)
input.close()
input = fileinput.FileInput(input_file, openhook=fileinput.hook_compressed)
template_load_elapsed = default_timer() - template_load_start
        logging.info("Loaded %d templates in %.1fs", template_count, template_load_elapsed)
if out_file == '-':
output = sys.stdout
if file_compress:
            logging.warning("writing to stdout, so no output compression (use an external tool)")
else:
nextFile = NextFile(out_file)
output = OutputSplitter(nextFile, file_size, file_compress)
# process pages
logging.info("Starting page extraction from %s.", input_file)
extract_start = default_timer()
# Parallel Map/Reduce:
# - pages to be processed are dispatched to workers
    # - a reduce process collects the results, sorts them and prints them.
maxsize = 10 * process_count
# output queue
output_queue = Queue(maxsize=maxsize)
# Reduce job that sorts and prints output
reduce = Process(target=reduce_process, args=(output_queue, output))
reduce.start()
# initialize jobs queue
jobs_queue = Queue(maxsize=maxsize)
# start worker processes
logging.info("Using %d extract processes.", process_count)
workers = []
    for _ in range(max(1, process_count)):
extractor = Process(target=extract_process,
args=(jobs_queue, output_queue))
extractor.daemon = True # only live while parent process lives
extractor.start()
workers.append(extractor)
# Mapper process
# we collect individual lines, since str.join() is significantly faster
# than concatenation
page = []
id = None
last_id = None
ordinal = 0 # page count
inText = False
redirect = False
for line in input:
        if isinstance(line, bytes):
            line = line.decode('utf-8')
if '<' not in line: # faster than doing re.search()
if inText:
page.append(line)
continue
m = tagRE.search(line)
if not m:
continue
tag = m.group(2)
if tag == 'page':
page = []
redirect = False
elif tag == 'id' and not id:
id = m.group(3)
elif tag == 'title':
title = m.group(3)
elif tag == 'redirect':
redirect = True
elif tag == 'text':
inText = True
line = line[m.start(3):m.end(3)]
page.append(line)
if m.lastindex == 4: # open-close
inText = False
elif tag == '/text':
if m.group(1):
page.append(m.group(1))
inText = False
elif inText:
page.append(line)
elif tag == '/page':
colon = title.find(':')
            if (colon < 0 or title[:colon] in acceptedNamespaces) and id != last_id and \
                    not redirect and not (templatePrefix and title.startswith(templatePrefix)):
job = (id, title, page, ordinal)
jobs_queue.put(job) # goes to any available extract_process
last_id = id
ordinal += 1
id = None
page = []
input.close()
# signal termination
for _ in workers:
jobs_queue.put(None)
# wait for workers to terminate
for w in workers:
w.join()
# signal end of work to reduce process
output_queue.put(None)
# wait for it to finish
reduce.join()
if output != sys.stdout:
output.close()
extract_duration = default_timer() - extract_start
extract_rate = ordinal / extract_duration
logging.info("Finished %d-process extraction of %d articles in %.1fs (%.1f art/s)",
process_count, ordinal, extract_duration, extract_rate)
# ----------------------------------------------------------------------
# Multiprocess support
def extract_process(jobs_queue, output_queue):
"""Pull tuples of raw page content, do CPU/regex-heavy fixup, push finished text
:param jobs_queue: where to get jobs.
:param output_queue: where to queue extracted text for output.
"""
while True:
job = jobs_queue.get() # job is (id, title, page, ordinal)
if job:
out = StringIO() # memory buffer
Extractor(*job[:3]).extract(out) # (id, title, page)
text = out.getvalue()
output_queue.put((job[3], text)) # (ordinal, extracted_text)
out.close()
else:
break
def reduce_process(output_queue, output):
"""Pull finished article text, write series of files (or stdout)
:param output_queue: text to be output.
:param output: file object where to print.
"""
interval_start = default_timer()
period = 100000
# FIXME: use a heap
ordering_buffer = {} # collected pages
next_ordinal = 0 # sequence number of pages
while True:
if next_ordinal in ordering_buffer:
output.write(ordering_buffer.pop(next_ordinal))
next_ordinal += 1
# progress report
if next_ordinal % period == 0:
interval_rate = period / (default_timer() - interval_start)
logging.info("Extracted %d articles (%.1f art/s)",
next_ordinal, interval_rate)
interval_start = default_timer()
else:
# mapper puts None to signal finish
pair = output_queue.get()
if not pair:
break
ordinal, text = pair
ordering_buffer[ordinal] = text
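# A minimal sketch of the heap-based alternative suggested by the FIXME above
# (illustration only: it is not wired into the pipeline and omits the periodic
# progress report):
def reduce_process_with_heap(output_queue, output):
    import heapq
    heap = []           # min-heap of (ordinal, text) pairs
    next_ordinal = 0    # ordinal of the next page expected in output order
    while True:
        pair = output_queue.get()
        if not pair:    # mapper puts None to signal finish
            break
        heapq.heappush(heap, pair)
        # flush every buffered page that is now in sequence
        while heap and heap[0][0] == next_ordinal:
            output.write(heapq.heappop(heap)[1])
            next_ordinal += 1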
# ----------------------------------------------------------------------
# Minimum size of output files
minFileSize = 200 * 1024
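# (200 KiB; main() rejects --bytes values below this threshold)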
def main():
global urlbase, acceptedNamespaces
global expand_templates, templateCache, escape_doc
parser = argparse.ArgumentParser(prog=os.path.basename(sys.argv[0]),
formatter_class=argparse.RawDescriptionHelpFormatter,
description=__doc__)
parser.add_argument("input",
help="XML wiki dump file")
groupO = parser.add_argument_group('Output')
groupO.add_argument("-o", "--output", default="text",
help="directory for extracted files (or '-' for dumping to stdout)")
groupO.add_argument("-b", "--bytes", default="1M",
help="maximum bytes per output file (default %(default)s)",
metavar="n[KMG]")
groupO.add_argument("-c", "--compress", action="store_true",
help="compress output files using bzip")
groupP = parser.add_argument_group('Processing')
groupP.add_argument("--html", action="store_true",
help="produce HTML output, subsumes --links")
groupP.add_argument("-l", "--links", action="store_true",
help="preserve links")
groupP.add_argument("-ns", "--namespaces", default="", metavar="ns1,ns2",
help="accepted namespaces")
groupP.add_argument("--templates",
help="use or create file containing templates")
groupP.add_argument("--no-templates", action="store_false",
help="Do not expand templates")
groupP.add_argument("--escapedoc", action="store_true",
help="use to escape the contents of the output <doc>...</doc>")
    default_process_count = max(1, cpu_count() - 1)
parser.add_argument("--processes", type=int, default=default_process_count,
help="Number of processes to use (default %(default)s)")
groupS = parser.add_argument_group('Special')
groupS.add_argument("-q", "--quiet", action="store_true",
help="suppress reporting progress info")
groupS.add_argument("--debug", action="store_true",
help="print debug info")
groupS.add_argument("-a", "--article", action="store_true",
help="analyze a file containing a single article (debug option)")
groupS.add_argument("-v", "--version", action="version",
version='%(prog)s ' + version,
help="print program version")
args = parser.parse_args()
Extractor.keepLinks = args.links
Extractor.toHTML = args.html
if args.html:
Extractor.keepLinks = True
expand_templates = args.no_templates
escape_doc = args.escapedoc
try:
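        # e.g. "1M" -> 1 * 1024**2 bytes; the trailing K/M/G suffix selects the power of 1024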
power = 'kmg'.find(args.bytes[-1].lower()) + 1
file_size = int(args.bytes[:-1]) * 1024 ** power
if file_size < minFileSize:
raise ValueError()
except ValueError:
logging.error('Insufficient or invalid size: %s', args.bytes)
return
if args.namespaces:
acceptedNamespaces = set(args.namespaces.split(','))
FORMAT = '%(levelname)s: %(message)s'
logging.basicConfig(format=FORMAT)
logger = logging.getLogger()
if not args.quiet:
logger.setLevel(logging.INFO)
if args.debug:
logger.setLevel(logging.DEBUG)
input_file = args.input
if not Extractor.keepLinks:
ignoreTag('a')
# sharing cache of parser templates is too slow:
# manager = Manager()
# templateCache = manager.dict()
if args.article:
if args.templates:
if os.path.exists(args.templates):
with open(args.templates) as file:
load_templates(file)
        with open(input_file, encoding='utf-8') as file:
            page = file.read()
m = re.search(r'<id>(.*)</id>', page)
id = m.group(1) if m else 0
m = re.search(r'<title>(.*)</title>', page)
if m:
title = m.group(1)
else:
logging.error('Missing title element')
return
Extractor(id, title, [page]).extract(sys.stdout)
return
output_path = args.output
if output_path != '-' and not os.path.isdir(output_path):
try:
os.makedirs(output_path)
        except OSError:
logging.error('Could not create: %s', output_path)
return
process_dump(input_file, args.templates, output_path, file_size,
args.compress, args.processes)
if __name__ == '__main__':
main()