Source code for xmm.pipeline.steps.load.delete

from collections import Counter
from operator import attrgetter

from mongoengine.errors import OperationError

from xmm.util.converters import ModelNameConverter
from xmm.util.dictobject import DictObject
from ..base import PipelineStep


class DbDeleteStep(PipelineStep):
    """Delete instances from the database.

    For every item of the incoming state a filter is built from the
    configured primary-key attributes, and all documents of ``model``
    matching that filter are deleted. The number of deleted documents and
    the number of failed delete operations are accumulated in
    ``self.counter`` and exposed to the pipeline context by :meth:`cleanup`.
    """

    def __init__(self, model, primary_keys=None):
        """
        Create a new DB deleter step.

        :param str|type model: Model class name or concrete class.
        :param str|list primary_keys: Attribute(s) to use for querying the
            documents to delete. A single attribute name is wrapped in a
            list. Dotted names (``'a.b'``) address nested attributes.
        :raises AssertionError: If ``primary_keys`` is ``None``.
        :raises ValueError: If ``primary_keys`` is an empty list or tuple.
        """
        if isinstance(model, str):
            self.model = ModelNameConverter.get_model_class(model)
        else:
            self.model = model

        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped when Python runs with -O; exception type and message are
        # the same as the former assertion produced.
        if primary_keys is None:
            raise AssertionError('primary_keys must be defined')

        if not isinstance(primary_keys, (list, tuple)):
            primary_keys = [primary_keys]

        # An empty key list would produce an empty filter in ``map``, and an
        # unfiltered ``objects().delete()`` removes every document of the
        # model. Refuse that configuration up front.
        if not primary_keys:
            raise ValueError('primary_keys must contain at least one attribute')

        self.primary_keys = primary_keys
        self.counter = Counter(delete=0, error=0)

    def map(self, state):
        """
        Delete the documents matching each item of ``state``.

        This is a generator: the delete for an item is only executed when
        that item is pulled by the consumer, so ``self.counter`` is complete
        only after the returned iterator has been exhausted.

        :param state: Iterable of items; each one is wrapped in a
            ``DictObject`` (presumably a mapping exposing its keys as
            attributes -- verify against ``xmm.util.dictobject``).
        :return: Generator yielding the ``DictObject`` built for every
            item, whether or not its delete succeeded.
        """
        for value_map in state:
            value_map = DictObject(value_map)

            # Build the query filter: the value is read via the dotted
            # attribute path, while the keyword uses ``__`` in place of
            # ``.`` (dots are not valid in keyword names).
            query = {
                pk.replace('.', '__'): attrgetter(pk)(value_map)
                for pk in self.primary_keys
            }

            try:
                count = self.model.objects(**query).delete()
            except OperationError:
                # Best effort: record the failure and continue with the
                # remaining items instead of aborting the pipeline.
                self.counter['error'] += 1
            else:
                self.counter['delete'] += count

            yield value_map

    def process_step(self, state, context):
        """
        Attach the delete mapping to the state and annotate the context.

        :param state: Iterable of items to delete.
        :param dict context: Pipeline context; receives a ``'delete'``
            entry holding the model class this step operates on.
        :return: Tuple ``(state, context)`` where ``state`` is the lazy
            generator returned by :meth:`map`.
        """
        context['delete'] = {
            'model': self.model,
        }
        state = self.map(state)
        return state, context

    def cleanup(self, context, exception=None):
        """
        Publish the collected counters to the context.

        :param dict context: Pipeline context; receives a ``'counter'``
            entry with the delete/error counts.
        :param exception: Not used by this step.
        :return: The updated context.
        """
        context['counter'] = self.counter
        return context