# Source code for xmm.pipeline.steps.expand

from .base import PipelineStep


class ExpandStep(PipelineStep):
    """
    Pipeline step that rebuilds nested dicts from dot-separated keys.

    Every key of an incoming dict is split on ``'.'``; all parts but the
    last become intermediate containers and the last part receives the
    original value.

    :param int depth: Maximum number of splits performed per key (passed to
        ``str.split`` as ``maxsplit``). ``-1`` means no limit, ``0`` leaves
        keys untouched.

    **Examples**:

    Input::

        {
            'fields.name.de': 'Deutsch',
            'fields.name.en': 'English',
        }

    Output::

        {
            'fields': {
                'name': {
                    'de': 'Deutsch',
                    'en': 'English',
                },
            },
        }
    """

    def __init__(self, depth=-1):
        self.depth = depth

    def expand(self, dct, level=0):
        """
        Return a nested copy of a single flattened dict.

        The result and every intermediate container are created with the
        class of ``dct``, so the mapping type of the input is kept.

        Colliding keys (one key being a dotted prefix of another, such as
        ``'a'`` and ``'a.b'``) are not reconciled: depending on iteration
        order the earlier entry is overwritten, or item assignment on the
        already stored non-mapping value fails.

        :param dct: Mapping whose keys are dot-separated strings.
        :param level: Currently unused.
        :return: New mapping of the same class as ``dct``.
        """
        mapping_type = dct.__class__
        nested = mapping_type()

        for flat_key, payload in dct.items():
            # ``str.split`` always yields at least one part, so ``leaf`` is
            # always bound; ``parents`` is empty for keys without a dot.
            *parents, leaf = flat_key.split('.', maxsplit=self.depth)

            node = nested
            for part in parents:
                # Descend into the existing container or create a new one.
                node = node.setdefault(part, mapping_type())
            node[leaf] = payload

        return nested

    def map(self, state):
        """
        Lazily expand every dataset in ``state``.

        :param state: Iterable of flattened dicts.
        :return: Generator yielding one expanded dict per input dataset.
        """
        for record in state:
            yield self.expand(record)

    def process_step(self, state, context):
        """
        Replace ``state`` by a generator of expanded datasets.

        :param state: Iterable of flattened dicts.
        :param context: Returned unchanged.
        :return: Tuple ``(expanded_state, context)``.
        """
        return self.map(state), context