diff options
| -rw-r--r-- | src/leap/soledad/server/_blobs/fs_backend.py | 183 | 
1 files changed, 107 insertions, 76 deletions
| diff --git a/src/leap/soledad/server/_blobs/fs_backend.py b/src/leap/soledad/server/_blobs/fs_backend.py index 769ba47b..495e19c1 100644 --- a/src/leap/soledad/server/_blobs/fs_backend.py +++ b/src/leap/soledad/server/_blobs/fs_backend.py @@ -82,6 +82,37 @@ class SingleRangeProducer(SingleRangeStaticProducer):              self.deferred.callback(None) +def isolated(path): +    """ +    A decorator that isolates execution of the decorated method using a file +    system lock based on the given path. A symlink in ``{path}.lock`` will be +    used to make sure only one isolated method is executed at a time for that +    path. +    """ + +    def decorator(method): + +        def new_method(*args, **kwargs): +            dirname, _ = os.path.split(path) +            mkdir_p(dirname) +            name = path + '.lock' +            # TODO: evaluate the need to replace this for a readers-writer lock +            lock = defer.DeferredFilesystemLock(name) + +            def _release(result): +                lock.unlock() +                return result + +            d = lock.deferUntilLocked() +            d.addCallback(lambda _: method(*args, **kwargs)) +            d.addCallbacks(_release, _release) +            return d + +        return new_method + +    return decorator + +  @implementer(interfaces.IBlobsBackend)  class FilesystemBlobsBackend(object): @@ -100,22 +131,14 @@ class FilesystemBlobsBackend(object):      def __touch(self, path):          open(path, 'a') -    def _fslock(self, path): -        dirname, _ = os.path.split(path) -        mkdir_p(dirname) -        name = path + '.lock' -        # TODO: evaluate the need to replace this for a readers-writer lock. -        return defer.DeferredFilesystemLock(name) - -    @defer.inlineCallbacks      def read_blob(self, user, blob_id, consumer, namespace='', range=None):          path = self._get_path(user, blob_id, namespace)          if not os.path.isfile(path): -            raise BlobNotFound((user, blob_id)) -        fslock = self._fslock(path) -        try: -            yield fslock.deferUntilLocked() +            return defer.fail(BlobNotFound((user, blob_id))) +        @isolated(path) +        @defer.inlineCallbacks +        def _read_blob():              logger.info('reading blob: %s - %s@%s'                          % (user, blob_id, namespace))              logger.debug('blob path: %s' % path) @@ -129,52 +152,55 @@ class FilesystemBlobsBackend(object):                      args = (consumer, fd, offset, size)                      producer = SingleRangeProducer(*args)                  yield producer.start() -        finally: -            fslock.unlock() -    @defer.inlineCallbacks +        return _read_blob() +      def get_flags(self, user, blob_id, namespace=''):          path = self._get_path(user, blob_id, namespace)          if not os.path.isfile(path): -            raise BlobNotFound((user, blob_id)) +            return defer.fail(BlobNotFound((user, blob_id)))          if not os.path.isfile(path + '.flags'): -            defer.returnValue([]) -        fslock = self._fslock(path) -        try: -            yield fslock.deferUntilLocked() +            return defer.succeed([]) -            with open(path + '.flags', 'r') as flags_file: -                flags = json.loads(flags_file.read()) -                defer.returnValue(flags) -        finally: -            fslock.unlock() +        @isolated(path) +        def _get_flags(): +            try: +                with open(path + '.flags', 'r') as flags_file: +                    flags = json.loads(flags_file.read()) +                    return defer.succeed(flags) +            except Exception as e: +                return defer.fail(e) + +        return _get_flags() -    @defer.inlineCallbacks      def set_flags(self, user, blob_id, flags, namespace=''):          path = self._get_path(user, blob_id, namespace)          if not os.path.isfile(path): -            raise BlobNotFound((user, blob_id)) -        fslock = self._fslock(path) -        try: -            yield fslock.deferUntilLocked() -            for flag in flags: -                if flag not in ACCEPTED_FLAGS: -                    raise InvalidFlag(flag) -            with open(path + '.flags', 'w') as flags_file: -                raw_flags = json.dumps(flags) -                flags_file.write(raw_flags) -        finally: -            fslock.unlock() +            return defer.fail(BlobNotFound((user, blob_id))) + +        @isolated(path) +        def _set_flags(): +            try: +                for flag in flags: +                    if flag not in ACCEPTED_FLAGS: +                        raise InvalidFlag(flag) +                with open(path + '.flags', 'w') as flags_file: +                    raw_flags = json.dumps(flags) +                    flags_file.write(raw_flags) +                    return defer.succeed(None) +            except Exception as e: +                return defer.fail(e) + +        return _set_flags() -    @defer.inlineCallbacks      def write_blob(self, user, blob_id, producer, namespace=''):          path = self._get_path(user, blob_id, namespace)          if os.path.isfile(path): -            raise BlobExists -        fslock = self._fslock(path) -        try: -            yield fslock.deferUntilLocked() +            return defer.fail(BlobExists((user, blob_id))) +        @isolated(path) +        @defer.inlineCallbacks +        def _write_blob():              try:                  # limit the number of concurrent writes to disk                  yield self.semaphore.acquire() @@ -195,8 +221,8 @@ class FilesystemBlobsBackend(object):                  yield self._update_usage(user, used)              finally:                  self.semaphore.release() -        finally: -            fslock.unlock() + +        return _write_blob()      @defer.inlineCallbacks      def _update_usage(self, user, used): @@ -208,35 +234,37 @@ class FilesystemBlobsBackend(object):          finally:              lock.release() -    @defer.inlineCallbacks      def delete_blob(self, user, blob_id, namespace=''):          path = self._get_path(user, blob_id, namespace)          if not os.path.isfile(path): -            raise BlobNotFound((user, blob_id)) -        fslock = self._fslock(path) -        try: -            yield fslock.deferUntilLocked() +            return defer.fail(BlobNotFound((user, blob_id))) + +        @isolated(path) +        def _delete_blob():              self.__touch(path + '.deleted')              os.unlink(path)              try:                  os.unlink(path + '.flags')              except Exception:                  pass -        finally: -            fslock.unlock() +            return defer.succeed(None) + +        return _delete_blob() -    @defer.inlineCallbacks      def get_blob_size(self, user, blob_id, namespace=''):          path = self._get_path(user, blob_id, namespace)          if not os.path.isfile(path): -            raise BlobNotFound((user, blob_id)) -        fslock = self._fslock(path) -        try: -            yield fslock.deferUntilLocked() +            return defer.fail(BlobNotFound((user, blob_id))) + +        @isolated(path) +        def _get_blob_size():              size = os.stat(path).st_size -            defer.returnValue(size) -        finally: -            fslock.unlock() +            try: +                return defer.succeed(size) +            except Exception as e: +                return defer.fail(e) + +        return _get_blob_size()      def count(self, user, namespace=''):          try: @@ -301,20 +329,22 @@ class FilesystemBlobsBackend(object):          finally:              lock.release() -    @defer.inlineCallbacks      def get_tag(self, user, blob_id, namespace=''):          path = self._get_path(user, blob_id, namespace)          if not os.path.isfile(path): -            raise BlobNotFound((user, blob_id)) -        fslock = self._fslock(path) -        try: -            yield fslock.deferUntilLocked() -            with open(path) as doc_file: -                doc_file.seek(-16, 2) -                tag = base64.urlsafe_b64encode(doc_file.read()) -                defer.returnValue(tag) -        finally: -            fslock.unlock() +            return defer.fail(BlobNotFound((user, blob_id))) + +        @isolated(path) +        def _get_tag(): +            try: +                with open(path) as doc_file: +                    doc_file.seek(-16, 2) +                    tag = base64.urlsafe_b64encode(doc_file.read()) +                    return defer.succeed(tag) +            except Exception as e: +                return defer.fail(e) + +        return _get_tag()      @defer.inlineCallbacks      def _get_disk_usage(self, start_path): @@ -341,12 +371,13 @@ class FilesystemBlobsBackend(object):      @defer.inlineCallbacks      def exists(self, user, blob_id, namespace):          path = self._get_path(user, blob_id, namespace) -        fslock = self._fslock(path) -        try: -            yield fslock.deferUntilLocked() + +        @isolated(path) +        @defer.inlineCallbacks +        def _exists():              defer.returnValue(os.path.isfile(path)) -        finally: -            fslock.unlock() + +        return _exists()      def _get_path(self, user, blob_id='', namespace=''):          parts = [user] | 
