Source code for xmm.tasks.pipeline

import os
import time

from celery.exceptions import Retry
from celery.task import Task as CeleryTask
from flask_babel import lazy_gettext as _

from xmm.models import PipelineConfig
from xmm.pipeline import Pipeline
from .base import runner


class PipelineTask(CeleryTask):
    """Runs a highly configurable pipeline."""

    # Parameter descriptors for this task: a single required reference to a
    # ``PipelineConfig`` document, passed to ``run`` as ``configuration_id``.
    attribute_params = [
        {
            'key': 'configuration_id',
            'label': _('Konfiguration'),
            'datatype': 'reference',
            'metadata': {
                'model': 'PipelineConfig',
                'required': True,
            },
        },
    ]

    @runner
    def run(self, task, config=None, configuration_id=None, path=None):
        """
        Run this pipeline.

        The configuration is taken from ``config`` when it is given,
        otherwise it is loaded from the ``PipelineConfig`` document with the
        id ``configuration_id``. When ``path`` is given it replaces the
        filename option of the ``ReadStep`` entry of the configuration.

        :param xmm.models.Task task: A task (not used in this method body;
            presumably consumed by the ``runner`` decorator -- TODO confirm)
        :param list|OrderedDict config: A pipeline configuration
        :param ObjectId configuration_id: Alternatively a PipelineConfig
            ObjectID.
        :param str path: Path to a file that should be set in the ReadStep
        :return: The state returned by ``Pipeline.run()``, or ``None`` when
            a retry was requested instead of running the pipeline.
        """
        try:
            if config is not None:
                pipeline_config = PipelineConfig(config=config)
            else:
                pipeline_config = PipelineConfig.objects.get(
                    id=configuration_id)

            # Always work on the configuration as returned by the model,
            # also when a raw ``config`` was passed in.
            config = pipeline_config.get_config()

            if path:
                # Make sure the file is written: a file modified less than
                # 5 seconds ago is treated as possibly still being written,
                # so the task is retried in 5 seconds instead.
                if time.time() - os.path.getmtime(path) < 5:
                    self.retry(countdown=5)

                # Set the given file path.
                config['ReadStep']['options']['filename'] = path

            pipeline = Pipeline(config)
            # ``Pipeline.run()`` yields a (state, context) pair; only the
            # state is reported back.
            state, _context = pipeline.run()

            return state
        except Retry:
            # ``self.retry()`` signals the retry by raising ``Retry``. It is
            # swallowed on purpose so this run ends without a result.
            # NOTE(review): presumably this keeps the ``runner`` wrapper from
            # treating the retry as a failure -- confirm against ``runner``.
            return None