from functools import lru_cache
from operator import attrgetter, itemgetter
from xmm.models import DynamicFieldsMixin, fields, ModelImpl
from xmm.util.converters import ModelNameConverter
from xmm.util.dictobject import DictObject
from ..base import PipelineStep
class ValueMapStep(PipelineStep):
    """Map values in datasets of an import pipeline to correct attributes in our model."""

    def __init__(self, model, attribute_map, defaults=None, **options):
        """
        Create a new ValueMapStep instance.

        :param model: A `str` name of a model class in the system or a model class
            (e.g. a `ModelImpl` subclass) whose fields are used to build the field map.
            May also be a dict with a direct key to field map for testing purposes.
        :param dict attribute_map: A dict mapping model fields to import column names.
            You may specify multiple values using another dict as a value.
            Import columns may be dot-separated paths in the source.

            **Examples**:

            Import the ``title`` column from our source to the ``name`` property
            in our model both as English and German values::

                {'name': {'en': 'title', 'de': 'title'}}

            Given this multilingual source data structure::

                {'fields': {
                    'name': {
                        'de': 'Deutsch',
                        'en': 'English',
                    },
                }}

            You may import each language to a destination explicitly using this
            ``attribute_map``::

                {'name': {'en': 'fields.name.en', 'de': 'fields.name.de'}}

            Or import it dynamically, using all defined fields::

                {'name': 'fields.name'}

            Adding multiple values to a field is also supported for multivalue attributes::

                {'things': ['fields.thing1', 'fields.thing2', 'fields.thing3']}

        :param dict defaults: Default values if value not found with attribute_map.
        :param options: Additional options to pass down to fields. An option whose
            value is a dict is treated as a per-column setting keyed by column name.
        :key null_values: List of values that map to a literal ``None``.
        :key true_values: List of values that map to a literal ``True``.
        :key false_values: List of values that map to a literal ``False``.
        :key datetimeformat: Parse datetime values with this format.
        :key dateformat: Parse date values with this format.
        :key timeformat: Parse time values with this format.
        :raises ValueError: If ``attribute_map`` is ``None`` or the model uses dynamic fields.
        """
        if isinstance(model, str):
            model = ModelNameConverter.get_model_class(model)
        self.model = model
        # ``null_values`` is handled by the step itself (see ``convert_value``),
        # so it is removed from the options that are handed to the fields.
        self.null_values = options.pop('null_values', [])
        self.options = options
        if attribute_map is None:
            raise ValueError('attribute_map is required!')
        self.field_map = self.create_field_map()
        self.mapper = self.create_mapper(attribute_map, defaults or {})

    def get_options_for_column(self, column):
        """
        Collect the field options that apply to ``column``.

        Options whose value is a dict are per-column settings: only the entry keyed
        by ``column`` is used, and the option is omitted when there is none. Every
        other option value applies to all columns unchanged.

        The result is memoised per instance. A class-level ``lru_cache`` would key
        on ``self`` and keep every step instance alive for the cache's lifetime.

        :param column: Hashable column/field key.
        :returns: Mapping of option name to the value applicable to ``column``.
        :rtype: dict
        """
        try:
            cache = self._column_options_cache
        except AttributeError:
            # Created lazily so the method also works on instances whose
            # ``__init__`` was bypassed or overridden.
            cache = self._column_options_cache = {}
        try:
            return cache[column]
        except KeyError:
            pass

        options_for_column = {}
        for option, value in self.options.items():
            if isinstance(value, dict):
                if column in value:
                    options_for_column[option] = value[column]
            else:
                options_for_column[option] = value
        cache[column] = options_for_column
        return options_for_column

    def create_field_map(self):
        """
        Create a map of model field to Field type instance.

        Works on `self.model`, if self.model is a dict, the value will be
        used as a `field_map` verbatim.

        Usually works on a supplied Model class like :class:`xmm.models.User`
        or a dynamically generated subclass of :class:`xmm.models.ModelImpl`.
        Reads all fields or attributes defined in the model and creates
        Field instances using the options supplied to the step
        (e.g. `true_values` for :class:`BooleanField`).

        :returns: A mapping of field/attribute key to a Field instance configured with options.
        :rtype: dict
        :raises ValueError: If the model is a :class:`DynamicFieldsMixin` subclass
            (and not a :class:`ModelImpl` subclass).
        """
        if not self.model:
            return {}
        if isinstance(self.model, dict):
            # literal field mapping
            return self.model
        if issubclass(self.model, ModelImpl):
            # ModelImpl subclasses expose their attributes through ``_definition``.
            model_cls = self.model._definition
            return {
                attr.key: attr.field.set_options(**self.get_options_for_column(attr.key))
                for attr in model_cls.attributes
            }
        if issubclass(self.model, DynamicFieldsMixin):
            raise ValueError('Cannot import dynamic models')
        return {
            field_name: field.set_options(**self.get_options_for_column(field_name))
            for field_name, field in self.model._fields.items()
            if field_name not in ('id', '_cls')
        }

    def handle_default(self, getter, default):
        """
        Return a getter function that returns a default value when the key is not found.

        :param getter: Callable taking a dataset and returning the looked-up value.
        :param default: Value returned when ``getter`` raises ``AttributeError`` or ``KeyError``.
        :returns: A callable taking a dataset.
        """
        def _handle_default_getter(obj):
            try:
                return getter(obj)
            except (AttributeError, KeyError):
                return default
        return _handle_default_getter

    def handle_list(self, getter):
        """
        Return a getter function that collects several source values into a list.

        :param list getter: Source specifications; each item is turned into a getter
            with :meth:`get_getter`, so every form supported there is allowed.
        :returns: A callable taking a dataset and returning the list of looked-up
            values, in the order of the specifications.
        """
        getters = [self.get_getter(key) for key in getter]

        def _handle_list(obj):
            return [get(obj) for get in getters]
        return _handle_list

    def get_getter(self, data):
        """
        Translate one ``attribute_map`` value into a getter (or a nested dict of getters).

        Supported specifications:

        * ``dict`` -- converted recursively; returns a dict with the same keys whose
          values are getters (or further dicts).
        * ``tuple`` of ``(key, default)`` -- getter for ``key`` that falls back to
          ``default`` (see :meth:`handle_default`).
        * ``list`` -- getter returning a list of values (see :meth:`handle_list`).
        * ``int`` -- item lookup via :func:`operator.itemgetter`.
        * anything else -- attribute lookup via :func:`operator.attrgetter`.

        :param data: The source specification.
        :returns: A callable taking a dataset, or a dict of such callables.
        """
        if isinstance(data, dict):
            return {
                key: self.get_getter(sub_data)
                for key, sub_data in data.items()
            }
        if isinstance(data, tuple):
            key, default = data
            lookup = itemgetter(key) if isinstance(key, int) else attrgetter(key)
            return self.handle_default(lookup, default)
        if isinstance(data, list):
            return self.handle_list(data)
        if isinstance(data, int):
            return itemgetter(data)
        return attrgetter(data)

    @staticmethod
    def _reference_mapper(dataset, sub_map, field):
        """
        Dereference a value by its attributes.

        Delegates to ``from_field_map`` of the referenced document type. For a
        :class:`fields.ListField` the inner field's document type is used and
        ``is_list=True`` is passed along.

        :param dataset: Source dataset in pipeline.
        :param dict sub_map: Getter map for the referenced document.
        :param field: The reference field, or a list field wrapping one.
        :returns: Whatever ``from_field_map`` returns for the referenced document type.
        """
        is_list = False
        if isinstance(field, fields.ListField):
            field = field.field
            is_list = True
        return field.document_type.from_field_map(sub_map, dataset, is_list=is_list)

    def create_mapper(self, attribute_map, defaults):
        """
        Generate a function that maps all attributes in a dataset.

        The created function will be called from ``self.map``, supplying
        each dataset in the pipeline. The getters are generated once up front
        so they do not have to be rebuilt for every dataset.

        :param dict attribute_map: Mapping of model field to source specification
            (see :meth:`get_getter`).
        :param dict defaults: Default values, structured like ``attribute_map``.
        :returns: The mapper function.
        """
        attr_map = {
            model_field: self.get_getter(data_field)
            for model_field, data_field in attribute_map.items()
        }

        def _mapper(dataset, sub_map=None, parent_attr=None, default_map=None):
            """
            Re-structure the dataset according to the ValueMapStep's attribute_map.

            The ``attr_map`` variable is a dict where each key has a getter function,
            to acquire the source value from the dataset. This value is then further
            processed in ``self.convert_value`` to the appropriate type according to
            the ``self.field_map`` generated earlier.

            This function is recursive and calls itself wherever the attr_map contains
            another dict instead of a getter function to call, to create a nested data
            structure for the target documents.

            :param dict dataset: Source dataset in pipeline.
            :param dict sub_map: Attribute map for this level in the document.
            :param list parent_attr: List of parent attribute keys; models usually have
                ``['fields']`` where all attribute values are stored.
            :param dict default_map: Default values for this level in the document.
            :return: The newly transformed dataset.
            :rtype: dict
            :raises KeyError: If a getter raises ``AttributeError`` and no default is
                defined for that attribute.
            """
            if sub_map is None:
                # Top-level call: start from the pre-built map and the step defaults.
                sub_map = attr_map
                default_map = defaults
            if parent_attr is None:
                parent_attr = []
            if default_map is None:
                default_map = {}

            mapped = {}
            for attr, getter in sub_map.items():
                attr_path = parent_attr + [attr]
                if isinstance(getter, dict):
                    field = self.field_map.get(attr)
                    is_reference = (
                        isinstance(field, fields.ReferenceField)
                        or isinstance(getattr(field, 'field', None), fields.ReferenceField)
                    )
                    if is_reference:
                        mapped[attr] = self._reference_mapper(dataset, getter, field)
                    else:
                        mapped[attr] = _mapper(
                            dataset, getter, attr_path, default_map.get(attr))
                    continue

                try:
                    value = getter(dataset)
                except KeyError:
                    value = None
                except AttributeError:
                    value = default_map[attr]  # raises if no default found
                mapped[attr] = self.convert_value(attr_path, value)
            return mapped

        return _mapper

    def process_step(self, state, context):
        """
        Run the step: lazily map every dataset in ``state``.

        :param state: Iterable of source datasets.
        :param context: Pipeline context; returned unchanged.
        :returns: Tuple of (generator of mapped datasets, context).
        """
        state = self.map(state)
        return state, context

    def map(self, state):
        """
        Yield the mapped version of each dataset in ``state``.

        Each dataset is wrapped in a :class:`DictObject` before the mapper is applied.
        """
        for dataset in state:
            dataset = DictObject(dataset)
            yield self.mapper(dataset)

    def convert_value(self, attr_path, raw_value):
        """
        Convert a raw value to Python.

        To convert complex datatypes to their python equivalent, this method
        uses the ``self.field_map`` mapping of attribute keys to their Field instances.
        This way we know how to convert e.g. "01-08-2011" to ``datetime(2011, 8, 1)``
        (if the correct dateformat is configured).

        The path is searched from its last element towards its first; the first key
        found in ``self.field_map`` decides which field converts the value. If no
        key matches, the raw value is returned unchanged. ``attr_path`` itself is
        not modified.

        :param list attr_path: A path to the attribute, e.g. ``['fields',]``.
        :param raw_value: The raw value coming in from the pipeline dataset.
        :returns: The converted value, according to the field map. ``None`` if the
            raw value is ``None`` or a ``str``/``int`` listed in ``self.null_values``.
        """
        if raw_value is None or (
            isinstance(raw_value, (str, int)) and raw_value in self.null_values
        ):
            return None

        previous = None
        # Iterate over a reversed view instead of popping so the caller's list is
        # left intact.
        for field_name in reversed(attr_path or ()):
            if field_name in self.field_map:
                field = self.field_map[field_name]
                if isinstance(field, fields.MultilingualField):
                    if previous is None and isinstance(raw_value, dict):
                        # The path ends at the multilingual attribute itself and
                        # the value is a dict: convert it as a whole.
                        return field.to_python(raw_value)
                    # NOTE(review): presumably ``get_field()`` returns the field
                    # used for a single language value -- confirm.
                    field = field.get_field()
                return field.to_python(raw_value)
            previous = field_name
        return raw_value