from copy import deepcopy
import flask
from flask_babel import lazy_gettext as _, ngettext as _n
from mongoengine import DoesNotExist
from xmm.core import mongo
from xmm.models import PermissionMixin
from xmm.util.flask_helpers import api_method
from xmm.util.permissions import require_permission
from . import DynamicFieldsMixin, fields, SignalizedMetaclass
class VersionsMetaclass(SignalizedMetaclass):
    """
    Versioned documents metaclass.

    Versioned documents need their history model loaded
    into the registry to correctly work with other
    history models in MongoEngine.
    """

    def __init__(cls, name, bases, dct):
        """
        Finish class creation, then build the class's history model.

        :param name: name of the class being created.
        :param bases: tuple of base classes.
        :param dct: the class namespace.
        """
        super().__init__(name, bases, dct)
        # Eagerly create the history class as soon as the versioned class
        # itself exists, instead of waiting for the first lookup.
        cls._get_history_model()
class BaseHistoryModel:
    """An abstract base history model."""

    # Model options: subclasses of this model are allowed.
    meta = {
        'allow_inheritance': True,
    }
class VersionsMixin:
    """
    Model mixin that allows archiving different versions of a model instance.

    The live document is the working draft. Publishing stores a copy of it
    in a separate history collection (see ``_get_history_model``) and bumps
    the draft's version number.

    TODO: Magic methods that will query the history database
          if the real document has been deleted for whatever reason.
    """

    STATUS_DRAFT = 'draft'
    STATUS_PUBLISHED = 'published'
    STATUS_DELETED = 'deleted'
    STATUS_CHOICES = [
        (STATUS_DRAFT, _('Entwurf')),
        (STATUS_PUBLISHED, _('Veröffentlicht')),
        (STATUS_DELETED, _('Gelöscht')),
    ]

    # Versioning bookkeeping; keys used in this class are ``id``,
    # ``version``, ``is_dirty``, ``is_latest`` and ``deleted``.
    _history = fields.DictField()
    # Suffix appended to the collection name for the history collection.
    _history_name = 'history'
    # Cache of generated history classes, keyed by the versioned class.
    # Defined once on the mixin, so it is shared unless a subclass rebinds it.
    _history_cls = {}
    # ``True`` only on the generated history classes.
    _is_history_record = False

    dict_fields = [('version', 'get_version'), ('status', 'get_status')]

    ACTION_PUBLISH = 'publish'
    need_actions = PermissionMixin.need_actions + (
        ACTION_PUBLISH,
    )
    # Work on a copy so the labels of ``PermissionMixin`` stay untouched.
    need_action_labels = PermissionMixin.need_action_labels.copy()
    need_action_labels.update({
        ACTION_PUBLISH: _('Veröffentlichen'),
    })

    def get_real_id(self):
        """Get the original document's ObjectId."""
        if self._is_history_record:
            return self._history['id']
        return self.id

    def get_version(self):
        """
        Get the version of this object.

        Falls back to ``1`` when no version has been stored yet. On the
        draft this is the number the next published copy will carry,
        because ``_publish`` increments it only after cloning.

        :return int: incremental version number
        """
        return self._history.get('version', 1)

    def get_status(self):
        """
        Get the status for a page.

        Can be either one of ``STATUS_CHOICES``. The deleted flag wins over
        the dirty flag; a missing dirty flag counts as a draft.
        """
        if self._history.get('deleted', False):
            return self.STATUS_DELETED
        if self._history.get('is_dirty', True):
            return self.STATUS_DRAFT
        return self.STATUS_PUBLISHED

    def is_draft(self):
        """Return whether the status is ``STATUS_DRAFT``."""
        return self.get_status() == self.STATUS_DRAFT

    def is_published(self):
        """Return whether the status is ``STATUS_PUBLISHED``."""
        return self.get_status() == self.STATUS_PUBLISHED

    def is_deleted(self):
        """Return whether the status is ``STATUS_DELETED``."""
        return self.get_status() == self.STATUS_DELETED

    def get_status_name(self):
        """Get a human readable status name."""
        return dict(self.STATUS_CHOICES).get(self.get_status())

    @classmethod
    def get_newest(cls, obj):
        """
        Get the newest version of that object.

        Might return the same object.

        :param obj: a history record or the live document.
        :raises RuntimeError: when called on a non-history class.
        """
        if not cls._is_history_record:
            raise RuntimeError('get_newest can only be called on the history.')
        # ``get_real_id`` yields ``_history['id']`` for history records and
        # the document id for live documents, so both kinds are accepted.
        return cls.objects(
            _history__id=obj.get_real_id()
        ).order_by('-_history__version').first()

    @classmethod
    def get_history_collection_name(cls):
        """Return ``<collection name>.<_history_name>``."""
        return '{}.{}'.format(
            cls._get_collection_name(),
            cls._history_name
        )

    @classmethod
    def _get_history_model(cls):
        """Get the class shadowing the real one in a history collection."""
        if cls._is_history_record:
            return cls

        if cls._history_cls.get(cls) is None:
            # we can't have unique indexes on history instances
            # NOTE: the inner ``cls`` deliberately shadows the outer one; it
            # is the class the resulting classmethod is invoked on, and the
            # zero-argument ``super()`` relies on it being the first argument.
            def _build_index_specs(cls, meta_indexes):
                index_specs = super()._build_index_specs(meta_indexes)
                for spec in index_specs:
                    spec['unique'] = False
                return index_specs

            cls_dict = {
                '_elastic_doc_type': '{}.history'.format(cls._elastic_doc_type),
                '_is_history_record': True,
                '_base_class': cls,
                'meta': {
                    'collection': cls.get_history_collection_name(),
                },
                '_build_index_specs': classmethod(_build_index_specs),
            }

            if issubclass(cls, DynamicFieldsMixin) and cls.has_type_class():
                # Give the history class its own copy of the type field so
                # changing the delete rule does not affect the real model.
                type_field = deepcopy(getattr(cls, cls.get_type_field_name()))
                type_field.reverse_delete_rule = mongo.CASCADE
                cls_dict[cls.get_type_field_name()] = type_field

            bases = (cls,)
            if hasattr(cls, '_abstract'):
                # If a direct base already has a history class, derive from
                # that one (plus ``cls._abstract``) instead of from ``cls``.
                for base in cls.__bases__:
                    if base in cls._history_cls:
                        bases = (cls._abstract, cls._history_cls[base])
                        break

            cls._history_cls[cls] = type('{}History'.format(cls.__name__), bases, cls_dict)

        return cls._history_cls[cls]

    @classmethod
    def get_by_id(cls, obj_id):
        """
        Get an object by its original ObjectId.

        On a history class this returns the record with the highest version
        (or ``None``); on the real class it is a plain ``get`` by id.
        """
        if not cls._is_history_record:
            return cls.objects.get(id=obj_id)
        return cls.objects(_history__id=obj_id).order_by('-_history__version').first()

    @classmethod
    def get_by_version(cls, obj_id, version):
        """
        Get a specific version of the specified object.

        :param obj_id: ObjectId to look for.
        :param version: Can be a number, ``'latest'`` or ``'draft'``.
        :return: the matching object, or ``None`` when a numbered or
            ``'latest'`` version is not found or ``version`` is not a number.
            The ``'draft'`` lookup uses ``objects.get`` and therefore raises
            instead of returning ``None`` when nothing matches.
        """
        if version == 'draft':
            # The draft is the live document, never a history record.
            if cls._is_history_record:
                return cls._base_class.objects.get(id=obj_id)
            return cls.objects.get(id=obj_id)

        model = cls._get_history_model()
        if version == 'latest':
            return model.objects(_history__id=obj_id).order_by('-_history__version').first()

        try:
            version = int(version)
        except (ValueError, TypeError):
            return None

        try:
            return model.objects.get(_history__id=obj_id, _history__version=version)
        except model.DoesNotExist:
            return None

    @classmethod
    def get_by_version_or_404(cls, obj_id, version):
        """
        Get a specific version of the specified object.

        Trigger a 404 when we don't find the object.

        :param obj_id: ObjectId to look for.
        :param version: Can be a number, ``'latest'`` or ``'draft'``.
        """
        try:
            obj = cls.get_by_version(obj_id, version)
        except DoesNotExist:
            # The 'draft' lookup raises instead of returning ``None``;
            # treat that like any other missing object.
            obj = None
        if obj is None:
            flask.abort(404)
        return obj

    @classmethod
    @api_method
    @require_permission(action=ACTION_PUBLISH, param='model')
    def api_publishmany(cls, object_ids):
        """
        Publish every object whose id is in ``object_ids``.

        Flashes a success message containing the number of objects.
        """
        objs = cls.objects(id__in=object_ids)
        for obj in objs:
            obj._publish()
        flask.flash(
            _n('%(num)d Seite wurden erfolgreich veröffentlicht.',
               '%(num)d Seiten wurden erfolgreich veröffentlicht.',
               num=len(objs)),
            'success'
        )

    def _history_data(self):
        """Build the ``_history`` dict for a freshly published copy."""
        return {
            'id': self.id,
            'deleted': False,
            'is_dirty': False,
            'is_latest': True,
            'version': self.get_version(),
        }

    def _publish(self):
        """
        Create a copy of this document in the history collection.

        Increases the draft version by 1.

        :return: the newly saved history record.
        """
        history_model = self._get_history_model()

        # Set `is_latest` to False on all previous versions
        for history_obj in history_model.objects(_history__id=self.id):
            history_obj._history['is_latest'] = False
            history_obj.save()

        clone = history_model()
        for field in self._fields:
            if field in ('id', '_cls', '_history'):
                continue
            try:
                setattr(clone, field, getattr(self, field))
            except DoesNotExist:
                # Reading the field raised DoesNotExist; the clone keeps
                # its default for this field.
                pass
        clone._history = self._history_data()
        clone.save()

        self._history['version'] = self.get_version() + 1
        self._history['is_dirty'] = False
        # set this attribute to prevent setting the dirty flag again
        self.is_clean = True
        try:
            self.save()
        finally:
            # Always remove the marker, even when the save fails; otherwise
            # `pre_save` would never mark this instance dirty again.
            del self.is_clean

        return clone

    @api_method
    @require_permission(action=ACTION_PUBLISH, param='model')
    def api_publish(self):
        """
        Save a new copy of this object in the history collection.

        Requires no parameters.

        Returns the ``version`` of the newly published document,
        example response::

            {
                "status": true,
                "version": 3
            }
        """
        clone = self._publish()
        message = _('Version %(version)d von "%(page)s" wurde veröffentlicht.',
                    version=clone.get_version(), page=str(clone))
        flask.flash(message, 'success')
        return {'version': clone.get_version()}

    @api_method
    @require_permission(action=ACTION_PUBLISH, param='model')
    def api_publish_tree(self):
        """
        Publish all draft pages below and including the given page.

        Requires no parameters.

        Returns the number of published pages in ``published_count``,
        example response::

            {
                "published_count": 3
            }
        """
        # `_publish_tree` is not defined in this mixin; the concrete model
        # has to provide it.
        published_count = self._publish_tree()
        message = _('Es wurden %(published_count)d Seiten veröffentlicht.',
                    published_count=published_count)
        flask.flash(message, 'success')
        return {'published_count': published_count}

    @classmethod
    def pre_save(cls, sender, document, **kwargs):
        """Set this object's dirty status so we know there have been changes since last publish."""
        # History records are never marked dirty, and neither is a document
        # that `_publish` is saving (it carries the `is_clean` marker).
        if not document._is_history_record and not hasattr(document, 'is_clean'):
            document._history['is_dirty'] = True
        return super().pre_save(sender, document, **kwargs)

    @api_method
    def api_history(self):
        """
        Get this object's history sorted by newest first.

        Returns a list of objects in the ``history`` key,
        example response::

            {
                "status": true,
                "history": [
                    {
                        "name": "My newest version",
                        "_history": {
                            "id": "someobjectid",
                            "version": 2
                        }
                    },
                    {
                        "name": "My old version",
                        "_history": {
                            "id": "someobjectid",
                            "version": 1
                        }
                    }
                ]
            }
        """
        model = self._get_history_model()
        old_versions = model.objects(_history__id=self.id).order_by('-_history__version')
        return {'history': [
            obj.to_dict() for obj in old_versions
        ]}

    @api_method
    def api_version(self, version):
        """
        Get a specific version of this object.

        Request with ``version=XXX`` parameter, to get just one specific version.
        You can also specify ``latest`` to get the newest published version.

        Returns the requested object in the ``object`` key,
        example response::

            {
                "status": true,
                "object": {
                    "name": "My object",
                    "_history": {
                        "id": "someobjectid",
                        "version": 3
                    }
                }
            }
        """
        return {'object': self.get_by_version_or_404(self.id, version).to_dict()}

    def _delete_history(self):
        """Delete every history record belonging to this object."""
        model = self._get_history_model()
        model.objects(_history__id=self.id).delete()

    def _mark_deleted(self):
        """Mark all versions of this object as deleted."""
        model = self._get_history_model()
        model.objects(_history__id=self.id).update(_history__deleted=True)
        # Read the flag the same way `get_status` does: a missing key
        # counts as dirty. It must be captured before `_publish` resets it.
        has_changes = self._history.get('is_dirty', True)
        clone = self._publish()
        clone._history.update(is_dirty=has_changes, deleted=True)
        clone.save()

    def delete(self, with_history=False, **kwargs):
        """
        Delete this object and optionally with its entire history.

        Unless ``with_history`` is set, a new 'deleted' record will
        be created in the history. History records themselves are
        deleted directly without either step.

        :param bool with_history: Also delete all corresponding history entries.
        """
        if not self._is_history_record:
            if with_history:
                self._delete_history()
            else:
                self._mark_deleted()
        super().delete(**kwargs)