# Source code for xmm.models.versions

from copy import deepcopy

import flask
from flask_babel import lazy_gettext as _, ngettext as _n
from mongoengine import DoesNotExist

from xmm.core import mongo
from xmm.models import PermissionMixin
from xmm.util.flask_helpers import api_method
from xmm.util.permissions import require_permission
from . import DynamicFieldsMixin, fields, SignalizedMetaclass


class VersionsMetaclass(SignalizedMetaclass):
    """
    Metaclass for versioned documents.

    As soon as a versioned class has been created, its history model
    is requested once so that it is built right away; this lets it
    work together with other history models in MongoEngine.
    """

    def __init__(cls, name, bases, dct):
        """Finish regular class creation, then request the history model."""
        super().__init__(name, bases, dct)
        # Called for its side effect only; the returned model is not needed here.
        cls._get_history_model()


class BaseHistoryModel:
    """Abstract base for history models; only carries shared ``meta`` options."""

    # Subclasses of a history model are allowed.
    meta = dict(allow_inheritance=True)


class VersionsMixin:
    """
    Model mixin that allows archiving different versions of a model instance.

    The live ("draft") document keeps its bookkeeping in ``_history``.
    Published versions are stored as copies in a separate history
    collection, using a shadow class built by ``_get_history_model``.

    TODO: Magic methods that will query the history database
    if the real document has been deleted for whatever reason.
    """

    STATUS_DRAFT = 'draft'
    STATUS_PUBLISHED = 'published'
    STATUS_DELETED = 'deleted'
    STATUS_CHOICES = [
        (STATUS_DRAFT, _('Entwurf')),
        (STATUS_PUBLISHED, _('Veröffentlicht')),
        (STATUS_DELETED, _('Gelöscht')),
    ]

    # Keys used in this file: 'id', 'version', 'is_dirty', 'is_latest', 'deleted'.
    _history = fields.DictField()
    _history_name = 'history'
    # Registry mapping a model class to its generated history class.
    # Deliberately shared by all classes using this mixin.
    _history_cls = {}
    _is_history_record = False

    dict_fields = [('version', 'get_version'), ('status', 'get_status')]

    ACTION_PUBLISH = 'publish'
    need_actions = PermissionMixin.need_actions + (
        ACTION_PUBLISH,
    )
    need_action_labels = PermissionMixin.need_action_labels.copy()
    need_action_labels.update({
        ACTION_PUBLISH: _('Veröffentlichen'),
    })

    def get_real_id(self):
        """Get the original document's ObjectId."""
        if self._is_history_record:
            return self._history['id']
        return self.id

    def get_version(self):
        """
        Get the version of this object.

        Falls back to ``1`` when no version has been stored in
        ``_history`` yet.

        :return int: incremental version number
        """
        return self._history.get('version', 1)

    def get_status(self):
        """
        Get the status for a page.

        Can be either one of ``STATUS_CHOICES``. A missing ``is_dirty``
        flag counts as dirty, i.e. the object is reported as a draft.
        """
        if self._history.get('deleted', False):
            return self.STATUS_DELETED
        if self._history.get('is_dirty', True):
            return self.STATUS_DRAFT
        return self.STATUS_PUBLISHED

    def is_draft(self):
        """Return whether the status is ``STATUS_DRAFT``."""
        return self.get_status() == self.STATUS_DRAFT

    def is_published(self):
        """Return whether the status is ``STATUS_PUBLISHED``."""
        return self.get_status() == self.STATUS_PUBLISHED

    def is_deleted(self):
        """Return whether the status is ``STATUS_DELETED``."""
        return self.get_status() == self.STATUS_DELETED

    def get_status_name(self):
        """Get a human readable status name."""
        return dict(self.STATUS_CHOICES).get(self.get_status())

    @classmethod
    def get_newest(cls, obj):
        """
        Get the newest version of that object.

        Might return the same object.

        :param obj: A history record or the original document.
        :raises RuntimeError: When called on a non-history class.
        """
        if not cls._is_history_record:
            raise RuntimeError('get_newest can only be called on the history.')
        # get_real_id() resolves the original id for history records and
        # for the live document alike; only history records carry
        # ``_history['id']``.
        return cls.objects(
            _history__id=obj.get_real_id()
        ).order_by('-_history__version').first()

    @classmethod
    def get_history_collection_name(cls):
        """Return the history collection name: ``<collection>.<_history_name>``."""
        return '{}.{}'.format(
            cls._get_collection_name(),
            cls._history_name
        )

    @classmethod
    def _get_history_model(cls):
        """Get the class shadowing the real one in a history collection."""
        if cls._is_history_record:
            return cls

        if cls._history_cls.get(cls) is None:
            # we can't have unique indexes on history instances
            def _build_index_specs(cls, meta_indexes):
                # Zero-argument super() works here because this function
                # is defined inside a method of VersionsMixin.
                index_specs = super()._build_index_specs(meta_indexes)
                for spec in index_specs:
                    spec['unique'] = False
                return index_specs

            cls_dict = {
                '_elastic_doc_type': '{}.history'.format(cls._elastic_doc_type),
                '_is_history_record': True,
                '_base_class': cls,
                'meta': {
                    'collection': cls.get_history_collection_name(),
                },
                '_build_index_specs': classmethod(_build_index_specs),
            }

            if issubclass(cls, DynamicFieldsMixin) and cls.has_type_class():
                # The history class gets its own copy of the type field,
                # with a cascading reverse delete rule.
                type_field = deepcopy(getattr(cls, cls.get_type_field_name()))
                type_field.reverse_delete_rule = mongo.CASCADE
                cls_dict[cls.get_type_field_name()] = type_field

            bases = (cls,)
            if hasattr(cls, '_abstract'):
                # If a direct base already has a history class, derive from
                # that one together with ``cls._abstract`` instead of ``cls``.
                for base in cls.__bases__:
                    if base in cls._history_cls:
                        bases = (cls._abstract, cls._history_cls[base])
                        break

            cls._history_cls[cls] = type(
                '{}History'.format(cls.__name__), bases, cls_dict
            )

        return cls._history_cls[cls]

    @classmethod
    def get_by_id(cls, obj_id):
        """
        Get an object by its original ObjectId.

        On a history class this returns the highest stored version, or
        ``None`` when there is none. On the real class it is a plain
        ``objects.get`` lookup.
        """
        if not cls._is_history_record:
            return cls.objects.get(id=obj_id)
        return cls.objects(_history__id=obj_id).order_by('-_history__version').first()

    @classmethod
    def get_by_version(cls, obj_id, version):
        """
        Get a specific version of the specified object.

        :param obj_id: ObjectId to look for.
        :param version: Can be a number, ``'latest'`` or ``'draft'``.
        :return: The matching object. ``None`` when a numeric version does
            not exist, when ``version`` cannot be converted to ``int`` or
            when ``'latest'`` finds no history record.
        """
        if version == 'draft':
            # The draft is the live document, never a history record.
            if cls._is_history_record:
                return cls._base_class.objects.get(id=obj_id)
            return cls.objects.get(id=obj_id)

        model = cls._get_history_model()
        if version == 'latest':
            return model.objects(_history__id=obj_id).order_by('-_history__version').first()

        try:
            version = int(version)
        except (ValueError, TypeError):
            return None

        try:
            return model.objects.get(_history__id=obj_id, _history__version=version)
        except model.DoesNotExist:
            return None

    @classmethod
    def get_by_version_or_404(cls, obj_id, version):
        """
        Get a specific version of the specified object.

        Trigger a 404 when we don't find the object.

        :param obj_id: ObjectId to look for.
        :param version: Can be a number, ``'latest'`` or ``'draft'``.
        """
        obj = cls.get_by_version(obj_id, version)
        if obj is None:
            flask.abort(404)
        return obj

    @classmethod
    @api_method
    @require_permission(action=ACTION_PUBLISH, param='model')
    def api_publishmany(cls, object_ids):
        """
        Publish every object whose id is in ``object_ids``.

        Flashes a success message with the number of published objects.

        :param object_ids: Ids of the objects to publish.
        """
        objs = cls.objects(id__in=object_ids)
        for obj in objs:
            obj._publish()

        # The singular form uses "wurde"; the previous "Seite wurden" was
        # ungrammatical. NOTE: translation catalogs keyed on the old
        # msgid need to be re-extracted.
        flask.flash(
            _n('%(num)d Seite wurde erfolgreich veröffentlicht.',
               '%(num)d Seiten wurden erfolgreich veröffentlicht.',
               num=len(objs)),
            'success'
        )

    def _history_data(self):
        """Build the ``_history`` dict for a freshly published clone."""
        return {
            'id': self.id,
            'deleted': False,
            'is_dirty': False,
            'is_latest': True,
            'version': self.get_version(),
        }

    def _publish(self):
        """
        Create a copy of this document in the history collection.

        Increases the draft version by 1.

        :return: The newly saved history record.
        """
        history_model = self._get_history_model()

        # Set `is_latest` to False on all previous versions
        for history_obj in history_model.objects(_history__id=self.id):
            history_obj._history['is_latest'] = False
            history_obj.save()

        clone = history_model()
        for field in self._fields.keys():
            if field in ('id', '_cls', '_history'):
                continue
            try:
                setattr(clone, field, getattr(self, field))
            except DoesNotExist:
                # Deliberate best effort: such a field is left unset.
                # NOTE(review): presumably a dangling reference - confirm.
                pass

        clone._history = self._history_data()
        clone.save()

        self._history['version'] = self.get_version() + 1
        self._history['is_dirty'] = False

        # set this attribute to prevent setting the dirty flag again
        setattr(self, 'is_clean', True)
        try:
            self.save()
        finally:
            # Always remove the marker, even when save() raises; a leftover
            # marker would stop later saves from flagging the draft dirty.
            delattr(self, 'is_clean')

        return clone

    @api_method
    @require_permission(action=ACTION_PUBLISH, param='model')
    def api_publish(self):
        """
        Save a new copy of this object in the history collection.

        Requires no parameters.

        Returns the ``version`` of the newly published document, example response::

            {
                "status": true,
                "version": 3
            }
        """
        clone = self._publish()
        message = _('Version %(version)d von "%(page)s" wurde veröffentlicht.',
                    version=clone.get_version(), page=str(clone))
        flask.flash(message, 'success')
        return {'version': clone.get_version()}

    @api_method
    @require_permission(action=ACTION_PUBLISH, param='model')
    def api_publish_tree(self):
        """
        Publish all draft pages below and including the given page.

        Requires no parameters.

        Returns the number of published pages in ``published_count``, example response::

            {
                "published_count": 3
            }
        """
        # NOTE(review): ``_publish_tree`` is not defined in this mixin; it
        # is presumably provided by the concrete model - confirm.
        published_count = self._publish_tree()
        message = _('Es wurden %(published_count)d Seiten veröffentlicht.',
                    published_count=published_count)
        flask.flash(message, 'success')
        return {'published_count': published_count}

    @classmethod
    def pre_save(cls, sender, document, **kwargs):
        """Set this object's dirty status so we know there have been changes since last publish."""
        # History records are never flagged, and ``_publish`` sets a
        # temporary ``is_clean`` attribute to skip this for its own save.
        if not document._is_history_record and not hasattr(document, 'is_clean'):
            document._history['is_dirty'] = True
        return super().pre_save(sender, document, **kwargs)

    @api_method
    def api_history(self):
        """
        Get this object's history sorted by newest first.

        Returns a list of objects in the ``history`` key, example response::

            {
                "status": true,
                "history": [
                    {
                        "name": "My newest version",
                        "_history": {
                            "id": "someobjectid",
                            "version": 2
                        }
                    },
                    {
                        "name": "My old version",
                        "_history": {
                            "id": "someobjectid",
                            "version": 1
                        }
                    }
                ]
            }
        """
        model = self._get_history_model()
        old_versions = model.objects(_history__id=self.id).order_by('-_history__version')
        return {'history': [
            obj.to_dict() for obj in old_versions
        ]}

    @api_method
    def api_version(self, version):
        """
        Get a specific version of this object.

        Request with ``version=XXX`` parameter, to get just one specific version.
        You can also specify ``latest`` to get the newest published version.

        Returns the requested object in the ``object`` key, example response::

            {
                "status": true,
                "object": {
                    "name": "My object",
                    "_history": {
                        "id": "someobjectid",
                        "version": 3
                    }
                }
            }
        """
        return {'object': self.get_by_version_or_404(self.id, version).to_dict()}

    def _delete_history(self):
        """Delete all history records belonging to this object."""
        model = self._get_history_model()
        model.objects(_history__id=self.id).delete()

    def _mark_deleted(self):
        """Mark all versions of this object as deleted."""
        model = self._get_history_model()
        model.objects(_history__id=self.id).update(_history__deleted=True)

        # Read the flag before ``_publish`` resets it. A missing flag
        # counts as dirty, matching ``get_status``.
        has_changes = self._history.get('is_dirty', True)
        clone = self._publish()
        clone._history.update(is_dirty=has_changes, deleted=True)
        clone.save()

    def delete(self, with_history=False, **kwargs):
        """
        Delete this object and optionally with its entire history.

        Unless ``with_history`` is set, a new 'deleted' record will be
        created in the history. History records themselves are deleted
        without any of this bookkeeping.

        :param bool with_history: Also delete all corresponding history entries.
        """
        if not self._is_history_record:
            if with_history:
                self._delete_history()
            else:
                self._mark_deleted()
        super().delete(**kwargs)