Source code for xmm.pipeline.steps.load.rawdelete

from collections import Counter
from operator import attrgetter

import bson

from xmm.core import mongo
from xmm.util.dictobject import DictObject
from ..base import PipelineStep


class RawDbDeleteStep(PipelineStep):
    """
    Deletes raw records from a MongoDB collection.

    It's a **LOT** faster than using the default delete method.
    After using this step, rebuilding the ElasticSearch index might be
    necessary.

    .. warning:: Deletes all matching documents!
    """

    def __init__(self, collection_name, primary_keys=None):
        """
        Create a new raw DB deleter step.

        :param str collection_name: Collection to delete documents from.
        :param primary_keys: Key paths (strings accepted by
            :func:`operator.attrgetter`) whose values, read from each
            incoming document, make up the ``delete_many`` filter.
        :raises ValueError: If no primary keys are given. An empty filter
            matches every document in the collection, so every incoming
            record would delete the whole collection.
        """
        primary_keys = primary_keys or []
        # Guard before connecting: with no keys the filter built in ``map``
        # is ``{}``, which MongoDB treats as "match everything".
        if not primary_keys:
            raise ValueError(
                'RawDbDeleteStep requires at least one primary key for '
                'collection {!r}; an empty filter would delete every '
                'document.'.format(collection_name)
            )
        self.collection_name = collection_name
        self.collection = mongo.connection.get_default_database()[self.collection_name]
        self.primary_keys = primary_keys
        # Running totals: documents removed by MongoDB, and input records
        # whose filter could not be encoded as BSON.
        self.counter = Counter(delete=0, error=0)

    def map(self, state):
        """
        Delete, for every document in *state*, all collection records whose
        primary-key values equal the document's.

        :param state: Iterable of documents. Each is wrapped in
            ``DictObject`` so the primary keys can be read with
            ``attrgetter``.
        :returns: Generator yielding each input document (unwrapped) whose
            delete call succeeded. Documents whose filter raises
            ``bson.errors.InvalidDocument`` are counted under ``'error'``
            and are not yielded.
        """
        # Build the getters once; they are reused for every document.
        getters = [(key, attrgetter(key)) for key in self.primary_keys]
        for document in state:
            doc = DictObject(document)
            query = {key: getter(doc) for key, getter in getters}
            try:
                result = self.collection.delete_many(query)
            except bson.errors.InvalidDocument:
                self.counter['error'] += 1
            else:
                self.counter['delete'] += result.deleted_count
                yield document

    def process_step(self, state, context):
        """
        Record the target collection in *context* and attach the delete
        generator to the state.

        Deletes are lazy: they happen only as the returned state is
        iterated.

        :returns: ``(state, context)`` where ``state`` is the generator
            produced by :meth:`map`.
        """
        context['delete'] = {
            'collection': self.collection_name,
        }
        state = self.map(state)
        return state, context

    def cleanup(self, context, exception=None):
        """
        Store the delete/error counter in ``context['counter']``.

        :param exception: Accepted for the step interface; not used here.
        :returns: The updated *context*.
        """
        context['counter'] = self.counter
        return context