diff options
Diffstat (limited to 'src/leap/soledad/server/_blobs/fs_backend.py')
-rw-r--r-- | src/leap/soledad/server/_blobs/fs_backend.py | 278 |
1 files changed, 278 insertions, 0 deletions
# -*- coding: utf-8 -*-
# _blobs/fs_backend.py
# Copyright (C) 2017 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A backend for blobs that stores in filesystem.
"""
import base64
import json
import os
import time

from collections import defaultdict
from zope.interface import implementer

from twisted.internet import defer
from twisted.internet import utils
from twisted.web.client import FileBodyProducer

from leap.common.files import mkdir_p
from leap.soledad.common.blobs import ACCEPTED_FLAGS
from leap.soledad.common.blobs import InvalidFlag
from leap.soledad.common.log import getLogger
from leap.soledad.server import interfaces

from .errors import BlobExists
from .errors import BlobNotFound
from .errors import QuotaExceeded
from .util import VALID_STRINGS


logger = getLogger(__name__)


@implementer(interfaces.IBlobsBackend)
class FilesystemBlobsBackend(object):
    """
    Filesystem-backed blobs storage.

    Blobs live as plain files under ``<path>/<user>/<namespace>/`` plus a
    three-level fan-out derived from the blob id (see ``_get_path_parts``).
    Flags are stored in a sibling ``<blob>.flags`` JSON file, and deletions
    leave an empty ``<blob>.deleted`` marker behind.
    """

    # seconds to cache a user's measured disk usage before re-running `du`
    USAGE_TIMEOUT = 30

    def __init__(self, blobs_path='/tmp/blobs/', quota=200 * 1024,
                 concurrent_writes=50):
        """
        :param blobs_path: root directory under which blobs are stored.
        :param quota: per-user storage quota, in kilobytes.
        :param concurrent_writes: maximum number of simultaneous disk writes.
        """
        self.quota = quota
        # throttle concurrent writes to disk (see write_blob)
        self.semaphore = defer.DeferredSemaphore(concurrent_writes)
        if not os.path.isdir(blobs_path):
            os.makedirs(blobs_path)
        self.path = blobs_path
        # user -> (used kilobytes, timestamp of measurement); (None, None)
        # means "not measured yet"
        self.usage = defaultdict(lambda: (None, None))
        self.usage_locks = defaultdict(defer.DeferredLock)

    def __touch(self, path):
        # create/update the file; use a context manager so the file handle
        # is closed immediately instead of being leaked until GC
        with open(path, 'a'):
            pass

    @defer.inlineCallbacks
    def read_blob(self, user, blob_id, consumer, namespace=''):
        """
        Stream the contents of a blob into ``consumer``.
        """
        logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace))
        path = self._get_path(user, blob_id, namespace)
        logger.debug('blob path: %s' % path)
        # blobs are opaque binary data: read in binary mode
        with open(path, 'rb') as fd:
            producer = FileBodyProducer(fd)
            yield producer.startProducing(consumer)

    def get_flags(self, user, blob_id, namespace=''):
        """
        Get the list of flags set on a blob.

        :return: a deferred firing with the list of flags (empty if none).
        """
        try:
            path = self._get_path(user, blob_id, namespace)
        except Exception as e:
            return defer.fail(e)
        if not os.path.isfile(path):
            return defer.fail(BlobNotFound())
        if not os.path.isfile(path + '.flags'):
            return defer.succeed([])
        with open(path + '.flags', 'r') as flags_file:
            flags = json.loads(flags_file.read())
        return defer.succeed(flags)

    def set_flags(self, user, blob_id, flags, namespace=''):
        """
        Replace the set of flags on a blob.

        :return: a deferred firing with None, or failing with
                 BlobNotFound/InvalidFlag.
        """
        try:
            path = self._get_path(user, blob_id, namespace)
        except Exception as e:
            return defer.fail(e)
        if not os.path.isfile(path):
            return defer.fail(BlobNotFound())
        # validate all flags before writing anything
        for flag in flags:
            if flag not in ACCEPTED_FLAGS:
                return defer.fail(InvalidFlag(flag))
        with open(path + '.flags', 'w') as flags_file:
            raw_flags = json.dumps(flags)
            flags_file.write(raw_flags)
        return defer.succeed(None)

    @defer.inlineCallbacks
    def write_blob(self, user, blob_id, producer, namespace=''):
        """
        Write a new blob to disk, enforcing the user's quota.

        :raise BlobExists: if a blob with this id already exists.
        :raise QuotaExceeded: if writing would exceed the user's quota.
        """
        # limit the number of concurrent writes to disk
        yield self.semaphore.acquire()
        try:
            path = self._get_path(user, blob_id, namespace)
            try:
                mkdir_p(os.path.split(path)[0])
            except OSError as e:
                logger.warn("Got exception trying to create directory: %r" % e)
            if os.path.isfile(path):
                raise BlobExists
            used = yield self.get_total_storage(user)
            length = producer.length / 1024.0  # original length is in bytes
            if used + length > self.quota:
                raise QuotaExceeded
            logger.info('writing blob: %s - %s' % (user, blob_id))
            with open(path, 'wb') as blobfile:
                yield producer.startProducing(blobfile)
            used += length
            yield self._update_usage(user, used)
        finally:
            self.semaphore.release()

    @defer.inlineCallbacks
    def _update_usage(self, user, used):
        # update the cached usage under the per-user lock; keep the old
        # timestamp so the cache still expires at the original deadline
        lock = self.usage_locks[user]
        yield lock.acquire()
        try:
            _, timestamp = self.usage[user]
            self.usage[user] = (used, timestamp)
        finally:
            lock.release()

    def delete_blob(self, user, blob_id, namespace=''):
        """
        Delete a blob, leaving a '.deleted' marker file behind.
        """
        try:
            blob_path = self._get_path(user, blob_id, namespace)
        except Exception as e:
            return defer.fail(e)
        if not os.path.isfile(blob_path):
            return defer.fail(BlobNotFound())
        self.__touch(blob_path + '.deleted')
        os.unlink(blob_path)
        try:
            os.unlink(blob_path + '.flags')
        except Exception:
            # best effort: the blob may simply have no flags file
            pass
        return defer.succeed(None)

    def get_blob_size(self, user, blob_id, namespace=''):
        """
        Get the size of a blob, in bytes.
        """
        try:
            blob_path = self._get_path(user, blob_id, namespace)
        except Exception as e:
            return defer.fail(e)
        # fail with BlobNotFound, as the sibling methods do, instead of
        # letting os.stat raise a synchronous OSError
        if not os.path.isfile(blob_path):
            return defer.fail(BlobNotFound())
        size = os.stat(blob_path).st_size
        return defer.succeed(size)

    def count(self, user, namespace=''):
        """
        Count the blobs stored for a user in a namespace.
        """
        try:
            base_path = self._get_path(user, namespace=namespace)
        except Exception as e:
            return defer.fail(e)
        count = 0
        for _, _, filenames in os.walk(base_path):
            # skip flags files; a generator also keeps this valid on
            # python 3, where len(filter(...)) doesn't work
            # NOTE(review): '.deleted' markers are still counted here —
            # confirm whether that is intended
            count += sum(1 for i in filenames if not i.endswith('.flags'))
        return defer.succeed(count)

    def list_blobs(self, user, namespace='', order_by=None, deleted=False,
                   filter_flag=False):
        """
        List blob ids for a user/namespace.

        :param order_by: 'date'/'+date' for oldest first, '-date' for
                         newest first (by file mtime), or None.
        :param deleted: when True, list deletion markers instead of blobs.
        :param filter_flag: when set, only list blobs carrying this flag.
        """
        namespace = namespace or 'default'
        blob_ids = []
        try:
            base_path = self._get_path(user, namespace=namespace)
        except Exception as e:
            return defer.fail(e)

        def match(name):
            if deleted:
                return name.endswith('.deleted')
            return VALID_STRINGS.match(name)
        for root, dirs, filenames in os.walk(base_path):
            blob_ids += [os.path.join(root, name) for name in filenames
                         if match(name)]
        if order_by in ['date', '+date']:
            blob_ids.sort(key=lambda x: os.path.getmtime(x))
        elif order_by == '-date':
            blob_ids.sort(key=lambda x: os.path.getmtime(x), reverse=True)
        elif order_by:
            exc = Exception("Unsupported order_by parameter: %s" % order_by)
            return defer.fail(exc)
        if filter_flag:
            blob_ids = list(self._filter_flag(blob_ids, filter_flag))
        blob_ids = [os.path.basename(path).replace('.deleted', '')
                    for path in blob_ids]
        return defer.succeed(blob_ids)

    def _filter_flag(self, blob_paths, flag):
        # yield only the paths whose '.flags' file contains the given flag
        for blob_path in blob_paths:
            flag_path = blob_path + '.flags'
            if not os.path.isfile(flag_path):
                continue
            with open(flag_path, 'r') as flags_file:
                blob_flags = json.loads(flags_file.read())
            if flag in blob_flags:
                yield blob_path

    @defer.inlineCallbacks
    def get_total_storage(self, user):
        """
        Get the disk space used by a user, in kilobytes, using a cached
        value when it is younger than USAGE_TIMEOUT seconds.
        """
        lock = self.usage_locks[user]
        yield lock.acquire()
        try:
            used, timestamp = self.usage[user]
            if used is None or time.time() > timestamp + self.USAGE_TIMEOUT:
                path = self._get_path(user)
                used = yield self._get_disk_usage(path)
                self.usage[user] = (used, time.time())
            defer.returnValue(used)
        finally:
            lock.release()

    def get_tag(self, user, blob_id, namespace=''):
        """
        Get the (urlsafe-base64 encoded) last 16 bytes of a blob.
        """
        try:
            blob_path = self._get_path(user, blob_id, namespace)
        except Exception as e:
            return defer.fail(e)
        if not os.path.isfile(blob_path):
            return defer.fail(BlobNotFound())
        # binary mode: b64-encoding needs bytes, and an end-relative seek
        # on a text-mode file is invalid on python 3
        with open(blob_path, 'rb') as doc_file:
            doc_file.seek(-16, 2)
            tag = base64.urlsafe_b64encode(doc_file.read())
        return defer.succeed(tag)

    @defer.inlineCallbacks
    def _get_disk_usage(self, start_path):
        # shell out to `du`; its default unit is 1024-byte blocks, which
        # matches the kilobyte unit used for the quota
        if not os.path.isdir(start_path):
            defer.returnValue(0)
        cmd = ['/usr/bin/du', '-s', '-c', start_path]
        output = yield utils.getProcessOutput(cmd[0], cmd[1:])
        size = output.split()[0]
        defer.returnValue(int(size))

    def _validate_path(self, desired_path, user, blob_id):
        # guard against path traversal: reject invalid identifiers and any
        # resolved path escaping the user's own directory
        if not VALID_STRINGS.match(user):
            raise Exception("Invalid characters on user: %s" % user)
        if blob_id and not VALID_STRINGS.match(blob_id):
            raise Exception("Invalid characters on blob_id: %s" % blob_id)
        desired_path = os.path.realpath(desired_path)  # expand path references
        root = os.path.realpath(self.path)
        if not desired_path.startswith(root + os.sep + user):
            err = "User %s tried accessing a invalid path: %s" % (user,
                                                                  desired_path)
            raise Exception(err)
        return desired_path

    def exists(self, user, blob_id, namespace):
        """
        Check whether a blob exists.

        :return: a deferred firing with a bool.
        """
        try:
            path = self._get_path(user, blob_id=blob_id, namespace=namespace)
        except Exception as e:
            return defer.fail(e)
        # wrap in a deferred for consistency with the error path above
        return defer.succeed(os.path.isfile(path))

    def _get_path(self, user, blob_id='', namespace=''):
        """
        Build and validate the filesystem path for a user/namespace/blob.
        With no blob_id and no namespace, return the user's root path.
        """
        parts = [user]
        if blob_id:
            namespace = namespace or 'default'
            parts += self._get_path_parts(blob_id, namespace)
        elif namespace and not blob_id:
            parts += [namespace]  # namespace path
        else:
            pass  # root path
        path = os.path.join(self.path, *parts)
        return self._validate_path(path, user, blob_id)

    def _get_path_parts(self, blob_id, custom):
        # fan blobs out over <ns>/<b>/<b..3>/<b..6>/<blob_id> to avoid
        # huge flat directories
        if custom and not blob_id:
            return [custom]
        return [custom] + [blob_id[0], blob_id[0:3], blob_id[0:6]] + [blob_id]