Source code for xmm.models.file

import grp
import mimetypes
import os
import pwd
import shutil
import stat
import subprocess
import sys
from collections import namedtuple, OrderedDict
from datetime import datetime
from functools import lru_cache

import exifread
import flask
from flask_babel import lazy_gettext as _
from mongoengine import DoesNotExist, NotUniqueError
from PIL import Image

from xmm.core.permissions import media_need, media_permission
from xmm.models import MetaFieldsMixin
from xmm.models.task import HotfolderTaskTrigger
from xmm.modules.references import find_cross_references
from xmm.util.flask_helpers import api_method

from . import BaseDocument, fields, SignalizedMetaclass, Task, TreeModelMixin


# Pixel dimensions of an image as a lightweight (width, height) record.
ImageSize = namedtuple('ImageSize', 'width height')


[docs]class File(MetaFieldsMixin, TreeModelMixin, BaseDocument, metaclass=SignalizedMetaclass): _elastic_index = 'media' _elastic_doc_type = 'file' #: Path to file. path = fields.StringField(required=True) #: Is file deleted from storage? missing = fields.BooleanField(default=False) #: Actual filename including file extension. filename = fields.StringField(label=_('Dateiname'), required=True, flexGrow=2, width=400, isSortable=True) #: Size of file in bytes. size = fields.IntField(label=_('Grösse'), datatype='filesize', required=False, width=120, isSortable=True) #: Mimetype of file. mimetype = fields.StringField(required=False) #: Is it a directory? isdir = fields.BooleanField(required=True, default=False) #: Title of the file. title = fields.MultilingualField(fields.StringField()) #: Text that makes sense to the user. description = fields.MultilingualField(fields.TextField()) #: Alternative text shown if frontend can't load image. alt_text = fields.MultilingualField(fields.StringField()) #: Is it hidden? Defaults to True for file names starting with a period "." hidden = fields.BooleanField(required=True, default=False) #: Created time. ctime = fields.DatetimeField(default=datetime.now, required=True) #: Modified time. 
mtime = fields.DatetimeField(default=datetime.now, required=True) meta = { 'verbose_name': _('Datei'), 'verbose_name_plural': _('Dateien'), 'indexes': [{'fields': ('path', 'filename'), 'unique': True}, {'fields': ('path', 'isdir')}, 'filename', 'hidden', 'tags'], 'collection': 'media_file' } can_be_referenced = True dict_fields = [ 'changed_dt', 'filename', 'size', 'title', 'description', 'alt_text', 'path', 'isdir', 'relpath', 'owner', 'is_image', 'permissions', ] columns_definitions = [ ('thumbnail', fields.MediaField(label=_(' '), cellprops={'draggable': True}, columnKeyIn=[], width=43)), 'filename', 'tags', 'changed_dt', 'size', ] virtual_fields = { 'relpath': fields.StringField(), 'abspath': fields.StringField(), } @classmethod def get_module_need(cls): return media_need @classmethod def get_module_permission(cls): return media_permission def __str__(self): return self.filename def to_api_json(self, **options): data = super().to_api_json(**options) data.update({ 'id': self.id, 'filename': self.filename, 'title': self.title, 'description': self.description, 'alt_text': self.alt_text, 'path': self.relpath, 'mtime': self.mtime, 'ctime': self.ctime, 'atime': self.atime, }) return data @classmethod def transform_search_query(cls, query): query = super().transform_search_query(query) # never show hidden files query['query'].setdefault('bool', {}) query['query']['bool'].setdefault('must', []) query['query']['bool']['must'] = query['query']['bool'].get('must', []) + [{ 'match': { 'hidden': False } }] if '__isdir__' in query: # query only dirs/files if query['__isdir__'] is not None: query['query']['bool']['must'].append({ 'term': { 'isdir': bool(query['__isdir__']) } }) del query['__isdir__'] if '__path__' in query: # add path prefix query try: folder = File.objects.get(id=query['__path__'], isdir=True) except DoesNotExist: folder = None query['query']['bool']['must'].append({ 'prefix': { 'path.raw': folder.relpath.lower() if folder else '' } }) del query['__path__'] 
if '__type__' in query: # add MIME filetype query query['query']['bool']['must'].append({ 'match': { 'mimetype.raw': query.pop('__type__'), }, }) if '__parent__' in query: # add parent match query try: parent = File.objects.get(id=query['__parent__'], isdir=True) except DoesNotExist: parent = None query['query']['bool']['must'].append({ 'match': { 'path.raw': parent.relpath if parent else '' } }) del query['__parent__'] return query @classmethod
[docs] def build_elastic_query(cls, term, multilingual=True): """ Build a search query for a term. :param str term: The search term. :param bool multilingual: Set to False to search only the current language. """ def match_id(term): return { 'term': { '_id': term, }, } def match_phrase(term, fields): return { 'multi_match': { 'query': term, 'type': 'cross_fields', 'fields': fields, 'operator': 'and', }, } min_match_query = { 'bool': { 'minimum_should_match': 1, 'should': [], } } queries = { 'must': [], 'must_not': [], 'should': [], } term = term.strip('/') if '/' in term: folder, term = term.rsplit('/', 1) queries['must'].append(match_phrase(folder, ['path.partial_front^2', 'path.full^4'])) min_match_query['bool']['should'].append(match_id(term)) es_fields = [ 'path.partial_front^2', 'path.full^4', 'filename.partial_front^2.5', 'filename.full^6', 'filename.partial_back', 'tags.partial_front^0.5', 'tags.full', 'title.{}.full^0.5'.format(flask.g.lang_code), 'title.{}.partial_front^0.1'.format(flask.g.lang_code), 'description.{}.full^0.5'.format(flask.g.lang_code), 'description.{}.partial_front^0.1'.format(flask.g.lang_code), 'alt_text.{}.full^0.5'.format(flask.g.lang_code), 'alt_text.{}.partial_front^0.1'.format(flask.g.lang_code), ] min_match_query['bool']['should'].append(match_phrase(term, es_fields)) queries['must'].append(min_match_query) return queries
@classmethod def pre_save(cls, sender, document, **kwargs): document.hidden = document.filename.startswith('.') if document.path == '.': document.path = '' return BaseDocument.pre_save(sender, document, **kwargs) @classmethod def post_save(cls, sender, document, created=False, **kwargs): from xmm.models.cms.base import clear_cms_cache clear_cms_cache() if created and not document.isdir: for task in Task.objects: for trigger in task.filter_triggers(HotfolderTaskTrigger.TRIGGER_TYPE): if trigger.match(document.relpath): task.execute(trigger, path=document.abspath) return super().post_save(sender, document, **kwargs) @classmethod def verify_deletions(cls, objects): for file in objects: refs = find_cross_references(cls, file.id) if len(refs): msg = _('Datei "%(f)s" kann nicht gelöscht werden, da sie noch referenziert wird.', f=file.filename) flask.flash(msg, 'warning') return False return True @classmethod def post_delete(cls, sender, document, **kwargs): if not kwargs.get('already_removed'): try: document._log_deletion() if document.isdir: shutil.rmtree(document.abspath) else: os.remove(document.abspath) except: flask.current_app.logger.exception( 'Delete file with ID {}. Deletion failed! Path is: {!r}'.format(document.id, document.abspath) ) return super().post_delete(sender, document, **kwargs) def _log_deletion(self): """ Log deletion of a file with its ID and path. Does this recursively for all children of directories. """ obj_type = 'directory' if self.isdir else 'file' for child in self.get_children(): child._log_deletion() flask.current_app.logger.info( 'Delete {} with ID {}. 
Path was: {!r}'.format(obj_type, self.id, self.abspath) ) @classmethod def create_or_update(cls, path, filename): try: f = File.objects.get(path=path, filename=filename) f.refresh_fs(force_update=False) except DoesNotExist: f = File() f.path = path f.filename = filename f.refresh_fs() return f @classmethod def create_from_json(cls, json): parent = None if 'parentId' in json: parent = File.objects.get(id=json['parentId']) if json['parentId'] else None del json['parentId'] obj = File() obj.path = parent.relpath if parent else '' obj.isdir = True try: obj.create(json['filename']) obj.refresh_fs() except NotUniqueError: flask.flash( _('Der Ordnername \'%(filename)s\' wurde bereits vergeben.', filename=json['filename']), 'error') return None except: flask.current_app.logger.exception('') flask.flash( _('Der Ordner \'%(filename)s\' konnte nicht gespeichert werden.', filename=json['filename']), 'error') return None del json['filename'] return obj.update_from_json(json) @classmethod
[docs] def find_referencing_models(cls): """Find models that should be searched for references of this class.""" from .attribute import Attribute for attr in Attribute.objects(field_class='xmm.models.fields.file.FileField'): yield from [(model, 'fields.' + attr.key) for model in attr.find_dynamic_usages()]
def update_from_json(self, json, compare_timestamps=True): if 'filename' in json: if json['filename'] != self.filename: try: self.rename(json['filename']) self.save() self.refresh_fs() except NotUniqueError: if self.isdir: msg = 'Der Ordnername \'%(filename)s\' wurde bereits vergeben.' else: msg = 'Der Dateiname \'%(filename)s\' wurde bereits vergeben.' flask.flash(_(msg, filename=json['filename']), 'error') return None except Exception as e: if self.isdir: msg = 'Der Ordner \'%(filename)s\' konnte nicht gespeichert werden.' else: msg = 'Die Datei \'%(filename)s\' konnte nicht gespeichert werden.' flask.flash(_(msg, filename=json['filename']), 'error') return None del json['filename'] # Timestamp on files must be ignored for optimistic locking. return super().update_from_json(json, compare_timestamps=False) @classmethod def _get_file_by_realpath(cls, attr, searchpath): """Find a file by its full absolute or relative path.""" if attr == 'abspath': searchpath = os.path.relpath(searchpath, flask.current_app.config['MEDIA_ROOT']) path, filename = os.path.split(searchpath) try: return cls.objects.get(path=path, filename=filename) except DoesNotExist: flask.current_app.logger.warning(f'File does not exist: {searchpath}') return None @classmethod def _get_file_list_by_realpath(cls, attr, searchpaths): """Find a number of files by their absolute or relative paths.""" for searchpath in searchpaths: file = cls._get_file_by_realpath(attr, searchpath) if file: yield file @classmethod
[docs] def from_field_map(cls, field_map, dataset, is_list=False): """Find an instance by looking up fields in a referencing dataset.""" err_query = '`attribute_map` for files must be either {relpath}, {abspath} or {path, filename}.' err_list = 'Cannot use path/filename attribute_map for multivalue attributes! Use relpath or abspath instead!' # We cannot use a combination of these mappings. query_keys = set(field_map.keys()) assert query_keys in [{'relpath'}, {'abspath'}, {'path', 'filename'}], err_query assert not is_list or query_keys != {'path', 'filename'}, err_list if query_keys in [{'relpath'}, {'abspath'}]: map_attr = list(query_keys)[0] getter = field_map[map_attr] value = getter(dataset) if not value: return None if is_list: if not isinstance(value, (list, tuple)): value = [value] return list(cls._get_file_list_by_realpath(map_attr, value)) return cls._get_file_by_realpath(map_attr, value) try: q = {} for key, getter in field_map.items(): value = getter(dataset) q[key] = value return cls.objects.get(**q) except DoesNotExist: flask.current_app.logger.warning(f'File does not exist: {q!r}') return None
def refresh_fs(self, force_update=True): abspath = self.abspath if os.path.exists(abspath): self.ctime = datetime.fromtimestamp(os.path.getctime(abspath)).replace(microsecond=0) self.mtime = datetime.fromtimestamp(os.path.getmtime(abspath)).replace(microsecond=0) if os.path.isdir(abspath): self.isdir = True self.size = None self.mimetype = None if force_update or self._changed_fields: self.save() elif os.path.isfile(abspath): self.isdir = False self.size = os.path.getsize(abspath) self.mimetype, encoding = mimetypes.guess_type(abspath) if self.mimetype is None: self.mimetype = 'application/octet-stream' if force_update or self._changed_fields: self.save() elif not os.path.exists(abspath): if self.id is not None: self.delete() def rename(self, newpath, newname=None, done=False): if newname is None: newname = newpath newpath = self.path old_path = self.abspath old_relpath = self.relpath self.path = newpath self.filename = newname new_path = self.abspath new_relpath = self.relpath self.save() if self.isdir: for f in File.objects(path__startswith=old_relpath): f.path = new_relpath + f.path[len(old_relpath):] f.save() if not done: shutil.move(old_path, new_path) def create(self, newname, done=False): self.filename = newname new_path = self.abspath f = None if not done: if self.isdir: if os.path.exists(new_path): raise NotUniqueError('Dir already exists!') self.save() f = File.objects.get(path=self.path, filename=self.filename) try: os.makedirs(new_path) except: self.delete() raise else: if os.path.exists(new_path): raise NotUniqueError('File already exists!') self.save() try: fd = os.open(new_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL) f = os.fdopen(fd, 'wb') except: self.delete() raise return f def view_size(self): if self.isdir: return '' factor = 1024 * 1024 * 1024 if self.size >= factor: return '{:.1f} GB'.format(float(self.size) / factor) factor = 1024 * 1024 if self.size >= factor: return '{:.1f} MB'.format(float(self.size) / factor) factor = 1024 if self.size >= 
factor: return '{:.1f} kB'.format(float(self.size) / factor) return '{} B'.format(self.size) def get_path(self, absolute=False, directory=None): if directory is not None: return os.path.join(flask.current_app.config['MEDIA_ROOT'], directory) if absolute: return os.path.join( flask.current_app.config['MEDIA_ROOT'], self.path, self.filename) return os.path.join(self.path, self.filename) @property def relpath(self): return self.get_path() @property def abspath(self): return self.get_path(True) @property def extension_short(self): if self.isdir: return '' if '.' in self.filename: return self.filename.split('.').pop().lower() return '' @property def extension(self): if self.isdir: return '' return self.get_filename_parts()[1].lower() @property def basename(self): """The filename without extension.""" if self.isdir: return self.filename return self.get_filename_parts()[0] def get_filename_parts(self): return os.path.splitext(self.filename) def _set_missing(self): if not self.missing: self.missing = True self.save() @property def owner(self): path = self.abspath try: user = pwd.getpwuid(os.stat(path).st_uid).pw_name except KeyError: user = os.stat(path).st_uid except FileNotFoundError: self._set_missing() user = None try: group = grp.getgrgid(os.stat(path).st_gid).gr_name except KeyError: group = os.stat(path).st_gid except FileNotFoundError: self._set_missing() return None return '{}.{}'.format(user, group) @property def permissions(self): try: return stat.filemode(os.stat(self.abspath).st_mode) except FileNotFoundError: self._set_missing() return None @property def atime(self): try: return datetime.fromtimestamp(os.path.getatime(self.abspath)).replace(microsecond=0) except FileNotFoundError: self._set_missing() return None
[docs] def get_thumbnail_path(self, size): """Get absolute path to the thumbnail.""" dir = os.path.join( flask.current_app.config['STATIC_ROOT'], 'thumbnails', '{}x{}'.format(*size)) os.makedirs(dir, 0o775, exist_ok=True) thumb_filename = '{}{}'.format(str(self.id), self.extension) return os.path.join(dir, thumb_filename)
[docs] def clear_thumbnails(self): """Remove all cached thumbnails.""" import glob thumb_filename = '{}*{}'.format(str(self.id), self.extension) files = glob.glob(os.path.join( flask.current_app.config['STATIC_ROOT'], 'thumbnails', '**', thumb_filename, )) for path in files: os.remove(path)
def get_subdirectories(self): if not self.isdir: # Directories don't have subdirectories, eh. return [] return File.objects(path=self.relpath, isdir=True).order_by('filename') def get_children(self): if not self.isdir: # Directories don't have children, eh. return [] return File.objects(path=self.relpath).order_by('filename') def has_subdirectories(self): return bool(File.objects(path=self.relpath, isdir=True).count()) def get_parent(self): if self.path: path, filename = os.path.split(self.path) try: return File.objects.get(path=path, filename=filename) except File.DoesNotExist: pass return None def get_tree_data(self, **kwargs): if 'parent' not in kwargs: kwargs['parent'] = self.get_parent().id data = super().get_tree_data(**kwargs) data['relpath'] = self.relpath return data @classmethod def has_tree_position(cls): return False @classmethod
[docs] def iter_tree_nodes(cls, parent, order_by=None): """ Generator that loops over the subnodes of a given parent. :param str parent: Iterate over this node's children """ def _iter_cursor(cursor): for file_object in sorted((d for d in cursor), key=lambda d: (d.filename.lower(), d.filename)): if file_object.filename[0] != '.': if os.path.exists(file_object.abspath): yield file_object else: # delete db reference to inexistent file if os.path.exists(file_object.abspath): file_object.delete() query = cls.objects(path=parent and parent.relpath or '', isdir=True) yield from _iter_cursor(query)
def is_possible_parent(self): return self.isdir
[docs] def move_to(self, parent, pos=None): """ Move the specified node to a new parent. TODO: This deletes any existing file in the target directory with the same name. This should be configurable. :param xmm.models.File parent: Parent node :param int pos: Ignored for files here. """ try: target = File.objects.get(path=parent.relpath if parent else '', filename=self.filename) except DoesNotExist: pass else: target.delete() self.rename(parent.relpath if parent else '', self.filename)
def stream(self): with open(self.abspath, 'rb') as f: while True: data = f.read(4 * 1024) if data: yield data else: break @lru_cache()
[docs] def get_meta_data(self): """ Read the EXIF data from the file. :rtype: list Example of a result:: [{ 'category': 'category1', 'tags': [{ 'tag': 'tag1', 'value': 'value1' }, { 'tag': 'tag2', 'value': 'value2' }] }, { 'category': 'category2', 'tags': [{ 'tag': 'tag1', 'value': 'value1' }] }] """ if self.is_image: with open(self.abspath, 'rb') as f: tags = exifread.process_file(f) categories = OrderedDict() gps_info = {} for tag, value in sorted(tags.items()): if ' ' not in tag: # ignore tags without category (e.g. JPEGThumbnail or TIFFThumbnail) continue category, tag = tag.split(' ', 1) if category in ('EXIF', 'GPS', 'Image', 'Interoperability', 'MakerNote'): if tag.startswith('Tag 0x'): # ignore unknown tags continue if category not in categories: categories[category] = [] if tag in [ 'GPSLatitude', 'GPSLatitudeRef', 'GPSLongitude', 'GPSLongitudeRef', ]: gps_info[tag] = value.values categories[category].append({ 'tag': tag, 'value': str(value), }) if flask.current_app.config.get('MAPS_LINK') and len(gps_info) == 4: def coords(g, ref): return '{deg}°{min}\'{sec}"{ref}'.format( deg=g[0], min=g[1], sec=g[2].num / g[2].den, ref=ref ) lat = coords(gps_info['GPSLatitude'], gps_info['GPSLatitudeRef']) lon = coords(gps_info['GPSLongitude'], gps_info['GPSLongitudeRef']) coordinates = '{} {}'.format(lat, lon) categories['GPS'].insert(0, { 'tag': 'Show on {}'.format(flask.current_app.config.get('MAPS_NAME')), 'value': coordinates, 'link': flask.current_app.config.get('MAPS_LINK').format(coordinates), }) return [{ 'category': category, 'tags': categories[category], } for category in categories] return []
@api_method def api_get_meta_data(self): return { 'result': self.get_meta_data() } @property def is_image(self): return self.extension in flask.current_app.config['THUMBNAIL_EXTENSIONS'] @property def thumbnailable(self): return not self.isdir and self.is_image @lru_cache()
[docs] def image_dimensions(self): """Get the image pixel size as a (width, height) tuple.""" assert self.is_image, 'Not an image.' with Image.open(self.abspath) as im: size = ImageSize(*im.size) return size
def thumbnail_url(self, size=None): if self.missing or not self.thumbnailable: return None if size is None: size = (200, 150) if flask.current_app.config['THUMBOR_URL'] is not None: return self._thumbor_url(size) return self._thumbnail_url(size) def _thumbor_url(self, size): try: from libthumbor import CryptoURL except ImportError: print('To use Thumbor thumbnails, please install `libthumbor`!', file=sys.stderr) raise thumbor_key = flask.current_app.config['THUMBOR_KEY'] crypto = CryptoURL(key=thumbor_key) url = crypto.generate( unsafe=(thumbor_key == 'unsafe'), width=size[0], height=size[1], smart=True, image_url=self.relpath, ) thumbor_url = flask.current_app.config['THUMBOR_URL'].strip('/') + '/' return thumbor_url + url def _thumbnail_url(self, size): return flask.url_for( 'main.thumbnail', object_id=self.id, size='x'.join(map(str, size)), _external=True ) def stream_thumbnail(self, size=None, at=1.0): if self.missing: flask.current_app.logger.warning(f'stream_thumbnail() failed: file missing: {self.id}') return [], '' if size is None: size = (200, 150) if not self.thumbnailable: flask.current_app.logger.warning(f'stream_thumbnail() failed: not thumbnailable: {self.id}') return [], 'image/gif' try: if str(at) != '1.0': f = float(at) else: f = 1 size = (int(f * size[0]), int(f * size[1])) path = self.get_thumbnail_path(size) mimetype = self.mimetype if str(at) != '1.0': path, ext = os.path.splitext(path) path = '{}@{}x{}'.format(path, str(at), ext) if self.extension in flask.current_app.config['THUMBNAIL_MAPPING']: path = '{}{}'.format(os.path.splitext(path)[0], flask.current_app.config['THUMBNAIL_MAPPING'][self.extension]) if os.path.exists(path) and os.path.getmtime(path) >= os.path.getmtime(self.abspath): return open(path, 'rb'), mimetype if not os.path.exists(os.path.dirname(path)): os.makedirs(os.path.dirname(path)) if flask.current_app.config['THUMBNAIL_EXTENSIONS'][self.extension] == 'pil': img = Image.open(self.abspath) img.thumbnail(size, 
Image.ANTIALIAS) img.save(path, quality=100) elif flask.current_app.config['THUMBNAIL_EXTENSIONS'][self.extension] == 'gm': args = [flask.current_app.config['BINARIES']['gm'], 'convert', '-density', '300', '-resize', '{}x{}>'.format(*size), '-colorspace', 'RGB', '-background', 'white', '-quality', '100', '{}[0]'.format(self.abspath), path] flask.current_app.logger.debug(' '.join(args)) subprocess.check_call(args) mimetype = 'image/png' return open(path, 'rb'), mimetype except: flask.current_app.logger.exception('') return [], 'image/gif' def has_children(self): return self.isdir and self.has_subdirectories() def _find_clone_name(self, in_path, count=0): """Find a name for a clone that isn't already being used.""" clone_name = self.basename if count > 0: clone_name += _(' (Kopie %(count)d)', count=count) + self.extension else: clone_name += self.extension if self.__class__.objects(path=in_path, filename=clone_name).count() == 0: return clone_name return self._find_clone_name(in_path, count + 1)
[docs] def clone(self, **kwargs): """ Copy this file or folder. :keyword str path: New path where the copy should be saved. :return: A clone of this File. :rtype: xmm.models.File """ if 'parentId' in kwargs: if kwargs['parentId']: parent_path = File.objects.get(id=kwargs['parentId']).get_path() else: # moved to root parent_path = '' else: parent_path = self.path new_path = self.get_path(directory=parent_path) new_name = self._find_clone_name(parent_path) f = File(path=parent_path, filename=new_name) f.save() try: if self.isdir: shutil.copytree(self.abspath, os.path.join(new_path, new_name)) else: shutil.copy2(self.abspath, os.path.join(new_path, new_name)) finally: f.refresh_fs()