# Copyright 2019 Kensho Technologies, LLC.
"""Module for Wikidata JSON dumps."""
import bz2
import gzip
import json
import logging
import os
import subprocess
from contextlib import contextmanager
from typing import IO, Any, Dict, Iterator, List, Optional, Tuple
class WikidataJsonDump:
    """Class for Wikidata JSON dump files.

    Represents a json file from https://dumps.wikimedia.org/wikidatawiki/entities.
    File names are of the form "wikidata-YYYYMMDD-all.json[.bz2|.gz]". The file is a single JSON
    array and there is one element (i.e. item or property) on each line with the first and
    last lines being the opening and closing square brackets. This class can handle bz2 or gz
    compressed files as well as the uncompressed json files.

    Parameters
    ----------
    filename: str
        The wikidata JSON dump file name (e.g. `my_data_dir/wikidata-20180730-all.json.bz2`)

    Raises
    ------
    ValueError
        If `filename` is not a string or does not end with ".json", ".json.bz2" or ".json.gz".
    """

    def __init__(self, filename: str) -> None:
        if not isinstance(filename, str):
            raise ValueError("filename must be a string")

        self.compression = None  # type: Optional[str]
        if filename.endswith(".json"):
            stem = filename
        elif filename.endswith(".json.bz2"):
            self.compression = "bz2"
            stem = os.path.splitext(filename)[0]
        elif filename.endswith(".json.gz"):
            self.compression = "gz"
            stem = os.path.splitext(filename)[0]
        else:
            raise ValueError('filename must end with ".json.bz2" or ".json.gz" or ".json"')

        # `stem` ends with ".json" here; dropping it gives the base used for chunk file names.
        self.basename = os.path.splitext(stem)[0]
        self.filename = filename
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _clean_line(line: str) -> str:
        """Return `line` without surrounding whitespace and without the trailing comma.

        Every element line of the dump ends with ",\\n" (or ",\\r\\n"); stripping both leaves
        either a standalone JSON document, a bare bracket, or an empty string for blank lines.
        """
        return line.strip().rstrip(",").rstrip()

    def _opener(self) -> Any:
        """Return the `open`-compatible callable matching this dump's compression."""
        if self.compression == "bz2":
            return bz2.open
        if self.compression == "gz":
            return gzip.open
        return open

    @contextmanager
    def _open_dump_file(self) -> Iterator[IO[Any]]:
        """Context manager that opens compressed/uncompressed dump files.

        It is important to open the file in binary mode even if it is not compressed. This
        allows us to handle decoding in one place.
        """
        with self._opener()(self.filename, mode="rb") as fp:
            yield fp

    def iter_lines(self) -> Iterator[str]:
        """Generate the UTF-8 decoded lines of the JSON dump file (line endings included)."""
        with self._open_dump_file() as fp:
            for linebytes in fp:
                yield linebytes.decode("utf-8")

    def __iter__(self) -> Iterator[Dict]:
        """Iterate over the parsed JSON elements of the file, one per line."""
        for line_str in self.iter_lines():
            element = self._clean_line(line_str)
            # Skip the opening/closing brackets and blank lines: none of them is an element.
            if element in ("", "[", "]"):
                continue
            yield json.loads(element)

    def _write_chunk(
        self, out_fbase: str, ichunk: int, out_lines: List[str]
    ) -> Tuple[List[str], int, str]:
        """Write a single chunk to disk.

        The chunk is written as a JSON array with one element per line, using the same
        compression as the source dump. An existing file with the same name is overwritten.

        Parameters
        ----------
        out_fbase: str
            Base of the output file name
        ichunk: int
            Index of the chunk being written
        out_lines: List[str]
            Raw element lines (trailing commas and line endings are removed here)

        Returns
        -------
        Tuple[List[str], int, str]
            A fresh empty line buffer, the next chunk index and the written file name.
        """
        out_fname = "{}-ichunk_{}.json".format(out_fbase, ichunk)
        if self.compression is not None:
            out_fname = "{}.{}".format(out_fname, self.compression)
        self.logger.debug("writing %s", out_fname)

        elements = [self._clean_line(out_line) for out_line in out_lines]
        # Always encode as UTF-8 so the chunk can be read back by `iter_lines` on any platform.
        with self._opener()(out_fname, mode="wt", encoding="utf-8") as fp:
            fp.write("[\n")
            fp.write(",\n".join(elements))
            fp.write("\n]\n")

        return [], ichunk + 1, out_fname

    def create_chunks(
        self,
        out_fbase: Optional[str] = None,
        num_lines_per_chunk: int = 100,
        max_chunks: int = 10 ** 10,
    ) -> List[str]:
        """Produce N files with `num_lines_per_chunk` wikidata items per file.

        Parameters
        ----------
        out_fbase: str
            Each output file will have the form `{out_fbase}-ichunk_{ichunk}.json[.bz2|.gz]`.
            Defaults to the dump file name without its ".json[.bz2|.gz]" extension.
        num_lines_per_chunk: int
            Number of lines per chunk file (values below 1 behave like 1)
        max_chunks: int
            Maximum number of chunks to write; nothing is written if this is 0 or negative

        Returns
        -------
        List[str]
            Names of the chunk files that were written, in order.
        """
        if max_chunks <= 0:
            return []
        if out_fbase is None:
            out_fbase = self.basename

        ichunk = 0
        out_lines = []  # type: List[str]
        out_fnames = []  # type: List[str]

        for line in self.iter_lines():
            # Brackets and blank lines are structure, not elements.
            if self._clean_line(line) in ("", "[", "]"):
                continue
            out_lines.append(line)
            if len(out_lines) >= num_lines_per_chunk:
                out_lines, ichunk, out_fname = self._write_chunk(out_fbase, ichunk, out_lines)
                out_fnames.append(out_fname)
                if ichunk >= max_chunks:
                    return out_fnames

        # Flush the final, partially filled chunk.
        if out_lines:
            out_lines, ichunk, out_fname = self._write_chunk(out_fbase, ichunk, out_lines)
            out_fnames.append(out_fname)

        return out_fnames

    def __str__(self) -> str:
        return "WikidataJsonDump(filename={})".format(self.filename)

    def __repr__(self) -> str:
        return self.__str__()