# Source code for xmm.pipeline.steps.export.format
import logging
from collections import OrderedDict
from itertools import count
from mongoengine import ObjectIdField
from xmm.models import fields
from ..base import PipelineStep
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class FormatStep(PipelineStep):
    """
    Format data in a flat or nested hierarchical structure.

    TODO: Infinite recursion detection and prevention:
    To prevent exporting recursive documents, there
    must be a filter present for the attribute that points
    to the same class of documents.

    Example: Exporting `page` with attribute `parent` must
    set a filter in the likes of `parent=None` (which only
    gets applied to the root documents being exported).
    """

    # Maps a target name to the name of the field method used for conversion.
    TARGETS = {
        'text': 'to_str',
        'raw': 'to_json',
    }
    DEFAULT_TARGET = 'text'

    def __init__(self, target=None, depth=None, collapse=False, value_map=None, **options):
        """
        Configure data conversion for this step.

        :param str target: Currently supported are `'text'` and `'raw'`.
            A missing or unknown target falls back to `'text'`, which converts
            values through each field's `to_str` method; `'raw'` uses `to_json`.
        :param int depth: Depth zero (the default) yields a flat list of values
            per document. A positive depth yields a nested mapping in which
            dotted column names are split into at most `depth` levels.
            Set depth to a negative value for no restriction!
        :param bool collapse: If True, empty nested trees will just collapse to a single NULL.
            WARNING: This will potentially remove keys from your output! Do not use
            if you rely on all columns defined to be present (albeit NULL).
        :param dict value_map: Direct value mapping, e.g. None to 'NULL'.
            May also be given as a list (or tuple) of `(key, value)` pairs.
            Only consulted for the `'text'` target; keys must match the value
            in both equality and exact type.
        :param dict options: Additional options that will be passed to datatype converters.
            `value_separator` (default `','`) joins list values for the `'text'` target.
        """
        # `None` is never a key of TARGETS, so one membership test covers both cases.
        if target not in self.TARGETS:
            target = self.DEFAULT_TARGET
        self.target = target
        self.format_func = self.TARGETS[self.target]

        if depth is None:
            depth = 0
        self.depth = depth
        self.collapse = collapse

        if value_map is None:
            value_map = {}
        if isinstance(value_map, (list, tuple)):
            # try to convert from a sequence of pairs
            # JSON only supports strings as keys so we should use this format instead there.
            value_map = dict(value_map)
        self.value_map = value_map
        self.options = options

    def process_step(self, state, context):
        """
        Format every document in `state`.

        Records the chosen layout under `context['format']` and returns a lazy
        iterator over the formatted documents together with the context.
        """
        context['format'] = {
            'depth': self.depth,
            'flat': self.depth == 0,
        }
        # `count(1)` supplies a 1-based running index as the second argument.
        return map(self._format_document, state, count(1)), context

    def _format_document(self, document, i=0):
        """
        Convert the document to the desired depth with proper values for exporting.

        :param document: Iterable of column mappings; each needs a `'name'` key
            whenever `depth` is non-zero.
        :param int i: Running index supplied by `process_step`; currently unused.
        :return: A list of values when `depth` is zero, otherwise an
            `OrderedDict` nested according to the dotted column names.
        """
        if self.depth == 0:
            return [self._format_value(column) for column in document]

        formatted_doc = OrderedDict()
        # root key -> key paths swallowed by a collapsed `None` placeholder, so
        # they can be restored if the tree turns out not to be empty after all.
        collapsed = {}
        # Any negative maxsplit means "no limit" in CPython; spell out the
        # documented -1 instead of relying on `depth - 1` being negative.
        maxsplit = self.depth - 1 if self.depth > 0 else -1

        def _set_deep(doc, keys, value):
            """Set a key in a nested dict."""
            for key in keys[:-1]:
                child = doc.get(key)
                if child is None:
                    # Missing, or a collapsed `None` placeholder: descending
                    # into `None` would raise, so start a fresh subtree.
                    child = doc[key] = OrderedDict()
                doc = child
            doc[keys[-1]] = value

        for column in document:
            column_keys = column['name'].split('.', maxsplit=maxsplit)
            root = column_keys[0]
            value = self._format_value(column)
            if value is None and self.collapse and not formatted_doc.get(root):
                formatted_doc[root] = None
                collapsed.setdefault(root, []).append(column_keys)
                continue
            # The tree under `root` is not empty: re-expand the placeholder and
            # put back the `None` leaves it swallowed, so the result does not
            # depend on the order in which the columns arrive.
            for swallowed_keys in collapsed.pop(root, ()):
                _set_deep(formatted_doc, swallowed_keys, None)
            _set_deep(formatted_doc, column_keys, value)
        return formatted_doc

    def _format_value(self, column):
        """
        Return the export-ready value for a single column.

        A value found in `value_map` wins. Otherwise the value is converted via
        the column's field, if there is one, using the method selected by the
        target; without a field or a matching method it is returned unchanged.
        """
        value = self._get_value_callback(column)
        try:
            # try getting the mapped value, continuing if we don't have it mapped.
            return self._mapped_value(value)
        except ValueError:
            # we raise an exception instead of returning None, because
            # `None` could potentially be the actual mapped value!
            pass

        field = column.get('field')
        if field is None:
            return value

        if hasattr(field, 'set_options'):
            field.set_options(**self.options)

        # Container fields delegate the per-item conversion to their inner field.
        converter = field
        if isinstance(field, (fields.ListField, fields.MultilingualField)):
            converter = field.get_field()

        if isinstance(field, ObjectIdField):
            # we cannot override automatically added `id` fields.
            if value is None:
                return ''
            converted = field.to_python(value)
            # Test before stringifying; `str(None)` would export 'None'.
            if converted is None:
                return None
            return str(converted)

        if not hasattr(converter, self.format_func):
            return value
        format_func = getattr(converter, self.format_func)
        if isinstance(value, list):
            formatted = [format_func(item) for item in value if item is not None]
            if self.target == 'text':
                return self.options.get('value_separator', ',').join(formatted)
            return formatted
        return format_func(value)

    def _get_value_callback(self, column):
        """
        Get the value from a column, potentially using it as a callback.

        A callable value on a column without a field is invoked. If the
        callable carries an `export_recursive` attribute, each item it returns
        is formatted as a document of its own (lazily, via `map`).
        """
        value = column.get('value')
        if column.get('field') is None and callable(value):
            if hasattr(value, 'export_recursive'):
                return map(self._format_document, value())
            return value()
        return value

    def _mapped_value(self, value):
        """
        Return the replacement for `value` from the value_map.

        :raises ValueError: If the target is not `'text'`, the value is a list
            or dict, or no key matches.
        """
        if self.target == 'text' and not isinstance(value, (list, dict)):
            for key, mapped in self.value_map.items():
                # We need to compare the exact type as well because
                #     l = [True, False, 4, 5]
                #     assert 1 in l
                # is True in Python :(
                if type(key) is type(value) and key == value:
                    return mapped
        raise ValueError('Value not present in value_map.')