# Source code for flask_superadmin.contrib.fileadmin

import os
import os.path as op
import platform
import urlparse
import re
import shutil

from operator import itemgetter

from werkzeug import secure_filename

from flask import flash, url_for, redirect, abort, request

from flask_superadmin.base import BaseView, expose
from flask_superadmin.babel import gettext, lazy_gettext
from flask_superadmin import form
from flask_wtf.file import FileField
from wtforms import TextField, ValidationError

class NameForm(form.BaseForm):
    """
        Form with a filename input field.

        Validates if provided name is valid for *nix and Windows systems.
    """
    name = TextField()

    # Rejects reserved Windows device names (optionally followed by an
    # extension), names starting with a dot, and any name containing control
    # characters or one of the characters \ ? * : " ; | /
    regexp = re.compile(r'^(?!^(PRN|AUX|CLOCK\$|NUL|CON|COM\d|LPT\d|\..*)'
                        r'(\..+)?$)[^\x00-\x1f\\?*:\";|/]+$')

    def validate_name(self, field):
        """
            Inline validator for the `name` field.

            `field`
                The bound `name` field; its `data` holds the submitted value.

            Raises `ValidationError` if the value is empty or does not match
            `regexp`.
        """
        # `field.data` can be None/empty when nothing was submitted; treat
        # that as an invalid name instead of letting `match` raise TypeError.
        if not field.data or not self.regexp.match(field.data):
            raise ValidationError(gettext('Invalid directory name'))

class UploadForm(form.BaseForm):
    """
        File upload form. Works with FileAdmin instance to check if it
        is allowed to upload file with given extension.
    """
    upload = FileField(lazy_gettext('File to upload'))

    def __init__(self, admin):
        """
            Constructor.

            `admin`
                Object providing `is_file_allowed(filename)`; it is consulted
                by `validate_upload`.
        """
        self.admin = admin

        super(UploadForm, self).__init__()

    def validate_upload(self, field):
        """
            Inline validator for the `upload` field.

            Raises `ValidationError` if no file was submitted or if the
            admin's `is_file_allowed` rejects the uploaded file name.
        """
        if not self.upload.has_file():
            raise ValidationError(gettext('File required.'))

        # Name of the uploaded file as sent by the client.
        filename = self.upload.data.filename

        if not self.admin.is_file_allowed(filename):
            raise ValidationError(gettext('Invalid file type.'))

class FileAdmin(BaseView):
    """
        Simple file-management interface.

        Requires two parameters:

        `path`
            Path to the directory which will be managed
        `url`
            Base URL for the directory. Will be used to generate
            static links to the files.

        Sample usage::

            admin = Admin()

            path = op.join(op.dirname(__file__), 'static')
            admin.add_view(FileAdmin(path, '/static/', name='Static Files'))
            admin.setup_app(app)
    """

    can_upload = True
    """
        Is file upload allowed.
    """

    can_delete = True
    """
        Is file deletion allowed.
    """

    can_delete_dirs = True
    """
        Is recursive directory deletion allowed.
    """

    can_mkdir = True
    """
        Is directory creation allowed.
    """

    can_rename = True
    """
        Is file and directory renaming allowed.
    """

    allowed_extensions = None
    """
        List of allowed extensions for uploads, in lower case.

        Example::

            class MyAdmin(FileAdmin):
                allowed_extensions = ('swf', 'jpg', 'gif', 'png')
    """

    list_template = 'admin/file/list.html'
    """
        File list template
    """

    upload_template = 'admin/file/form.html'
    """
        File upload template
    """

    mkdir_template = 'admin/file/form.html'
    """
        Directory creation (mkdir) template
    """

    rename_template = 'admin/file/rename.html'
    """
        Rename template
    """

    def __init__(self, base_path, base_url,
                 name=None, category=None, endpoint=None, url=None):
        """
            Constructor.

            `base_path`
                Base file storage location
            `base_url`
                Base URL for the files
            `name`
                Name of this view. If not provided, will be defaulted to the
                class name.
            `category`
                View category
            `endpoint`
                Endpoint name for the view
            `url`
                URL for view
        """
        self.base_path = base_path
        self.base_url = base_url

        self._on_windows = platform.system() == 'Windows'

        # Convert allowed_extensions to set for quick validation
        if (self.allowed_extensions
                and not isinstance(self.allowed_extensions, set)):
            self.allowed_extensions = set(self.allowed_extensions)

        super(FileAdmin, self).__init__(name, category, endpoint, url)

    def is_accessible_path(self, path):
        """
            Verify if path is accessible for current user.

            Override to customize behavior. The result is enforced by
            `_normalize_path`, which every view goes through.

            `path`
                Relative path to the root
        """
        return True

    def get_base_path(self):
        """
            Return base path. Override to customize behavior (per-user
            directories, etc)
        """
        return op.normpath(self.base_path)

    def get_base_url(self):
        """
            Return base URL. Override to customize behavior (per-user
            directories, etc)
        """
        return self.base_url

    def is_file_allowed(self, filename):
        """
            Verify if file can be uploaded.

            Override to customize behavior.

            `filename`
                Source file name
        """
        ext = op.splitext(filename)[1].lower()

        if ext.startswith('.'):
            ext = ext[1:]

        if self.allowed_extensions and ext not in self.allowed_extensions:
            return False

        return True

    def is_in_folder(self, base_path, directory):
        """
            Verify if `directory` is in `base_path` folder.

            Both paths are normalized first. `directory` is accepted when it
            is `base_path` itself or lies below it.
        """
        base = op.normpath(base_path)
        target = op.normpath(directory)

        if target == base:
            return True

        # Compare on a separator boundary: a bare prefix test would accept
        # siblings such as "/srv/static_private" for base "/srv/static".
        prefix = base if base.endswith(os.sep) else base + os.sep
        return target.startswith(prefix)

    def save_file(self, path, file_data):
        """
            Save uploaded file to the disk

            `path`
                Path to save to
            `file_data`
                Werkzeug `FileStorage` object
        """
        file_data.save(path)

    def _get_dir_url(self, endpoint, path, **kwargs):
        """
            Return prettified URL

            `endpoint`
                Endpoint name
            `path`
                Directory path
            `kwargs`
                Additional arguments (only used when `path` is not empty)
        """
        if not path:
            return url_for(endpoint)

        # URLs always use forward slashes, whatever the OS separator is.
        if self._on_windows:
            path = path.replace('\\', '/')

        kwargs['path'] = path

        return url_for(endpoint, **kwargs)

    def _get_file_url(self, path):
        """
            Return static file url

            `path`
                Static file path
        """
        base_url = self.get_base_url()
        return urlparse.urljoin(base_url, path)

    def _normalize_path(self, path):
        """
            Verify and normalize path.

            If path is not relative to the base directory, will throw 404
            exception.

            If `is_accessible_path` rejects the path, will throw 403
            exception.

            If path does not exist, will also throw 404 exception.

            Returns a `(base_path, directory, path)` tuple where `directory`
            is the absolute location and `path` is the normalized relative
            path ('' for the base directory).
        """
        base_path = self.get_base_path()

        if path is None:
            directory = base_path
            path = ''
        else:
            path = op.normpath(path)
            directory = op.normpath(op.join(base_path, path))

            if not self.is_in_folder(base_path, directory):
                abort(404)

        # Enforce the per-user access hook; the default implementation
        # always returns True, so stock behavior is unchanged.
        if not self.is_accessible_path(path):
            abort(403)

        if not op.exists(directory):
            abort(404)

        return base_path, directory, path

    def field_name(self, text):
        """
            Return `text` capitalized.
        """
        return text.capitalize()

    def get_readonly_fields(self, instance):
        """
            Return an empty dict; `instance` is ignored.
        """
        return {}

    @expose('/')
    @expose('/b/<path:path>')
    def index(self, path=None):
        """
            Index view method

            `path`
                Optional directory path. If not provided, will use base
                directory
        """
        # Get path and verify if it is valid
        base_path, directory, path = self._normalize_path(path)

        # Get directory listing; each item is (name, rel_path, is_dir, size)
        items = []

        # Parent directory
        if directory != base_path:
            parent_path = op.normpath(op.join(path, '..'))
            if parent_path == '.':
                parent_path = None

            items.append(('..', parent_path, True, 0))

        for f in os.listdir(directory):
            fp = op.join(directory, f)

            items.append((f, op.join(path, f), op.isdir(fp), op.getsize(fp)))

        # Sort by type (directories first; the sort is stable)
        items.sort(key=itemgetter(2), reverse=True)

        # Generate breadcrumbs: (segment name, cumulative relative path)
        accumulator = []
        breadcrumbs = []
        for n in path.split(os.sep):
            accumulator.append(n)
            breadcrumbs.append((n, op.join(*accumulator)))

        return self.render(self.list_template,
                           dir_path=path,
                           breadcrumbs=breadcrumbs,
                           get_dir_url=self._get_dir_url,
                           get_file_url=self._get_file_url,
                           items=items,
                           base_path=base_path)

    @expose('/upload/', methods=('GET', 'POST'))
    @expose('/upload/<path:path>', methods=('GET', 'POST'))
    def upload(self, path=None):
        """
            Upload view method

            `path`
                Optional directory path. If not provided, will use base
                directory
        """
        # Get path and verify if it is valid
        base_path, directory, path = self._normalize_path(path)

        if not self.can_upload:
            flash(gettext('File uploading is disabled.'), 'error')
            return redirect(self._get_dir_url('.index', path))

        form = UploadForm(self)
        if form.validate_on_submit():
            # Sanitize the client-supplied name before touching the disk.
            filename = op.join(directory,
                               secure_filename(form.upload.data.filename))

            if op.exists(filename):
                flash(gettext('File "%(name)s" already exists.',
                              name=form.upload.data.filename),
                      'error')
            else:
                try:
                    self.save_file(filename, form.upload.data)
                    return redirect(self._get_dir_url('.index', path))
                except Exception as ex:
                    flash(gettext('Failed to save file: %(error)s',
                                  error=ex),
                          'error')

        return self.render(self.upload_template,
                           form=form,
                           base_path=base_path,
                           path=path,
                           msg=gettext(u'Upload a file'))

    @expose('/mkdir/', methods=('GET', 'POST'))
    @expose('/mkdir/<path:path>', methods=('GET', 'POST'))
    def mkdir(self, path=None):
        """
            Directory creation view method

            `path`
                Optional directory path. If not provided, will use base
                directory
        """
        # Get path and verify if it is valid
        base_path, directory, path = self._normalize_path(path)

        dir_url = self._get_dir_url('.index', path)

        if not self.can_mkdir:
            flash(gettext('Directory creation is disabled.'), 'error')
            return redirect(dir_url)

        form = NameForm(request.form)

        if form.validate_on_submit():
            try:
                os.mkdir(op.join(directory, form.name.data))
                return redirect(dir_url)
            except Exception as ex:
                # The placeholder is named, so the value must be passed by
                # keyword.
                flash(gettext('Failed to create directory: %(error)s',
                              error=ex),
                      'error')

        return self.render(self.mkdir_template,
                           form=form,
                           dir_url=dir_url,
                           base_path=base_path,
                           path=path,
                           msg=gettext(u'Create a new directory'))

    @expose('/delete/', methods=('POST',))
    def delete(self):
        """
            Delete view method

            Reads the relative `path` to delete from the submitted form.
        """
        path = request.form.get('path')

        if not path:
            return redirect(url_for('.index'))

        # Get path and verify if it is valid
        base_path, full_path, path = self._normalize_path(path)

        return_url = self._get_dir_url('.index', op.dirname(path))

        if not self.can_delete:
            flash(gettext('Deletion is disabled.'), 'error')
            return redirect(return_url)

        if op.isdir(full_path):
            if not self.can_delete_dirs:
                flash(gettext('Directory deletion is disabled.'), 'error')
                return redirect(return_url)

            try:
                shutil.rmtree(full_path)
                # Interpolate through gettext so the message id stays
                # constant and a '%' in the path is not treated as a format
                # specifier.
                flash(gettext('Directory "%(path)s" was successfully '
                              'deleted.',
                              path=path))
            except Exception as ex:
                flash(gettext('Failed to delete directory: %(error)s',
                              error=ex),
                      'error')
        else:
            try:
                os.remove(full_path)
                flash(gettext('File "%(name)s" was successfully deleted.',
                              name=path))
            except Exception as ex:
                flash(gettext('Failed to delete file: %(name)s', name=ex),
                      'error')

        return redirect(return_url)

    @expose('/rename/', methods=('GET', 'POST'))
    def rename(self):
        """
            Rename view method

            Reads the relative `path` to rename from the query string and
            the new name from the submitted form.
        """
        path = request.args.get('path')

        if not path:
            return redirect(url_for('.index'))

        base_path, full_path, path = self._normalize_path(path)

        return_url = self._get_dir_url('.index', op.dirname(path))

        if not self.can_rename:
            flash(gettext('Renaming is disabled.'), 'error')
            return redirect(return_url)

        if not op.exists(full_path):
            flash(gettext('Path does not exist.'), 'error')
            return redirect(return_url)

        form = NameForm(request.form, name=op.basename(path))
        if form.validate_on_submit():
            try:
                dir_base = op.dirname(full_path)
                filename = secure_filename(form.name.data)

                os.rename(full_path, op.join(dir_base, filename))
                flash(gettext('Successfully renamed "%(src)s" to "%(dst)s"',
                              src=op.basename(path),
                              dst=filename))
            except Exception as ex:
                flash(gettext('Failed to rename: %(error)s', error=ex),
                      'error')

            return redirect(return_url)

        return self.render(self.rename_template,
                           form=form,
                           path=op.dirname(path),
                           name=op.basename(path),
                           dir_url=return_url,
                           base_path=base_path)