import grp
import mimetypes
import os
import pwd
import shutil
import stat
import subprocess
import sys
from collections import namedtuple, OrderedDict
from datetime import datetime
from functools import lru_cache
import exifread
import flask
from flask_babel import lazy_gettext as _
from mongoengine import DoesNotExist, NotUniqueError
from PIL import Image
from xmm.core.permissions import media_need, media_permission
from xmm.models import MetaFieldsMixin
from xmm.models.task import HotfolderTaskTrigger
from xmm.modules.references import find_cross_references
from xmm.util.flask_helpers import api_method
from . import BaseDocument, fields, SignalizedMetaclass, Task, TreeModelMixin
# Pixel dimensions of an image as a (width, height) record.
ImageSize = namedtuple('ImageSize', 'width height')
# Document describing one file or directory of the media library.
# `path` + `filename` locate the entry; get_path() joins them below the
# application's MEDIA_ROOT to obtain the location on disk.
[docs]class File(MetaFieldsMixin, TreeModelMixin, BaseDocument, metaclass=SignalizedMetaclass):
# Names of the search index / document type for this model
# (presumably consumed by the search layer — not visible in this file).
_elastic_index = 'media'
_elastic_doc_type = 'file'
#: Directory part of the entry's location, without the filename.
#: pre_save() stores the empty string instead of '.'.
path = fields.StringField(required=True)
#: Is the file deleted from storage? Set by _set_missing() when a stat
#: call on the entry raises FileNotFoundError.
missing = fields.BooleanField(default=False)
#: Actual filename including file extension.
filename = fields.StringField(label=_('Dateiname'), required=True, flexGrow=2, width=400, isSortable=True)
#: Size of the file in bytes; refresh_fs() sets it to None for directories.
size = fields.IntField(label=_('Grösse'), datatype='filesize', required=False, width=120, isSortable=True)
#: Mimetype of the file; refresh_fs() sets it to None for directories.
mimetype = fields.StringField(required=False)
#: Is it a directory?
isdir = fields.BooleanField(required=True, default=False)
#: Title of the file.
title = fields.MultilingualField(fields.StringField())
#: Text that makes sense to the user.
description = fields.MultilingualField(fields.TextField())
#: Alternative text shown if the frontend can't load the image.
alt_text = fields.MultilingualField(fields.StringField())
#: Is it hidden? Recomputed in pre_save(): True exactly when the
#: filename starts with a period ".".
hidden = fields.BooleanField(required=True, default=False)
#: Created time; refresh_fs() overwrites it with the filesystem ctime.
ctime = fields.DatetimeField(default=datetime.now, required=True)
#: Modified time; refresh_fs() overwrites it with the filesystem mtime.
mtime = fields.DatetimeField(default=datetime.now, required=True)
# Model options: display names, database indexes and collection name.
# The (path, filename) pair is declared unique.
meta = {
'verbose_name': _('Datei'),
'verbose_name_plural': _('Dateien'),
'indexes': [{'fields': ('path', 'filename'), 'unique': True},
{'fields': ('path', 'isdir')},
'filename', 'hidden', 'tags'],
'collection': 'media_file'
}
can_be_referenced = True
# Attribute names listed for dictionary export; several of them
# ('relpath', 'owner', 'is_image', 'permissions') are properties below.
# NOTE(review): the consumer of this list lives in a base class — confirm.
dict_fields = [
'changed_dt',
'filename',
'size',
'title',
'description',
'alt_text',
'path',
'isdir',
'relpath',
'owner',
'is_image',
'permissions',
]
# Column layout: either a field name or a (name, field) pair.
# NOTE(review): presumably drives the frontend list view — confirm.
columns_definitions = [
('thumbnail', fields.MediaField(label=_(' '), cellprops={'draggable': True},
columnKeyIn=[], width=43)),
'filename',
'tags',
'changed_dt',
'size',
]
# Field declarations for values that are computed by properties rather
# than stored (see `relpath` and `abspath` below).
virtual_fields = {
'relpath': fields.StringField(),
'abspath': fields.StringField(),
}
@classmethod
def get_module_need(cls):
    """Return ``media_need``, the need imported from ``xmm.core.permissions``."""
    need = media_need
    return need
@classmethod
def get_module_permission(cls):
    """Return ``media_permission``, imported from ``xmm.core.permissions``."""
    permission = media_permission
    return permission
def __str__(self):
    """Represent the entry by its bare filename."""
    name = self.filename
    return name
def to_api_json(self, **options):
    """
    Serialize the file for the API.

    Starts from the parent implementation's result and adds or overrides
    the file-specific keys. Note that ``path`` carries ``relpath`` (path
    including the filename), not the stored ``path`` field.

    :param options: Passed through to the parent ``to_api_json``.
    :return: The updated mapping returned by the parent implementation.
    """
    payload = super().to_api_json(**options)
    # (output key, attribute read from self) in the order they are evaluated
    exported = (
        ('id', 'id'),
        ('filename', 'filename'),
        ('title', 'title'),
        ('description', 'description'),
        ('alt_text', 'alt_text'),
        ('path', 'relpath'),
        ('mtime', 'mtime'),
        ('ctime', 'ctime'),
        ('atime', 'atime'),
    )
    extra = {}
    for key, attribute in exported:
        extra[key] = getattr(self, attribute)
    payload.update(extra)
    return payload
@classmethod
def transform_search_query(cls, query):
    """
    Add file-specific filters to a search query.

    After the parent transformation, hidden files are always excluded and
    the pseudo keys ``__isdir__``, ``__path__``, ``__type__`` and
    ``__parent__`` are consumed (removed from ``query``) and translated into
    ``must`` clauses.

    :param dict query: Query mapping with a ``query`` entry.
    :return: The transformed query mapping.
    """
    query = super().transform_search_query(query)
    bool_clause = query['query'].setdefault('bool', {})
    # never show hidden files
    bool_clause['must'] = bool_clause.setdefault('must', []) + [{
        'match': {
            'hidden': False
        }
    }]
    must = bool_clause['must']

    if '__isdir__' in query:
        # query only dirs/files; None means "no restriction"
        only_dirs = query['__isdir__']
        if only_dirs is not None:
            must.append({'term': {'isdir': bool(only_dirs)}})
        del query['__isdir__']

    if '__path__' in query:
        # restrict to everything below the given folder (prefix match)
        try:
            folder = File.objects.get(id=query['__path__'], isdir=True)
        except DoesNotExist:
            folder = None
        prefix = folder.relpath.lower() if folder else ''
        must.append({'prefix': {'path.raw': prefix}})
        del query['__path__']

    if '__type__' in query:
        # restrict to a MIME type
        must.append({
            'match': {
                'mimetype.raw': query.pop('__type__'),
            },
        })

    if '__parent__' in query:
        # restrict to direct children of the given folder (exact path match)
        try:
            parent = File.objects.get(id=query['__parent__'], isdir=True)
        except DoesNotExist:
            parent = None
        parent_path = parent.relpath if parent else ''
        must.append({'match': {'path.raw': parent_path}})
        del query['__parent__']

    return query
@classmethod
def build_elastic_query(cls, term, multilingual=True):
    """
    Build a search query for a term.

    A term containing ``/`` is split at the last slash: the leading part
    must match the path, the trailing part is searched normally. The
    trailing part has to match either the document id or the weighted
    path/filename/tags/title/description/alt-text fields.

    :param str term: The search term.
    :param bool multilingual: Not read by this implementation; only the
        fields of the current language (``flask.g.lang_code``) are searched.
    :return: Mapping with ``must``, ``must_not`` and ``should`` clause lists.
    """
    def _cross_fields(text, field_names):
        # every word of `text` has to occur in at least one of the fields
        return {
            'multi_match': {
                'query': text,
                'type': 'cross_fields',
                'fields': field_names,
                'operator': 'and',
            },
        }

    required = []
    term = term.strip('/')
    if '/' in term:
        folder, term = term.rsplit('/', 1)
        required.append(_cross_fields(folder, ['path.partial_front^2', 'path.full^4']))

    es_fields = [
        'path.partial_front^2',
        'path.full^4',
        'filename.partial_front^2.5',
        'filename.full^6',
        'filename.partial_back',
        'tags.partial_front^0.5',
        'tags.full',
    ]
    for localized in ('title', 'description', 'alt_text'):
        es_fields.append('{}.{}.full^0.5'.format(localized, flask.g.lang_code))
        es_fields.append('{}.{}.partial_front^0.1'.format(localized, flask.g.lang_code))

    id_or_text = {
        'bool': {
            'minimum_should_match': 1,
            'should': [
                {'term': {'_id': term}},
                _cross_fields(term, es_fields),
            ],
        }
    }
    required.append(id_or_text)
    return {
        'must': required,
        'must_not': [],
        'should': [],
    }
@classmethod
def pre_save(cls, sender, document, **kwargs):
    """
    Normalise derived values before a document is saved.

    ``hidden`` is recomputed from the filename (dot-files are hidden) and a
    path of ``'.'`` is replaced by the empty string. The call is then
    delegated to ``BaseDocument.pre_save``.
    """
    is_dotfile = document.filename.startswith('.')
    document.hidden = is_dotfile
    if document.path == '.':
        document.path = ''
    result = BaseDocument.pre_save(sender, document, **kwargs)
    return result
@classmethod
def post_save(cls, sender, document, created=False, **kwargs):
    """
    Clear the CMS cache and fire hotfolder triggers after a save.

    For a newly created non-directory entry, every task's hotfolder triggers
    are matched against the file's relative path; matching triggers execute
    their task with the file's absolute path.

    NOTE(review): ``created`` is consumed here and not forwarded to the
    parent ``post_save`` — confirm that the parent does not need it.
    """
    # imported here rather than at module level, as in the original
    from xmm.models.cms.base import clear_cms_cache
    clear_cms_cache()

    is_new_file = created and not document.isdir
    if is_new_file:
        for task in Task.objects:
            for trigger in task.filter_triggers(HotfolderTaskTrigger.TRIGGER_TYPE):
                if not trigger.match(document.relpath):
                    continue
                task.execute(trigger, path=document.abspath)
    return super().post_save(sender, document, **kwargs)
@classmethod
def verify_deletions(cls, objects):
    """
    Check that none of the given files is still referenced.

    Stops at the first referenced file, flashes a warning naming it and
    returns ``False``; returns ``True`` when no file has cross references.
    """
    for candidate in objects:
        references = find_cross_references(cls, candidate.id)
        if len(references) == 0:
            continue
        warning = _(
            'Datei "%(f)s" kann nicht gelöscht werden, da sie noch referenziert wird.',
            f=candidate.filename)
        flask.flash(warning, 'warning')
        return False
    return True
@classmethod
def post_delete(cls, sender, document, **kwargs):
    """
    Remove the deleted document's file or directory tree from storage.

    Skipped when the caller passes ``already_removed`` as a true value.
    Removal is best effort: failures are logged with a traceback and do not
    abort the delete. Only ``Exception`` subclasses are swallowed, so that
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate.
    """
    if not kwargs.get('already_removed'):
        try:
            document._log_deletion()
            if document.isdir:
                shutil.rmtree(document.abspath)
            else:
                os.remove(document.abspath)
        except Exception:
            flask.current_app.logger.exception(
                'Delete file with ID {}. Deletion failed! Path is: {!r}'.format(document.id, document.abspath)
            )
    return super().post_delete(sender, document, **kwargs)
def _log_deletion(self):
    """
    Write an info log entry for the deletion of this entry.

    Children are logged first (recursively), then the entry itself with its
    ID and absolute path.
    """
    for child in self.get_children():
        child._log_deletion()
    kind = 'file'
    if self.isdir:
        kind = 'directory'
    message = 'Delete {} with ID {}. Path was: {!r}'.format(kind, self.id, self.abspath)
    flask.current_app.logger.info(message)
@classmethod
def create_or_update(cls, path, filename):
    """
    Fetch the document for ``path``/``filename`` or create a new one.

    An existing document is refreshed from the filesystem and only saved if
    something changed; a new document is refreshed with the default forced
    save.

    :return: The existing or newly built document.
    """
    try:
        record = File.objects.get(path=path, filename=filename)
        record.refresh_fs(force_update=False)
    except DoesNotExist:
        record = File()
        record.path = path
        record.filename = filename
        record.refresh_fs()
    return record
@classmethod
def create_from_json(cls, json):
    """
    Create a new directory from API data.

    ``json['parentId']`` (optional) selects the parent directory; a missing
    or empty value creates the directory at the top level. The keys
    ``parentId`` and ``filename`` are removed from ``json`` before the rest
    is applied through ``update_from_json``.

    :param dict json: Incoming data; must contain ``filename``.
    :return: Result of ``update_from_json``, or ``None`` after flashing an
        error when the directory could not be created.
    """
    parent = None
    if 'parentId' in json:
        parent = File.objects.get(id=json['parentId']) if json['parentId'] else None
        del json['parentId']
    obj = File()
    obj.path = parent.relpath if parent else ''
    obj.isdir = True
    try:
        obj.create(json['filename'])
        obj.refresh_fs()
    except NotUniqueError:
        flask.flash(
            _('Der Ordnername \'%(filename)s\' wurde bereits vergeben.', filename=json['filename']),
            'error')
        return None
    except Exception:
        # Catch-all boundary for the request, but let KeyboardInterrupt and
        # SystemExit propagate instead of turning them into a flash message.
        flask.current_app.logger.exception('')
        flask.flash(
            _('Der Ordner \'%(filename)s\' konnte nicht gespeichert werden.', filename=json['filename']),
            'error')
        return None
    del json['filename']
    return obj.update_from_json(json)
@classmethod
def find_referencing_models(cls):
    """
    Find models that should be searched for references of this class.

    Yields ``(model, field_path)`` pairs for every dynamic usage of an
    attribute whose field class is the file field.
    """
    from .attribute import Attribute
    file_attributes = Attribute.objects(field_class='xmm.models.fields.file.FileField')
    for attr in file_attributes:
        # collect all usages of one attribute before yielding any of them
        usages = []
        for model in attr.find_dynamic_usages():
            usages.append((model, 'fields.' + attr.key))
        for usage in usages:
            yield usage
def update_from_json(self, json, compare_timestamps=True):
    """
    Apply API data to this entry, renaming it on disk if necessary.

    A changed ``filename`` triggers ``rename()``; the key is then removed
    from ``json`` and the remaining data is handed to the parent
    implementation.

    :param dict json: Incoming data; ``filename`` is consumed here.
    :param bool compare_timestamps: Ignored — timestamps on files must not
        take part in optimistic locking, so ``False`` is always passed on.
    :return: Result of the parent ``update_from_json``, or ``None`` after
        flashing an error when the rename failed.
    """
    if 'filename' in json:
        new_name = json['filename']
        if new_name != self.filename:
            try:
                self.rename(new_name)
                self.save()
                self.refresh_fs()
            except NotUniqueError:
                if self.isdir:
                    msg = 'Der Ordnername \'%(filename)s\' wurde bereits vergeben.'
                else:
                    msg = 'Der Dateiname \'%(filename)s\' wurde bereits vergeben.'
                flask.flash(_(msg, filename=new_name), 'error')
                return None
            except Exception:
                # Keep the traceback: the user only sees a generic message,
                # so without this the cause of the failure would be lost.
                flask.current_app.logger.exception(
                    'Renaming file with ID {} to {!r} failed'.format(self.id, new_name)
                )
                if self.isdir:
                    msg = 'Der Ordner \'%(filename)s\' konnte nicht gespeichert werden.'
                else:
                    msg = 'Die Datei \'%(filename)s\' konnte nicht gespeichert werden.'
                flask.flash(_(msg, filename=new_name), 'error')
                return None
        del json['filename']
    # Timestamp on files must be ignored for optimistic locking.
    return super().update_from_json(json, compare_timestamps=False)
@classmethod
def _get_file_by_realpath(cls, attr, searchpath):
    """
    Find a file by its full absolute or relative path.

    :param str attr: ``'abspath'`` if ``searchpath`` is absolute; it is then
        made relative to ``MEDIA_ROOT`` first. Any other value leaves
        ``searchpath`` untouched.
    :param str searchpath: Path including the filename.
    :return: The matching document, or ``None`` (with a logged warning).
    """
    if attr == 'abspath':
        media_root = flask.current_app.config['MEDIA_ROOT']
        searchpath = os.path.relpath(searchpath, media_root)
    directory, name = os.path.split(searchpath)
    try:
        found = cls.objects.get(path=directory, filename=name)
    except DoesNotExist:
        flask.current_app.logger.warning(f'File does not exist: {searchpath}')
        found = None
    return found
@classmethod
def _get_file_list_by_realpath(cls, attr, searchpaths):
    """
    Find a number of files by their absolute or relative paths.

    Lazily yields the document for each path; paths without a matching
    document are skipped.
    """
    lookups = (cls._get_file_by_realpath(attr, candidate) for candidate in searchpaths)
    for found in lookups:
        if not found:
            continue
        yield found
@classmethod
def from_field_map(cls, field_map, dataset, is_list=False):
    """
    Find an instance by looking up fields in a referencing dataset.

    ``field_map`` maps query keys to getter callables that extract the
    value from ``dataset``. Exactly one of these key sets is allowed:
    ``{relpath}``, ``{abspath}`` or ``{path, filename}``.

    :param dict field_map: Query key -> ``getter(dataset)``.
    :param dataset: Object handed to every getter.
    :param bool is_list: Look up several files; only valid with
        ``relpath``/``abspath``.
    :return: A document, a list of documents (``is_list``), or ``None``.
    :raises AssertionError: For an unsupported key combination.
    """
    err_query = '`attribute_map` for files must be either {relpath}, {abspath} or {path, filename}.'
    err_list = 'Cannot use path/filename attribute_map for multivalue attributes! Use relpath or abspath instead!'
    single_key_sets = [{'relpath'}, {'abspath'}]
    split_keys = {'path', 'filename'}

    # We cannot use a combination of these mappings.
    query_keys = set(field_map.keys())
    assert query_keys in single_key_sets + [split_keys], err_query
    assert not is_list or query_keys != split_keys, err_list

    if query_keys in single_key_sets:
        (map_attr,) = query_keys
        value = field_map[map_attr](dataset)
        if not value:
            return None
        if not is_list:
            return cls._get_file_by_realpath(map_attr, value)
        if not isinstance(value, (list, tuple)):
            value = [value]
        return list(cls._get_file_list_by_realpath(map_attr, value))

    try:
        q = {}
        for key, getter in field_map.items():
            q[key] = getter(dataset)
        return cls.objects.get(**q)
    except DoesNotExist:
        flask.current_app.logger.warning(f'File does not exist: {q!r}')
        return None
def refresh_fs(self, force_update=True):
    """
    Synchronise this document with the state of the filesystem.

    * Path does not exist: an already stored document (``id`` set) is
      deleted; an unsaved one is left alone.
    * Directory: ``isdir`` is set, ``size`` and ``mimetype`` are cleared.
    * Regular file: size and guessed mimetype are stored, falling back to
      ``application/octet-stream`` when the type cannot be guessed.
    * Anything else that exists: only the timestamps are updated in memory
      and nothing is saved.

    The existence check is done once, so an entry can no longer fall
    through both branches when the file appears between two checks.

    :param bool force_update: Save even if no field changed.
    """
    abspath = self.abspath
    if not os.path.exists(abspath):
        if self.id is not None:
            self.delete()
        return

    self.ctime = datetime.fromtimestamp(os.path.getctime(abspath)).replace(microsecond=0)
    self.mtime = datetime.fromtimestamp(os.path.getmtime(abspath)).replace(microsecond=0)
    if os.path.isdir(abspath):
        self.isdir = True
        self.size = None
        self.mimetype = None
    elif os.path.isfile(abspath):
        self.isdir = False
        self.size = os.path.getsize(abspath)
        guessed, _encoding = mimetypes.guess_type(abspath)
        self.mimetype = 'application/octet-stream' if guessed is None else guessed
    else:
        # neither directory nor regular file: nothing to store
        return

    if force_update or self._changed_fields:
        self.save()
def rename(self, newpath, newname=None, done=False):
    """
    Rename and/or move this entry in the database and on disk.

    Called with a single argument, that argument is the new filename and
    the directory stays the same. For directories the stored ``path`` of
    every descendant is rewritten to the new location.

    Only real descendants are rewritten: the database query is a plain
    string prefix match, which would also return entries of sibling
    directories such as ``foobar`` when ``foo`` is renamed, so results are
    additionally checked against the directory boundary.

    NOTE(review): the database is updated before the filesystem move; if
    ``shutil.move`` fails the records already point to the new location.

    :param str newpath: New directory, or the new filename if ``newname``
        is omitted.
    :param str newname: New filename.
    :param bool done: The filesystem move already happened; only update
        the database.
    """
    if newname is None:
        newname = newpath
        newpath = self.path
    old_path = self.abspath
    old_relpath = self.relpath
    self.path = newpath
    self.filename = newname
    new_path = self.abspath
    new_relpath = self.relpath
    self.save()
    if self.isdir:
        descendant_prefix = old_relpath + os.sep
        for f in File.objects(path__startswith=old_relpath):
            is_child = f.path == old_relpath
            is_deeper = f.path.startswith(descendant_prefix)
            if not (is_child or is_deeper):
                # shares the name prefix only, e.g. 'foobar' vs. 'foo'
                continue
            f.path = new_relpath + f.path[len(old_relpath):]
            f.save()
    if not done:
        shutil.move(old_path, new_path)
def create(self, newname, done=False):
    """
    Create this entry under the name ``newname`` in the database and on disk.

    The document is saved first; if creating the directory or file on disk
    fails, the document is deleted again and the error is re-raised.

    :param str newname: Filename to assign to this entry.
    :param bool done: Only assign the filename; touch neither database nor
        filesystem.
    :return: ``None`` if ``done``; for directories the document re-read
        from the database; for files a binary write handle to the newly
        created, empty file.
    :raises NotUniqueError: If the target path already exists.
    """
    self.filename = newname
    target = self.abspath
    if done:
        return None

    if self.isdir:
        if os.path.exists(target):
            raise NotUniqueError('Dir already exists!')
        self.save()
        stored = File.objects.get(path=self.path, filename=self.filename)
        try:
            os.makedirs(target)
        except BaseException:
            # roll back the database entry, then report the failure
            self.delete()
            raise
        return stored

    if os.path.exists(target):
        raise NotUniqueError('File already exists!')
    self.save()
    try:
        # O_EXCL: fail instead of overwriting a file created in the meantime
        descriptor = os.open(target, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        handle = os.fdopen(descriptor, 'wb')
    except BaseException:
        self.delete()
        raise
    return handle
def view_size(self):
    """
    Format the file size for display.

    Uses binary multiples (1024) with one decimal place and the units
    ``GB``, ``MB`` and ``kB``; smaller sizes are shown as whole bytes.

    :return: The formatted size, or an empty string for directories and
        for entries whose size is not known (``size`` is optional and may
        be ``None``).
    """
    if self.isdir or self.size is None:
        return ''
    units = (
        (1024 * 1024 * 1024, 'GB'),
        (1024 * 1024, 'MB'),
        (1024, 'kB'),
    )
    for factor, unit in units:
        if self.size >= factor:
            return '{:.1f} {}'.format(float(self.size) / factor, unit)
    return '{} B'.format(self.size)
def get_path(self, absolute=False, directory=None):
    """
    Build a path for this entry.

    :param bool absolute: Prefix the entry's path with ``MEDIA_ROOT``.
    :param str directory: If given, ignore the entry itself and return
        this directory joined onto ``MEDIA_ROOT``.
    :return: ``path/filename``, optionally below ``MEDIA_ROOT``.
    """
    if directory is None and not absolute:
        return os.path.join(self.path, self.filename)
    media_root = flask.current_app.config['MEDIA_ROOT']
    if directory is not None:
        return os.path.join(media_root, directory)
    return os.path.join(media_root, self.path, self.filename)
@property
def relpath(self):
    """Path including the filename, as returned by ``get_path()`` with defaults."""
    relative = self.get_path()
    return relative
@property
def abspath(self):
    """Path including the filename below ``MEDIA_ROOT`` (``get_path(True)``)."""
    absolute = self.get_path(True)
    return absolute
@property
def extension_short(self):
    """
    Lower-cased text after the last period of the filename, without the period.

    Empty for directories and for filenames that contain no period.
    """
    if self.isdir or '.' not in self.filename:
        return ''
    _stem, suffix = self.filename.rsplit('.', 1)
    return suffix.lower()
@property
def extension(self):
    """
    Lower-cased extension part of ``get_filename_parts()``.

    Always empty for directories.
    """
    if not self.isdir:
        _base, suffix = self.get_filename_parts()[0], self.get_filename_parts()[1]
        return suffix.lower()
    return ''
@property
def basename(self):
    """The filename without extension; directories keep their full name."""
    if not self.isdir:
        stem = self.get_filename_parts()[0]
        return stem
    return self.filename
def get_filename_parts(self):
    """Split the filename into ``(root, extension)`` using ``os.path.splitext``."""
    root, suffix = os.path.splitext(self.filename)
    return (root, suffix)
def _set_missing(self):
    """Flag the entry as missing and save it, unless it is flagged already."""
    if self.missing:
        return
    self.missing = True
    self.save()
@property
def owner(self):
    """
    Owner of the file on disk, formatted as ``user.group``.

    User and group names are resolved through the system databases; an id
    without a database entry is shown numerically. The file is stat'ed
    exactly once, so user and group always describe the same state and a
    file vanishing halfway cannot raise from inside a fallback branch.

    :return: ``'user.group'``, or ``None`` if the file does not exist (the
        entry is then flagged as missing).
    """
    try:
        status = os.stat(self.abspath)
    except FileNotFoundError:
        self._set_missing()
        return None
    try:
        user = pwd.getpwuid(status.st_uid).pw_name
    except KeyError:
        user = status.st_uid
    try:
        group = grp.getgrgid(status.st_gid).gr_name
    except KeyError:
        group = status.st_gid
    return '{}.{}'.format(user, group)
@property
def permissions(self):
    """
    File mode of the entry on disk as an ``ls -l`` style string.

    :return: The mode string, or ``None`` if the file does not exist (the
        entry is then flagged as missing).
    """
    try:
        mode = os.stat(self.abspath).st_mode
    except FileNotFoundError:
        self._set_missing()
        return None
    return stat.filemode(mode)
@property
def atime(self):
    """
    Last access time of the entry on disk, truncated to whole seconds.

    :return: A ``datetime``, or ``None`` if the file does not exist (the
        entry is then flagged as missing).
    """
    try:
        accessed = datetime.fromtimestamp(os.path.getatime(self.abspath))
    except FileNotFoundError:
        self._set_missing()
        return None
    return accessed.replace(microsecond=0)
def get_thumbnail_path(self, size):
    """
    Get absolute path to the thumbnail.

    Thumbnails are stored below ``STATIC_ROOT/thumbnails/<width>x<height>``
    and named after the document id plus the file's extension. The size
    directory is created (mode ``0o775``) if it does not exist yet.

    :param size: ``(width, height)`` pair.
    :return: Absolute path of the thumbnail file.
    """
    size_folder = '{}x{}'.format(*size)
    thumbnail_dir = os.path.join(
        flask.current_app.config['STATIC_ROOT'], 'thumbnails', size_folder)
    os.makedirs(thumbnail_dir, 0o775, exist_ok=True)
    return os.path.join(thumbnail_dir, '{}{}'.format(str(self.id), self.extension))
def clear_thumbnails(self):
    """
    Remove all cached thumbnails.

    Thumbnails live in ``STATIC_ROOT/thumbnails/<size>/`` and are named
    ``<id><suffix><extension>``. The extension is the file's own one or,
    when ``THUMBNAIL_MAPPING`` has an entry for it, the mapped extension
    that ``stream_thumbnail()`` writes instead — both are cleaned up.
    """
    import glob

    config = flask.current_app.config
    extensions = {self.extension}
    mapped = (config.get('THUMBNAIL_MAPPING') or {}).get(self.extension)
    if mapped:
        extensions.add(mapped)

    # '**' without recursive=True matches exactly one directory level,
    # i.e. the per-size folders.
    size_dirs = os.path.join(config['STATIC_ROOT'], 'thumbnails', '**')
    stale = set()
    for extension in extensions:
        pattern = '{}*{}'.format(str(self.id), extension)
        stale.update(glob.glob(os.path.join(size_dirs, pattern)))
    # a set, because two patterns may match the same file
    for path in stale:
        os.remove(path)
def get_subdirectories(self):
    """
    Directories located directly inside this directory, sorted by filename.

    :return: An empty list for plain files, otherwise the query result.
    """
    if self.isdir:
        return File.objects(path=self.relpath, isdir=True).order_by('filename')
    # Plain files cannot contain directories.
    return []
def get_children(self):
    """
    Entries (files and directories) located directly inside this
    directory, sorted by filename.

    :return: An empty list for plain files, otherwise the query result.
    """
    if self.isdir:
        return File.objects(path=self.relpath).order_by('filename')
    # Plain files cannot contain anything.
    return []
def has_subdirectories(self):
    """Tell whether at least one directory is stored directly below this entry's path."""
    directory_count = File.objects(path=self.relpath, isdir=True).count()
    return bool(directory_count)
def get_parent(self):
    """
    Look up the directory document that contains this entry.

    :return: The parent document, or ``None`` for entries with an empty
        path and when no document exists for the parent directory.
    """
    if not self.path:
        return None
    parent_dir, parent_name = os.path.split(self.path)
    try:
        return File.objects.get(path=parent_dir, filename=parent_name)
    except File.DoesNotExist:
        return None
def get_tree_data(self, **kwargs):
    """
    Build the tree-node data for this entry.

    Unless the caller supplies ``parent``, the id of the parent directory
    is filled in. Top-level entries have no parent document
    (``get_parent()`` returns ``None``); ``parent`` is then passed as
    ``None`` instead of failing with ``AttributeError``.

    NOTE(review): assumes the parent implementation accepts
    ``parent=None`` for top-level nodes — confirm.

    :return: The parent implementation's data plus ``relpath``.
    """
    if 'parent' not in kwargs:
        parent = self.get_parent()
        kwargs['parent'] = parent.id if parent is not None else None
    data = super().get_tree_data(**kwargs)
    data['relpath'] = self.relpath
    return data
@classmethod
def has_tree_position(cls):
    """Always ``False`` for files."""
    supports_position = False
    return supports_position
@classmethod
[docs] def iter_tree_nodes(cls, parent, order_by=None):
"""
Generator that loops over the subnodes of a given parent.
Only directories (``isdir=True``) stored directly under the parent's
``relpath`` are considered; a falsy parent selects the empty path.
:param parent: Node whose ``relpath`` is read, or a falsy value.
:param order_by: Not read by this implementation; results are sorted
case-insensitively by filename, with the exact filename as tie-breaker.
"""
def _iter_cursor(cursor):
# The whole cursor is consumed and sorted before anything is yielded.
for file_object in sorted((d for d in cursor), key=lambda d: (d.filename.lower(), d.filename)):
# Entries whose filename starts with a period are never yielded.
if file_object.filename[0] != '.':
if os.path.exists(file_object.abspath):
yield file_object
else:
# delete db reference to inexistent file
# NOTE(review): the indentation of this listing is lost, so it is unclear
# whether this `else` belongs to the dot-file test or to the existence
# test. Either way the condition below deletes only when the path DOES
# exist, which contradicts the comment above — confirm the intent
# (`not os.path.exists(...)`?) against the original source.
if os.path.exists(file_object.abspath):
file_object.delete()
query = cls.objects(path=parent and parent.relpath or '', isdir=True)
yield from _iter_cursor(query)
def is_possible_parent(self):
    """Only directories can act as a parent; returns the ``isdir`` flag."""
    can_contain_entries = self.isdir
    return can_contain_entries
def move_to(self, parent, pos=None):
    """
    Move the specified node to a new parent.

    Moving an entry into the directory it already lives in is a no-op.
    Without this guard the entry would find itself as the conflicting
    target and delete itself before the move.

    TODO: This deletes any existing file in the
    target directory with the same name. This
    should be configurable.

    :param xmm.models.File parent: Parent node; a falsy value moves the
        entry to the top level.
    :param int pos: Ignored for files here.
    """
    destination = parent.relpath if parent else ''
    if destination == self.path:
        # already there — nothing to move, and nothing to overwrite
        return
    try:
        target = File.objects.get(path=destination, filename=self.filename)
    except DoesNotExist:
        pass
    else:
        target.delete()
    self.rename(destination, self.filename)
def stream(self):
    """
    Yield the file's content in binary chunks of at most 4 KiB.

    The file is opened on first iteration and closed when the generator is
    exhausted or closed.
    """
    chunk_size = 4 * 1024
    with open(self.abspath, 'rb') as source:
        # read() returns b'' at end of file, which ends the iteration
        yield from iter(lambda: source.read(chunk_size), b'')
@lru_cache()
@api_method
def api_get_meta_data(self):
    """
    API endpoint returning ``get_meta_data()`` wrapped in ``{'result': ...}``.

    NOTE(review): ``lru_cache`` on a method keys the cache on ``self`` and
    keeps cached instances alive; results are also never refreshed for the
    same instance. Confirm that this is intended.
    """
    meta_data = self.get_meta_data()
    return {'result': meta_data}
@property
def is_image(self):
    """Whether the file's extension is listed in ``THUMBNAIL_EXTENSIONS``."""
    thumbnail_extensions = flask.current_app.config['THUMBNAIL_EXTENSIONS']
    return self.extension in thumbnail_extensions
@property
def thumbnailable(self):
    """Whether a thumbnail can be made: never for directories, otherwise ``is_image``."""
    if self.isdir:
        return False
    return self.is_image
@lru_cache()
def image_dimensions(self):
    """
    Get the image pixel size as a (width, height) tuple.

    The image file is opened to read its size and closed again. Results
    are cached per instance by ``lru_cache``.

    :raises AssertionError: If the file is not an image.
    """
    assert self.is_image, 'Not an image.'
    with Image.open(self.abspath) as image:
        width, height = image.size
    return ImageSize(width, height)
def thumbnail_url(self, size=None):
    """
    URL under which a thumbnail of this file can be fetched.

    :param size: ``(width, height)``; defaults to ``(200, 150)``.
    :return: A Thumbor URL when ``THUMBOR_URL`` is configured, otherwise
        the URL of the built-in thumbnail view; ``None`` for missing or
        non-thumbnailable files.
    """
    if self.missing:
        return None
    if not self.thumbnailable:
        return None
    if size is None:
        size = (200, 150)
    use_thumbor = flask.current_app.config['THUMBOR_URL'] is not None
    build_url = self._thumbor_url if use_thumbor else self._thumbnail_url
    return build_url(size)
def _thumbor_url(self, size):
    """
    Build a Thumbor URL for a smart-cropped thumbnail of this file.

    The URL is generated with ``THUMBOR_KEY``; when the key is the literal
    string ``'unsafe'`` an unsafe URL is requested. The result is
    appended to ``THUMBOR_URL``.

    :param size: ``(width, height)``.
    :raises ImportError: If ``libthumbor`` is not installed (a hint is
        printed to stderr first).
    """
    try:
        from libthumbor import CryptoURL
    except ImportError:
        print('To use Thumbor thumbnails, please install `libthumbor`!', file=sys.stderr)
        raise

    config = flask.current_app.config
    key = config['THUMBOR_KEY']
    signer = CryptoURL(key=key)
    width, height = size[0], size[1]
    signed_part = signer.generate(
        unsafe=(key == 'unsafe'),
        width=width,
        height=height,
        smart=True,
        image_url=self.relpath,
    )
    # normalise to exactly one trailing slash before appending
    base_url = config['THUMBOR_URL'].strip('/') + '/'
    return base_url + signed_part
def _thumbnail_url(self, size):
    """
    Build the external URL of the ``main.thumbnail`` view for this file.

    :param size: Sequence of dimensions, joined with ``x`` (e.g. ``200x150``).
    """
    size_label = 'x'.join(str(dimension) for dimension in size)
    return flask.url_for(
        'main.thumbnail',
        object_id=self.id,
        size=size_label,
        _external=True,
    )
def stream_thumbnail(self, size=None, at=1.0):
    """
    Open a (possibly freshly rendered) thumbnail of this file.

    The thumbnail is cached on disk and re-rendered when it is older than
    the source file. Rendering uses PIL or GraphicsMagick, depending on
    the ``THUMBNAIL_EXTENSIONS`` entry for the file's extension; an entry
    in ``THUMBNAIL_MAPPING`` replaces the thumbnail's file extension.

    The reported mimetype is the same for cached and fresh thumbnails:
    ``image/png`` for the ``gm`` renderer, the source's mimetype otherwise.

    :param size: ``(width, height)``; defaults to ``(200, 150)``.
    :param at: Scale factor applied to ``size``. Any value whose string
        form is not ``'1.0'`` also adds an ``@<at>x`` suffix to the cache
        filename.
    :return: ``(stream, mimetype)``. ``stream`` is a binary file object
        the caller must close, or an empty list when no thumbnail is
        available (missing file, not thumbnailable, rendering error).
    """
    if self.missing:
        flask.current_app.logger.warning(f'stream_thumbnail() failed: file missing: {self.id}')
        return [], ''
    if size is None:
        size = (200, 150)
    if not self.thumbnailable:
        flask.current_app.logger.warning(f'stream_thumbnail() failed: not thumbnailable: {self.id}')
        return [], 'image/gif'
    try:
        config = flask.current_app.config
        scaled = str(at) != '1.0'
        factor = float(at) if scaled else 1
        size = (int(factor * size[0]), int(factor * size[1]))

        path = self.get_thumbnail_path(size)
        if scaled:
            stem, ext = os.path.splitext(path)
            path = '{}@{}x{}'.format(stem, str(at), ext)
        if self.extension in config['THUMBNAIL_MAPPING']:
            path = '{}{}'.format(os.path.splitext(path)[0],
                                 config['THUMBNAIL_MAPPING'][self.extension])

        renderer = config['THUMBNAIL_EXTENSIONS'][self.extension]
        # Decide the mimetype before the cache check so cache hits and
        # fresh renders report the same type.
        mimetype = 'image/png' if renderer == 'gm' else self.mimetype

        if os.path.exists(path) and os.path.getmtime(path) >= os.path.getmtime(self.abspath):
            return open(path, 'rb'), mimetype

        # exist_ok avoids failing when a parallel request creates the folder
        os.makedirs(os.path.dirname(path), exist_ok=True)
        if renderer == 'pil':
            with Image.open(self.abspath) as img:
                img.thumbnail(size, Image.ANTIALIAS)
                img.save(path, quality=100)
        elif renderer == 'gm':
            args = [config['BINARIES']['gm'], 'convert',
                    '-density', '300',
                    '-resize', '{}x{}>'.format(*size),
                    '-colorspace', 'RGB',
                    '-background', 'white',
                    '-quality', '100',
                    '{}[0]'.format(self.abspath), path]
            flask.current_app.logger.debug(' '.join(args))
            subprocess.check_call(args)
        return open(path, 'rb'), mimetype
    except Exception:
        # Best effort: log and answer with "no thumbnail", but do not
        # swallow KeyboardInterrupt/SystemExit.
        flask.current_app.logger.exception('')
        return [], 'image/gif'
def has_children(self):
    """
    Whether this node has child nodes, i.e. it is a directory that
    contains at least one subdirectory.
    """
    if not self.isdir:
        return self.isdir
    return self.has_subdirectories()
def _find_clone_name(self, in_path, count=0):
    """
    Find a name for a clone that isn't already being used.

    Tries the original filename first and then ``<basename> (Kopie N)``
    plus extension with increasing ``N`` until no document with that name
    exists in ``in_path``.

    :param str in_path: Directory the clone will be stored in.
    :param int count: Copy number to start with; ``0`` tries the
        unchanged name first.
    """
    model = self.__class__
    while True:
        candidate = self.basename
        if count > 0:
            candidate += _(' (Kopie %(count)d)', count=count) + self.extension
        else:
            candidate += self.extension
        if model.objects(path=in_path, filename=candidate).count() == 0:
            return candidate
        count += 1
[docs] def clone(self, **kwargs):
"""
Copy this file or folder.
The copy gets a name that is not yet used in the target directory
(see ``_find_clone_name``). Its document is saved before the data is
copied on disk and refreshed from the filesystem afterwards, even when
the copy fails.
:keyword parentId: ID of the directory the copy should be saved in.
A falsy value selects the top level; if the keyword is absent the copy
is placed next to the original.
:return: A clone of this File.
:rtype: xmm.models.File
"""
# NOTE(review): no ``return`` statement is visible up to the end of this
# listing although the docstring promises the clone — confirm whether the
# method continues beyond this point.
if 'parentId' in kwargs:
if kwargs['parentId']:
parent_path = File.objects.get(id=kwargs['parentId']).get_path()
else:
# moved to root
parent_path = ''
else:
parent_path = self.path
# absolute location of the target directory below MEDIA_ROOT
new_path = self.get_path(directory=parent_path)
new_name = self._find_clone_name(parent_path)
f = File(path=parent_path, filename=new_name)
f.save()
try:
if self.isdir:
shutil.copytree(self.abspath, os.path.join(new_path, new_name))
else:
shutil.copy2(self.abspath, os.path.join(new_path, new_name))
finally:
# runs after a failed copy as well; refresh_fs() deletes the stored
# document again if nothing exists at its path
f.refresh_fs()