diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/leap/soledad/server/_blobs/fs_backend.py | 205 | ||||
-rw-r--r-- | src/leap/soledad/server/interfaces.py | 15 |
2 files changed, 134 insertions, 86 deletions
diff --git a/src/leap/soledad/server/_blobs/fs_backend.py b/src/leap/soledad/server/_blobs/fs_backend.py index 79bbc4d7..769ba47b 100644 --- a/src/leap/soledad/server/_blobs/fs_backend.py +++ b/src/leap/soledad/server/_blobs/fs_backend.py @@ -100,75 +100,103 @@ class FilesystemBlobsBackend(object): def __touch(self, path): open(path, 'a') + def _fslock(self, path): + dirname, _ = os.path.split(path) + mkdir_p(dirname) + name = path + '.lock' + # TODO: evaluate the need to replace this for a readers-writer lock. + return defer.DeferredFilesystemLock(name) + @defer.inlineCallbacks def read_blob(self, user, blob_id, consumer, namespace='', range=None): - logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace)) path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): raise BlobNotFound((user, blob_id)) - logger.debug('blob path: %s' % path) - with open(path) as fd: - if range is None: - producer = NoRangeProducer(consumer, fd) - else: - start, end = range - offset = start - size = end - start - args = (consumer, fd, offset, size) - producer = SingleRangeProducer(*args) - yield producer.start() + fslock = self._fslock(path) + try: + yield fslock.deferUntilLocked() + + logger.info('reading blob: %s - %s@%s' + % (user, blob_id, namespace)) + logger.debug('blob path: %s' % path) + with open(path) as fd: + if range is None: + producer = NoRangeProducer(consumer, fd) + else: + start, end = range + offset = start + size = end - start + args = (consumer, fd, offset, size) + producer = SingleRangeProducer(*args) + yield producer.start() + finally: + fslock.unlock() + @defer.inlineCallbacks def get_flags(self, user, blob_id, namespace=''): - try: - path = self._get_path(user, blob_id, namespace) - except Exception as e: - return defer.fail(e) + path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - return defer.fail(BlobNotFound((user, blob_id))) + raise BlobNotFound((user, blob_id)) if not os.path.isfile(path + '.flags'): - return defer.succeed([]) - with open(path + '.flags', 'r') as flags_file: - flags = json.loads(flags_file.read()) - return defer.succeed(flags) + defer.returnValue([]) + fslock = self._fslock(path) + try: + yield fslock.deferUntilLocked() + with open(path + '.flags', 'r') as flags_file: + flags = json.loads(flags_file.read()) + defer.returnValue(flags) + finally: + fslock.unlock() + + @defer.inlineCallbacks def set_flags(self, user, blob_id, flags, namespace=''): - try: - path = self._get_path(user, blob_id, namespace) - except Exception as e: - return defer.fail(e) + path = self._get_path(user, blob_id, namespace) if not os.path.isfile(path): - return defer.fail(BlobNotFound((user, blob_id))) - for flag in flags: - if flag not in ACCEPTED_FLAGS: - return defer.fail(InvalidFlag(flag)) - with open(path + '.flags', 'w') as flags_file: - raw_flags = json.dumps(flags) - flags_file.write(raw_flags) - return defer.succeed(None) + raise BlobNotFound((user, blob_id)) + fslock = self._fslock(path) + try: + yield fslock.deferUntilLocked() + for flag in flags: + if flag not in ACCEPTED_FLAGS: + raise InvalidFlag(flag) + with open(path + '.flags', 'w') as flags_file: + raw_flags = json.dumps(flags) + flags_file.write(raw_flags) + finally: + fslock.unlock() @defer.inlineCallbacks def write_blob(self, user, blob_id, producer, namespace=''): - # limit the number of concurrent writes to disk - yield self.semaphore.acquire() + path = self._get_path(user, blob_id, namespace) + if os.path.isfile(path): + raise BlobExists + fslock = self._fslock(path) try: - path = self._get_path(user, blob_id, namespace) + yield fslock.deferUntilLocked() + try: - mkdir_p(os.path.split(path)[0]) - except OSError as e: - logger.warn("Got exception trying to create directory: %r" % e) - if os.path.isfile(path): - raise BlobExists - used = yield self.get_total_storage(user) - length = producer.length / 1024.0 # original length is in bytes - if used + length > self.quota: - raise QuotaExceeded - logger.info('writing blob: %s - %s' % (user, blob_id)) - with open(path, 'wb') as blobfile: - yield producer.startProducing(blobfile) - used += length - yield self._update_usage(user, used) + # limit the number of concurrent writes to disk + yield self.semaphore.acquire() + + try: + mkdir_p(os.path.split(path)[0]) + except OSError as e: + logger.warn( + "Got exception trying to create directory: %r" % e) + used = yield self.get_total_storage(user) + length = producer.length / 1024.0 + if used + length > self.quota: + raise QuotaExceeded + logger.info('writing blob: %s - %s' % (user, blob_id)) + with open(path, 'wb') as blobfile: + yield producer.startProducing(blobfile) + used += length + yield self._update_usage(user, used) + finally: + self.semaphore.release() finally: - self.semaphore.release() + fslock.unlock() @defer.inlineCallbacks def _update_usage(self, user, used): @@ -180,30 +208,35 @@ class FilesystemBlobsBackend(object): finally: lock.release() + @defer.inlineCallbacks def delete_blob(self, user, blob_id, namespace=''): + path = self._get_path(user, blob_id, namespace) + if not os.path.isfile(path): + raise BlobNotFound((user, blob_id)) + fslock = self._fslock(path) try: - blob_path = self._get_path(user, blob_id, namespace) - except Exception as e: - return defer.fail(e) - if not os.path.isfile(blob_path): - return defer.fail(BlobNotFound((user, blob_id))) - self.__touch(blob_path + '.deleted') - os.unlink(blob_path) - try: - os.unlink(blob_path + '.flags') - except Exception: - pass - return defer.succeed(None) + yield fslock.deferUntilLocked() + self.__touch(path + '.deleted') + os.unlink(path) + try: + os.unlink(path + '.flags') + except Exception: + pass + finally: + fslock.unlock() + @defer.inlineCallbacks def get_blob_size(self, user, blob_id, namespace=''): + path = self._get_path(user, blob_id, namespace) + if not os.path.isfile(path): + raise BlobNotFound((user, blob_id)) + fslock = self._fslock(path) try: - blob_path = self._get_path(user, blob_id, namespace) - except Exception as e: - return defer.fail(e) - if not os.path.isfile(blob_path): - return defer.fail(BlobNotFound((user, blob_id))) - size = os.stat(blob_path).st_size - return defer.succeed(size) + yield fslock.deferUntilLocked() + size = os.stat(path).st_size + defer.returnValue(size) + finally: + fslock.unlock() def count(self, user, namespace=''): try: @@ -268,17 +301,20 @@ class FilesystemBlobsBackend(object): finally: lock.release() + @defer.inlineCallbacks def get_tag(self, user, blob_id, namespace=''): + path = self._get_path(user, blob_id, namespace) + if not os.path.isfile(path): + raise BlobNotFound((user, blob_id)) + fslock = self._fslock(path) try: - blob_path = self._get_path(user, blob_id, namespace) - except Exception as e: - return defer.fail(e) - if not os.path.isfile(blob_path): - return defer.fail(BlobNotFound((user, blob_id))) - with open(blob_path) as doc_file: - doc_file.seek(-16, 2) - tag = base64.urlsafe_b64encode(doc_file.read()) - return defer.succeed(tag) + yield fslock.deferUntilLocked() + with open(path) as doc_file: + doc_file.seek(-16, 2) + tag = base64.urlsafe_b64encode(doc_file.read()) + defer.returnValue(tag) + finally: + fslock.unlock() @defer.inlineCallbacks def _get_disk_usage(self, start_path): @@ -302,12 +338,15 @@ class FilesystemBlobsBackend(object): raise Exception(err) return desired_path + @defer.inlineCallbacks def exists(self, user, blob_id, namespace): + path = self._get_path(user, blob_id, namespace) + fslock = self._fslock(path) try: - path = self._get_path(user, blob_id=blob_id, namespace=namespace) - except Exception as e: - return defer.fail(e) - return os.path.isfile(path) + yield fslock.deferUntilLocked() + defer.returnValue(os.path.isfile(path)) + finally: + fslock.unlock() def _get_path(self, user, blob_id='', namespace=''): parts = [user] diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py index 96fa2f94..deab2027 100644 --- a/src/leap/soledad/server/interfaces.py +++ b/src/leap/soledad/server/interfaces.py @@ -23,6 +23,10 @@ class IBlobsBackend(Interface): """ An interface for a backend that can store blobs. + + There might be concurrent calls to methods that modify the same blob, so + it's the backend implementation's responsibility to ensure isolation of + such actions. """ def read_blob(user, blob_id, consumer, namespace='', range=None): @@ -66,8 +70,11 @@ class IBlobsBackend(Interface): backend storage. :rtype: twisted.internet.defer.Deferred - :raise BlobExists: Raised when a blob with that id already exists. - :raise QuotaExceeded: Raised when the quota for that user was exceeded. + :raise BlobExists: Raised (asynchronously) when a blob with that id + already exists. + + :raise QuotaExceeded: Raised (asynchronously) when the quota for that + user was exceeded. """ def delete_blob(user, blob_id, namespace=''): @@ -218,5 +225,7 @@ class IBlobsBackend(Interface): :raise BlobNotFound: Raised (asynchronously) when the blob was not found in the backend. - :raise InvalidFlag: Raised when one of the flags passed is invalid. + + :raise InvalidFlag: Raised (asynchronously) when one of the flags + passed is invalid. """ |