Source code for xmm.pipeline.steps.load.valuemap

from functools import lru_cache
from operator import attrgetter, itemgetter

from xmm.models import DynamicFieldsMixin, fields, ModelImpl
from xmm.util.converters import ModelNameConverter
from xmm.util.dictobject import DictObject
from ..base import PipelineStep


class ValueMapStep(PipelineStep):
    """Map values in datasets of an import pipeline to correct attributes in our model."""

    def __init__(self, model, attribute_map, defaults=None, **options):
        """
        Create a new ValueMapStep instance.

        :param model: A `str` name of a model class in the system or a
            `ModelImpl` instance to be used as a field_map. May also be a
            dict with a direct key to field map for testing purposes.
        :param dict attribute_map: A dict mapping model fields to import
            column names. You may specify multiple values using another dict
            as a value. Import columns may be dot-separated paths in the
            source.

            **Examples**:

            Import the ``title`` column from our source to the ``name``
            property in our model both as English and German values::

                {'name': {'en': 'title', 'de': 'title'}}

            Given this multilingual source data structure::

                {'fields': {
                    'name': {
                        'de': 'Deutsch',
                        'en': 'English',
                    },
                }}

            You may import each language to a destination explicitly using
            this ``attribute_map``::

                {'name': {'en': 'fields.name.en', 'de': 'fields.name.de'}}

            Or import it dynamically, using all defined fields::

                {'name': 'fields.name'}

            Adding multiple values to a field is also supported for
            multivalue attributes::

                {'things': ['fields.thing1', 'fields.thing2', 'fields.thing3']}

        :param dict defaults: Default values if value not found with
            attribute_map.
        :param kwargs: Additional options to pass down to fields. An option
            whose value is a dict is treated as a per-column setting (see
            :meth:`get_options_for_column`).
        :key null_values: List of values that map to a literal ``None``.
            This option is consumed by the step itself (see
            :meth:`convert_value`) and is not passed down to the fields.
        :key true_values: List of values that map to a literal ``True``.
        :key false_values: List of values that map to a literal ``False``.
        :key datetimeformat: Parse datetime values with this format.
        :key dateformat: Parse date values with this format.
        :key timeformat: Parse time values with this format.
        :raises ValueError: If ``attribute_map`` is ``None``.
        """
        if isinstance(model, str):
            model = ModelNameConverter.get_model_class(model)
        self.model = model
        self.null_values = options.pop('null_values', [])
        self.options = options
        # Per-instance memo for get_options_for_column(); must exist before
        # create_field_map() runs because that method queries it.
        self._column_options_cache = {}

        if attribute_map is None:
            raise ValueError('attribute_map is required!')

        self.field_map = self.create_field_map()
        self.mapper = self.create_mapper(attribute_map, defaults or {})

    def get_options_for_column(self, column):
        """
        Return the field options that apply to a single column.

        Options whose value is a dict are per-column: they are only included
        when ``column`` is one of the dict's keys, and then only the value
        stored under that key is used. All other options apply to every
        column unchanged.

        Results are memoized per instance (not with ``functools.lru_cache``,
        which would keep a reference to every step instance in a class-level
        cache), so repeated calls for the same column return the same dict.

        :param column: Hashable column / field key.
        :returns: Mapping of option name to the value for this column.
        :rtype: dict
        """
        cache = getattr(self, '_column_options_cache', None)
        if cache is None:
            # Instance was created without running __init__ (e.g. __new__).
            cache = self._column_options_cache = {}

        try:
            return cache[column]
        except KeyError:
            pass

        options_for_column = {}
        for option, value in self.options.items():
            if isinstance(value, dict):
                if column in value:
                    options_for_column[option] = value[column]
            else:
                options_for_column[option] = value

        cache[column] = options_for_column
        return options_for_column

    def create_field_map(self):
        """
        Create a map of model field to Field type instance.

        Works on `self.model`, if self.model is a dict, the value will be
        used as a `field_map` verbatim.

        Usually works on a supplied Model class like :class:`xmm.models.User`
        or a dynamically generated subclass of :class:`xmm.models.ModelImpl`.
        Reads all fields or attributes defined in the model and creates Field
        instances using the options supplied to the step (e.g. `true_values`
        for :class:`BooleanField`).

        :returns: A mapping of field/attribute key to a Field instance
            configured with options.
        :rtype: dict
        :raises ValueError: If the model is a :class:`DynamicFieldsMixin`
            subclass that is not a :class:`ModelImpl` subclass.
        """
        if not self.model:
            return {}
        elif isinstance(self.model, dict):
            # literal field mapping
            return self.model
        elif issubclass(self.model, ModelImpl):
            model_cls = self.model._definition
            return {
                attr.key: attr.field.set_options(**self.get_options_for_column(attr.key))
                for attr in model_cls.attributes
            }
        elif issubclass(self.model, DynamicFieldsMixin):
            raise ValueError('Cannot import dynamic models')

        return {
            field_name: field.set_options(**self.get_options_for_column(field_name))
            for field_name, field in self.model._fields.items()
            if field_name not in ['id', '_cls']
        }

    def handle_default(self, getter, default):
        """Return a getter function that returns a default value when the key is not found."""
        def _handle_default_getter(obj):
            try:
                return getter(obj)
            except (AttributeError, KeyError):
                return default
        return _handle_default_getter

    def handle_list(self, getter):
        """
        Return a getter that collects several source values into a list.

        :param list getter: Source specifications, each resolved through
            :meth:`get_getter`.
        :returns: A function that calls every resolved getter on a dataset
            and returns the results as a list, in order.
        """
        getters = [self.get_getter(key) for key in getter]

        def _handle_list(obj):
            return [get(obj) for get in getters]
        return _handle_list

    def get_getter(self, data):
        """
        Turn one ``attribute_map`` value into a getter (or dict of getters).

        * ``dict``  -> dict with the same keys, each value resolved
          recursively.
        * ``tuple`` -> ``(source, default)``; the getter returns ``default``
          when the lookup raises ``AttributeError`` or ``KeyError``.
        * ``list``  -> getter returning a list with one value per entry.
        * ``int``   -> ``operator.itemgetter``.
        * anything else -> ``operator.attrgetter`` (which supports
          dot-separated paths).

        :param data: Source specification from the attribute map.
        :returns: A callable taking a dataset, or a dict of such callables.
        """
        if isinstance(data, dict):
            # Distinct loop names: the original reused ``data`` here, which
            # shadowed the argument being iterated.
            return {
                key: self.get_getter(sub_data)
                for key, sub_data in data.items()
            }
        elif isinstance(data, tuple):
            data, default = data
            if isinstance(data, int):
                return self.handle_default(itemgetter(data), default)
            return self.handle_default(attrgetter(data), default)
        elif isinstance(data, list):
            return self.handle_list(data)
        elif isinstance(data, int):
            return itemgetter(data)
        return attrgetter(data)

    @staticmethod
    def _reference_mapper(dataset, sub_map, field):
        """Dereference a value by its attributes."""
        is_list = False
        if isinstance(field, fields.ListField):
            # For a list of references, work on the inner field.
            field = field.field
            is_list = True
        return field.document_type.from_field_map(sub_map, dataset, is_list=is_list)

    def create_mapper(self, attribute_map, defaults):
        """
        Generate a function that maps all attributes in a dataset.

        The created function will be called from ``self.map``, supplying each
        dataset in the pipeline. The function is pre-generated to save some
        memory.

        :param dict attribute_map: Mapping of model field to source
            specification (see :meth:`get_getter`).
        :param dict defaults: Fallback values, used when a getter raises
            ``AttributeError``.
        :returns: The mapper function.
        """
        attr_map = {
            model_field: self.get_getter(data_field)
            for model_field, data_field in attribute_map.items()
        }

        def _mapper(dataset, sub_map=None, parent_attr=None, default_map=None):
            """
            Re-structure the dataset according to the ValueMapStep's attribute_map.

            The ``attr_map`` variable is a dict where each key has a getter
            function, to acquire the source value from the dataset. This
            value is then further processed in ``self.convert_value`` to the
            appropriate type according to the ``self.field_map`` generated
            earlier.

            This function is recursive and calls itself wherever the attr_map
            contains another dict instead of a getter function to call, to
            create a nested data structure for the target documents.

            :param dict dataset: Source dataset in pipeline.
            :param dict sub_map: Attribute map for this level in the document.
            :param list parent_attr: List of parent attribute keys; models
                usually have ``['fields']`` where all attribute values are
                stored.
            :param dict default_map: Defaults for this level in the document.
            :return: The newly transformed dataset.
            :rtype: dict
            """
            if sub_map is None:
                # Top-level call: start from the step's full map and defaults.
                sub_map = attr_map
                default_map = defaults
            if parent_attr is None:
                parent_attr = []
            if default_map is None:
                default_map = {}

            mapped = {}
            for attr, getter in sub_map.items():
                attr_path = parent_attr + [attr]
                if isinstance(getter, dict):
                    field = self.field_map.get(attr)
                    is_reference = (
                        isinstance(field, fields.ReferenceField)
                        or isinstance(getattr(field, 'field', None), fields.ReferenceField)
                    )
                    if is_reference:
                        mapped[attr] = self._reference_mapper(dataset, getter, field)
                    else:
                        mapped[attr] = _mapper(dataset, getter, attr_path, default_map.get(attr))
                else:
                    try:
                        value = getter(dataset)
                    except KeyError:
                        value = None
                    except AttributeError:
                        value = default_map[attr]  # raises if no default found
                    mapped[attr] = self.convert_value(attr_path, value)
            return mapped

        return _mapper

    def process_step(self, state, context):
        """Replace ``state`` with the lazily mapped datasets; ``context`` is returned unchanged."""
        state = self.map(state)
        return state, context

    def map(self, state):
        """
        Lazily map every dataset in ``state``.

        :param state: Iterable of source datasets.
        :returns: Generator yielding the result of ``self.mapper`` for each
            dataset, after wrapping the dataset in a ``DictObject``.
        """
        for dataset in state:
            dataset = DictObject(dataset)
            yield self.mapper(dataset)

    def convert_value(self, attr_path, raw_value):
        """
        Convert a raw value to Python.

        To convert complex datatypes to their python equivalent, this method
        uses the ``self.field_map`` mapping of attribute keys to their Field
        instances. This way we know how to convert e.g. "01-08-2011" to
        ``datetime(2011, 8, 1)`` (if the correct dateformat is configured).

        The path is searched from its last element towards its first; the
        first key found in ``self.field_map`` determines the conversion. The
        ``attr_path`` argument is not modified.

        :param list attr_path: A path to the attribute, e.g. ``['fields',]``.
        :param raw_value: The raw value coming in from the pipeline dataset.
        :returns: The converted value, according to the field map. ``None``
            if the raw value is ``None`` or one of ``self.null_values``; the
            raw value itself if no path element is in the field map.
        """
        if raw_value is None:
            return None
        if isinstance(raw_value, (str, int)) and raw_value in self.null_values:
            return None

        previous = None
        # reversed() walks innermost-first like the original pop() loop did,
        # but leaves the caller's list intact.
        for field_name in reversed(attr_path):
            if field_name in self.field_map:
                field = self.field_map[field_name]
                if isinstance(field, fields.MultilingualField):
                    if previous is None and isinstance(raw_value, dict):
                        # The path ends at the multilingual field itself and
                        # the value is a dict: hand the whole dict to it.
                        return field.to_python(raw_value)
                    field = field.get_field()
                return field.to_python(raw_value)
            previous = field_name
        return raw_value