import os
import time
from celery.exceptions import Retry
from celery.task import Task as CeleryTask
from flask_babel import lazy_gettext as _
from xmm.models import PipelineConfig
from xmm.pipeline import Pipeline
from .base import runner
class PipelineTask(CeleryTask):
    """Runs a highly configurable pipeline.

    The pipeline configuration is either passed in directly or looked up
    as a ``PipelineConfig`` by its id.
    """

    # Describes the parameters of this task: a single required reference to
    # a ``PipelineConfig``. The key matches the ``configuration_id`` argument
    # of :meth:`run`.
    # NOTE(review): presumably consumed by a form/UI layer -- confirm.
    attribute_params = [
        {
            'key': 'configuration_id',
            'label': _('Konfiguration'),
            'datatype': 'reference',
            'metadata': {
                'model': 'PipelineConfig',
                'required': True,
            },
        },
    ]

    @runner
    def run(self, task, config=None, configuration_id=None, path=None):
        """
        Run this pipeline.

        :param xmm.models.Task task: A task
        :param list|OrderedDict config: A pipeline configuration
        :param ObjectId configuration_id: Alternatively a PipelineConfig ObjectID.
        :param str path: Path to a file that should be set in the ReadStep
        :return: The state returned by ``Pipeline.run()``, or ``None`` when
            the task asked to be retried because ``path`` was modified less
            than five seconds ago.
        """
        try:
            if config is not None:
                # An explicit configuration wins over a stored one.
                pipeline_config = PipelineConfig(config=config)
            else:
                pipeline_config = PipelineConfig.objects.get(id=configuration_id)
            # Always work on the configuration as returned by the model.
            # NOTE(review): assumes get_config() yields a mapping keyed by
            # step name (it is indexed with 'ReadStep' below) -- confirm.
            config = pipeline_config.get_config()

            if path:
                # make sure the file is written: a modification time within
                # the last 5 seconds is treated as "still being written",
                # so try again 5 seconds from now.
                if time.time() - os.path.getmtime(path) < 5:
                    self.retry(countdown=5)
                # set the given file path
                config['ReadStep']['options']['filename'] = path

            pipeline = Pipeline(config)
            # Only the state is reported back; the context is not used here.
            state, _context = pipeline.run()
            return state
        except Retry:
            # self.retry() signals the retry by raising Retry. It is
            # swallowed here, so this run ends without a result.
            # NOTE(review): presumably done so the @runner wrapper does not
            # treat the retry as a failure -- confirm.
            return None