Source code for xmm.pipeline.steps.export.format

import logging
from collections import OrderedDict
from itertools import count

from mongoengine import ObjectIdField

from xmm.models import fields
from ..base import PipelineStep


logger = logging.getLogger(__name__)


class FormatStep(PipelineStep):
    """
    Format data in a flat or nested hierarchical structure.

    Each incoming document is an iterable of column dicts. The code reads the
    keys ``'name'`` (dotted path), ``'value'`` and, optionally, ``'field'``
    from every column.

    TODO: Infinite recursion detection and prevention:
        To prevent exporting recursive documents, there must be a filter
        present for the attribute that points to the same class of documents.

        Example: Exporting `page` with attribute `parent` must set a filter in
        the likes of `parent=None` (which only gets applied to the root
        documents being exported).
    """

    # Maps the public target name to the name of the converter method that is
    # looked up on a column's field object.
    TARGETS = {
        'text': 'to_str',
        'raw': 'to_json',
    }
    DEFAULT_TARGET = 'text'

    def __init__(self, target=None, depth=None, collapse=False, value_map=None, **options):
        """
        Configure data conversion for this step.

        :param str target: One of the keys of :attr:`TARGETS` (``'text'`` or
            ``'raw'``). ``None`` or any unknown value falls back to
            :attr:`DEFAULT_TARGET` (``'text'``), which converts values via the
            field's ``to_str`` method.
        :param int depth: Depth zero (the default) yields a flat list of
            values per document. A positive depth nests columns by their
            dotted name up to that many levels. A negative depth splits the
            dotted name without restriction.
        :param bool collapse: If True, empty nested trees will just collapse
            to a single NULL.
            WARNING: This will potentially remove keys from your output!
            Do not use if you rely on all columns defined to be present
            (albeit NULL).
        :param value_map: Direct value mapping, e.g. None to 'NULL'. Either a
            dict or a list/tuple of ``(key, mapped)`` pairs.
        :param dict options: Additional options that will be passed to
            datatype converters.
        """
        if target not in self.TARGETS:
            target = self.DEFAULT_TARGET
        self.target = target
        self.format_func = self.TARGETS[target]

        self.depth = 0 if depth is None else depth
        self.collapse = collapse

        if value_map is None:
            value_map = {}
        elif isinstance(value_map, (list, tuple)):
            # Try to convert from a sequence of pairs. JSON only supports
            # strings as keys, so this format is used there instead.
            value_map = dict(value_map)
        self.value_map = value_map
        self.options = options

    def process_step(self, state, context):
        """
        Format every document in ``state`` and record the format settings.

        :param state: Iterable of documents (each an iterable of columns).
        :param dict context: Pipeline context; its ``'format'`` key is set to
            the configured depth and whether the output is flat.
        :return: Tuple of a lazy ``map`` iterator over the formatted documents
            and the updated context.
        """
        context['format'] = {
            'depth': self.depth,
            'flat': self.depth == 0,
        }
        # count(1) supplies a running 1-based index as the `i` argument.
        return map(self._format_document, state, count(1)), context

    @staticmethod
    def _set_nested(doc, keys, value):
        """
        Set ``value`` at the nested path ``keys`` inside the mapping ``doc``.

        Missing intermediate levels are created. An intermediate level that
        currently holds ``None`` (e.g. a collapsed placeholder) is replaced by
        a fresh mapping instead of raising ``TypeError``.
        """
        for key in keys[:-1]:
            child = doc.get(key)
            if child is None:
                # Re-assigning an existing key keeps its position in the
                # OrderedDict, so the column order is preserved.
                child = doc[key] = OrderedDict()
            doc = child
        doc[keys[-1]] = value

    def _format_document(self, document, i=0):
        """
        Convert the document to the desired depth with proper values for exporting.

        :param document: Iterable of column dicts.
        :param int i: Running index supplied by :meth:`process_step`; unused here.
        :return: A list of formatted values when ``depth`` is zero, otherwise
            an ``OrderedDict`` nested by the dotted column names.
        """
        if self.depth == 0:
            return [self._format_value(column) for column in document]

        formatted_doc = OrderedDict()
        for column in document:
            # A negative maxsplit (negative depth) means "no split limit".
            column_keys = column['name'].split('.', maxsplit=self.depth - 1)
            root_key = column_keys[0]
            value = self._format_value(column)
            if value is None and self.collapse and not formatted_doc.get(root_key):
                # Nothing has been stored below this root yet: collapse the
                # whole subtree into a single None.
                formatted_doc[root_key] = None
            else:
                self._set_nested(formatted_doc, column_keys, value)
        return formatted_doc

    def _format_value(self, column):
        """
        Return the export representation of a single column's value.

        Order of precedence: a matching ``value_map`` entry, the raw value
        when the column has no field, the ObjectId special case, the field's
        converter named by ``self.format_func``, and finally the raw value.
        """
        value = self._get_value_callback(column)
        try:
            # Try getting the mapped value, continuing if we don't have it mapped.
            return self._mapped_value(value)
        except ValueError:
            # An exception is raised instead of returning None, because
            # `None` could potentially be the actual mapped value!
            pass

        field = column.get('field')
        if field is None:
            return value

        if hasattr(field, 'set_options'):
            field.set_options(**self.options)

        if isinstance(field, (fields.ListField, fields.MultilingualField)):
            # Use the wrapped field's converters for container fields.
            field = field.get_field()

        if isinstance(column['field'], ObjectIdField):
            # We cannot override automatically added `id` fields.
            if value is None:
                return ''
            converted = column['field'].to_python(value)
            # Test for None *before* stringifying; str() never returns None.
            return None if converted is None else str(converted)

        converter = getattr(field, self.format_func, None)
        if converter is None:
            return value

        if isinstance(value, list):
            # Skip missing entries and convert the rest individually.
            value = [converter(item) for item in value if item is not None]
            if self.target == 'text':
                value = self.options.get('value_separator', ',').join(value)
            return value
        return converter(value)

    def _get_value_callback(self, column):
        """
        Get the value from a column, potentially using it as a callback.

        A callable value on a column without a field is invoked. If the
        callable carries an ``export_recursive`` attribute, its result is
        treated as an iterable of documents and formatted lazily.
        """
        value = column.get('value')
        if column.get('field') is None and callable(value):
            if hasattr(value, 'export_recursive'):
                return map(self._format_document, value())
            return value()
        return value

    def _mapped_value(self, value):
        """
        Return the ``value_map`` entry for ``value``.

        Only applies to the ``'text'`` target and to values that are neither
        list nor dict.

        :raises ValueError: If no mapping applies. An exception is used
            because ``None`` is a legitimate mapped value.
        """
        if self.target == 'text' and not isinstance(value, (list, dict)):
            for key, mapped in self.value_map.items():
                # Require identical types as well as equality, because
                # `1 == True` (and `0 == False`) holds in Python and must not
                # make an int entry match a bool value or vice versa.
                if key == value and type(key) is type(value):
                    return mapped
        raise ValueError('Value not present in value_map.')