diff options
Diffstat (limited to 'src/leap/soledad')
-rw-r--r-- | src/leap/soledad/server/_blobs/fs_backend.py | 183 |
1 files changed, 107 insertions, 76 deletions
diff --git a/src/leap/soledad/server/_blobs/fs_backend.py b/src/leap/soledad/server/_blobs/fs_backend.py index 769ba47b..495e19c1 100644 --- a/src/leap/soledad/server/_blobs/fs_backend.py +++ b/src/leap/soledad/server/_blobs/fs_backend.py @@ -82,6 +82,37 @@ class SingleRangeProducer(SingleRangeStaticProducer): self.deferred.callback(None) +def isolated(path): + """ + A decorator that isolates execution of the decorated method using a file + system lock based on the given path. A symlink in ``{path}.lock`` will be + used to make sure only one isolated method is executed at a time for that + path. + """ + + def decorator(method): + + def new_method(*args, **kwargs): + dirname, _ = os.path.split(path) + mkdir_p(dirname) + name = path + '.lock' + # TODO: evaluate the need to replace this for a readers-writer lock + lock = defer.DeferredFilesystemLock(name) + + def _release(result): + lock.unlock() + return result + + d = lock.deferUntilLocked() + d.addCallback(lambda _: method(*args, **kwargs)) + d.addCallbacks(_release, _release) + return d + + return new_method + + return decorator + + @implementer(interfaces.IBlobsBackend) class FilesystemBlobsBackend(object): @@ -100,22 +131,14 @@ class FilesystemBlobsBackend(object): def __touch(self, path): open(path, 'a') - def _fslock(self, path): - dirname, _ = os.path.split(path) - mkdir_p(dirname) - name = path + '.lock' - # TODO: evaluate the need to replace this for a readers-writer lock. - return defer.DeferredFilesystemLock(name) - - @defer.inlineCallbacks def read_blob(self, user, blob_id, consumer, namespace='', range=None): path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - raise BlobNotFound((user, blob_id)) - fslock = self._fslock(path) - try: - yield fslock.deferUntilLocked() + return defer.fail(BlobNotFound((user, blob_id))) + @isolated(path) + @defer.inlineCallbacks + def _read_blob(): logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace)) logger.debug('blob path: %s' % path) @@ -129,52 +152,55 @@ class FilesystemBlobsBackend(object): args = (consumer, fd, offset, size) producer = SingleRangeProducer(*args) yield producer.start() - finally: - fslock.unlock() - @defer.inlineCallbacks + return _read_blob() + def get_flags(self, user, blob_id, namespace=''): path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - raise BlobNotFound((user, blob_id)) + return defer.fail(BlobNotFound((user, blob_id))) if not os.path.isfile(path + '.flags'): - defer.returnValue([]) - fslock = self._fslock(path) - try: - yield fslock.deferUntilLocked() + return defer.succeed([]) - with open(path + '.flags', 'r') as flags_file: - flags = json.loads(flags_file.read()) - defer.returnValue(flags) - finally: - fslock.unlock() + @isolated(path) + def _get_flags(): + try: + with open(path + '.flags', 'r') as flags_file: + flags = json.loads(flags_file.read()) + return defer.succeed(flags) + except Exception as e: + return defer.fail(e) + + return _get_flags() - @defer.inlineCallbacks def set_flags(self, user, blob_id, flags, namespace=''): path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - raise BlobNotFound((user, blob_id)) - fslock = self._fslock(path) - try: - yield fslock.deferUntilLocked() - for flag in flags: - if flag not in ACCEPTED_FLAGS: - raise InvalidFlag(flag) - with open(path + '.flags', 'w') as flags_file: - raw_flags = json.dumps(flags) - flags_file.write(raw_flags) - finally: - fslock.unlock() + return defer.fail(BlobNotFound((user, blob_id))) + + @isolated(path) + def _set_flags(): + try: + for flag in flags: + if flag not in ACCEPTED_FLAGS: + raise InvalidFlag(flag) + with open(path + '.flags', 'w') as flags_file: + raw_flags = json.dumps(flags) + flags_file.write(raw_flags) + return defer.succeed(None) + except Exception as e: + return defer.fail(e) + + return _set_flags() - @defer.inlineCallbacks def write_blob(self, user, blob_id, producer, namespace=''): path = self._get_path(user, blob_id, namespace) if os.path.isfile(path): - raise BlobExists - fslock = self._fslock(path) - try: - yield fslock.deferUntilLocked() + return defer.fail(BlobExists((user, blob_id))) + @isolated(path) + @defer.inlineCallbacks + def _write_blob(): try: # limit the number of concurrent writes to disk yield self.semaphore.acquire() @@ -195,8 +221,8 @@ class FilesystemBlobsBackend(object): yield self._update_usage(user, used) finally: self.semaphore.release() - finally: - fslock.unlock() + + return _write_blob() @defer.inlineCallbacks def _update_usage(self, user, used): @@ -208,35 +234,37 @@ class FilesystemBlobsBackend(object): finally: lock.release() - @defer.inlineCallbacks def delete_blob(self, user, blob_id, namespace=''): path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - raise BlobNotFound((user, blob_id)) - fslock = self._fslock(path) - try: - yield fslock.deferUntilLocked() + return defer.fail(BlobNotFound((user, blob_id))) + + @isolated(path) + def _delete_blob(): self.__touch(path + '.deleted') os.unlink(path) try: os.unlink(path + '.flags') except Exception: pass - finally: - fslock.unlock() + return defer.succeed(None) + + return _delete_blob() - @defer.inlineCallbacks def get_blob_size(self, user, blob_id, namespace=''): path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - raise BlobNotFound((user, blob_id)) - fslock = self._fslock(path) - try: - yield fslock.deferUntilLocked() + return defer.fail(BlobNotFound((user, blob_id))) + + @isolated(path) + def _get_blob_size(): size = os.stat(path).st_size - defer.returnValue(size) - finally: - fslock.unlock() + try: + return defer.succeed(size) + except Exception as e: + return defer.fail(e) + + return _get_blob_size() def count(self, user, namespace=''): try: @@ -301,20 +329,22 @@ class FilesystemBlobsBackend(object): finally: lock.release() - @defer.inlineCallbacks def get_tag(self, user, blob_id, namespace=''): path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - raise BlobNotFound((user, blob_id)) - fslock = self._fslock(path) - try: - yield fslock.deferUntilLocked() - with open(path) as doc_file: - doc_file.seek(-16, 2) - tag = base64.urlsafe_b64encode(doc_file.read()) - defer.returnValue(tag) - finally: - fslock.unlock() + return defer.fail(BlobNotFound((user, blob_id))) + + @isolated(path) + def _get_tag(): + try: + with open(path) as doc_file: + doc_file.seek(-16, 2) + tag = base64.urlsafe_b64encode(doc_file.read()) + return defer.succeed(tag) + except Exception as e: + return defer.fail(e) + + return _get_tag() @defer.inlineCallbacks def _get_disk_usage(self, start_path): @@ -341,12 +371,13 @@ class FilesystemBlobsBackend(object): @defer.inlineCallbacks def exists(self, user, blob_id, namespace): path = self._get_path(user, blob_id, namespace) - fslock = self._fslock(path) - try: - yield fslock.deferUntilLocked() + + @isolated(path) + @defer.inlineCallbacks + def _exists(): defer.returnValue(os.path.isfile(path)) - finally: - fslock.unlock() + + return _exists() def _get_path(self, user, blob_id='', namespace=''): parts = [user] |