from collections import Counter
from operator import attrgetter
from mongoengine.errors import OperationError
from xmm.util.converters import ModelNameConverter
from xmm.util.dictobject import DictObject
from ..base import PipelineStep
class DbDeleteStep(PipelineStep):
    """
    Delete instances from the database.

    For every record passing through the step, a query is built from the
    configured primary key attributes and the matching documents of the
    model are deleted. Each record is passed on downstream afterwards,
    whether or not its delete succeeded.
    """

    def __init__(self, model, primary_keys=None):
        """
        Create a new DB deleter step.

        :param str|type model: Model class name or concrete class.
        :param str|list primary_keys: Attribute(s) to use for querying the documents to delete.
            A dotted name (``a.b``) is read from the record with ``attrgetter``
            and used in the query as ``a__b``.
        :raises AssertionError: If ``primary_keys`` is ``None``.
        :raises ValueError: If ``primary_keys`` is an empty list or tuple.
        """
        if isinstance(model, str):
            self.model = ModelNameConverter.get_model_class(model)
        else:
            self.model = model

        # Kept as an assertion so existing callers observe the same
        # AssertionError for a missing argument.
        assert primary_keys is not None, 'primary_keys must be defined'
        if not isinstance(primary_keys, (list, tuple)):
            primary_keys = [primary_keys]
        # An empty key list would produce an empty query in map(), and an
        # unfiltered ``objects().delete()`` removes every document of the
        # model. Refuse that configuration up front.
        if not primary_keys:
            raise ValueError('primary_keys must not be empty')
        self.primary_keys = primary_keys

        # 'delete' accumulates the counts returned by the delete calls,
        # 'error' counts the delete calls that raised OperationError.
        self.counter = Counter(delete=0, error=0)

    def map(self, state):
        """
        Lazily delete the documents matching each record of ``state``.

        This is a generator: nothing is deleted until it is iterated.

        :param state: Iterable of records; each one is wrapped in a ``DictObject``
            (presumably dict-like mappings -- confirm against ``DictObject``).
        :return: Generator yielding every record as a ``DictObject``, including
            records whose delete raised ``OperationError``.
        """
        # The query field names and value getters depend only on the
        # configured keys, so build them once instead of once per record.
        lookups = [
            (pk.replace('.', '__'), attrgetter(pk))
            for pk in self.primary_keys
        ]
        for value_map in state:
            value_map = DictObject(value_map)
            q = {field: getter(value_map) for field, getter in lookups}
            try:
                count = self.model.objects(**q).delete()
            except OperationError:
                # Best effort: record the failure and keep processing.
                self.counter['error'] += 1
            else:
                self.counter['delete'] += count
            yield value_map

    def process_step(self, state, context):
        """
        Register the target model in the context and wrap ``state`` with the deleter.

        :param state: Iterable of records to process.
        :param dict context: Pipeline context; ``context['delete']`` is set to
            ``{'model': self.model}``.
        :return: Tuple of the lazy generator produced by :meth:`map` and the context.
        """
        context['delete'] = {
            'model': self.model,
        }
        state = self.map(state)
        return state, context

    def cleanup(self, context, exception=None):
        """
        Publish the delete/error counters in the context.

        :param dict context: Pipeline context; ``context['counter']`` is set to
            this step's counter.
        :param exception: Not used by this step.
        :return: The updated context.
        """
        context['counter'] = self.counter
        return context