summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/server/_blobs/fs_backend.py183
1 files changed, 107 insertions, 76 deletions
diff --git a/src/leap/soledad/server/_blobs/fs_backend.py b/src/leap/soledad/server/_blobs/fs_backend.py
index 769ba47b..495e19c1 100644
--- a/src/leap/soledad/server/_blobs/fs_backend.py
+++ b/src/leap/soledad/server/_blobs/fs_backend.py
@@ -82,6 +82,37 @@ class SingleRangeProducer(SingleRangeStaticProducer):
self.deferred.callback(None)
+def isolated(path):
+ """
+ A decorator that isolates execution of the decorated method using a file
+ system lock based on the given path. A symlink in ``{path}.lock`` will be
+ used to make sure only one isolated method is executed at a time for that
+ path.
+ """
+
+ def decorator(method):
+
+ def new_method(*args, **kwargs):
+ dirname, _ = os.path.split(path)
+ mkdir_p(dirname)
+ name = path + '.lock'
+ # TODO: evaluate the need to replace this for a readers-writer lock
+ lock = defer.DeferredFilesystemLock(name)
+
+ def _release(result):
+ lock.unlock()
+ return result
+
+ d = lock.deferUntilLocked()
+ d.addCallback(lambda _: method(*args, **kwargs))
+ d.addCallbacks(_release, _release)
+ return d
+
+ return new_method
+
+ return decorator
+
+
@implementer(interfaces.IBlobsBackend)
class FilesystemBlobsBackend(object):
@@ -100,22 +131,14 @@ class FilesystemBlobsBackend(object):
def __touch(self, path):
open(path, 'a')
- def _fslock(self, path):
- dirname, _ = os.path.split(path)
- mkdir_p(dirname)
- name = path + '.lock'
- # TODO: evaluate the need to replace this for a readers-writer lock.
- return defer.DeferredFilesystemLock(name)
-
- @defer.inlineCallbacks
def read_blob(self, user, blob_id, consumer, namespace='', range=None):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- raise BlobNotFound((user, blob_id))
- fslock = self._fslock(path)
- try:
- yield fslock.deferUntilLocked()
+ return defer.fail(BlobNotFound((user, blob_id)))
+ @isolated(path)
+ @defer.inlineCallbacks
+ def _read_blob():
logger.info('reading blob: %s - %s@%s'
% (user, blob_id, namespace))
logger.debug('blob path: %s' % path)
@@ -129,52 +152,55 @@ class FilesystemBlobsBackend(object):
args = (consumer, fd, offset, size)
producer = SingleRangeProducer(*args)
yield producer.start()
- finally:
- fslock.unlock()
- @defer.inlineCallbacks
+ return _read_blob()
+
def get_flags(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- raise BlobNotFound((user, blob_id))
+ return defer.fail(BlobNotFound((user, blob_id)))
if not os.path.isfile(path + '.flags'):
- defer.returnValue([])
- fslock = self._fslock(path)
- try:
- yield fslock.deferUntilLocked()
+ return defer.succeed([])
- with open(path + '.flags', 'r') as flags_file:
- flags = json.loads(flags_file.read())
- defer.returnValue(flags)
- finally:
- fslock.unlock()
+ @isolated(path)
+ def _get_flags():
+ try:
+ with open(path + '.flags', 'r') as flags_file:
+ flags = json.loads(flags_file.read())
+ return defer.succeed(flags)
+ except Exception as e:
+ return defer.fail(e)
+
+ return _get_flags()
- @defer.inlineCallbacks
def set_flags(self, user, blob_id, flags, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- raise BlobNotFound((user, blob_id))
- fslock = self._fslock(path)
- try:
- yield fslock.deferUntilLocked()
- for flag in flags:
- if flag not in ACCEPTED_FLAGS:
- raise InvalidFlag(flag)
- with open(path + '.flags', 'w') as flags_file:
- raw_flags = json.dumps(flags)
- flags_file.write(raw_flags)
- finally:
- fslock.unlock()
+ return defer.fail(BlobNotFound((user, blob_id)))
+
+ @isolated(path)
+ def _set_flags():
+ try:
+ for flag in flags:
+ if flag not in ACCEPTED_FLAGS:
+ raise InvalidFlag(flag)
+ with open(path + '.flags', 'w') as flags_file:
+ raw_flags = json.dumps(flags)
+ flags_file.write(raw_flags)
+ return defer.succeed(None)
+ except Exception as e:
+ return defer.fail(e)
+
+ return _set_flags()
- @defer.inlineCallbacks
def write_blob(self, user, blob_id, producer, namespace=''):
path = self._get_path(user, blob_id, namespace)
if os.path.isfile(path):
- raise BlobExists
- fslock = self._fslock(path)
- try:
- yield fslock.deferUntilLocked()
+ return defer.fail(BlobExists((user, blob_id)))
+ @isolated(path)
+ @defer.inlineCallbacks
+ def _write_blob():
try:
# limit the number of concurrent writes to disk
yield self.semaphore.acquire()
@@ -195,8 +221,8 @@ class FilesystemBlobsBackend(object):
yield self._update_usage(user, used)
finally:
self.semaphore.release()
- finally:
- fslock.unlock()
+
+ return _write_blob()
@defer.inlineCallbacks
def _update_usage(self, user, used):
@@ -208,35 +234,37 @@ class FilesystemBlobsBackend(object):
finally:
lock.release()
- @defer.inlineCallbacks
def delete_blob(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- raise BlobNotFound((user, blob_id))
- fslock = self._fslock(path)
- try:
- yield fslock.deferUntilLocked()
+ return defer.fail(BlobNotFound((user, blob_id)))
+
+ @isolated(path)
+ def _delete_blob():
self.__touch(path + '.deleted')
os.unlink(path)
try:
os.unlink(path + '.flags')
except Exception:
pass
- finally:
- fslock.unlock()
+ return defer.succeed(None)
+
+ return _delete_blob()
- @defer.inlineCallbacks
def get_blob_size(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- raise BlobNotFound((user, blob_id))
- fslock = self._fslock(path)
- try:
- yield fslock.deferUntilLocked()
+ return defer.fail(BlobNotFound((user, blob_id)))
+
+ @isolated(path)
+ def _get_blob_size():
size = os.stat(path).st_size
- defer.returnValue(size)
- finally:
- fslock.unlock()
+ try:
+ return defer.succeed(size)
+ except Exception as e:
+ return defer.fail(e)
+
+ return _get_blob_size()
def count(self, user, namespace=''):
try:
@@ -301,20 +329,22 @@ class FilesystemBlobsBackend(object):
finally:
lock.release()
- @defer.inlineCallbacks
def get_tag(self, user, blob_id, namespace=''):
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- raise BlobNotFound((user, blob_id))
- fslock = self._fslock(path)
- try:
- yield fslock.deferUntilLocked()
- with open(path) as doc_file:
- doc_file.seek(-16, 2)
- tag = base64.urlsafe_b64encode(doc_file.read())
- defer.returnValue(tag)
- finally:
- fslock.unlock()
+ return defer.fail(BlobNotFound((user, blob_id)))
+
+ @isolated(path)
+ def _get_tag():
+ try:
+ with open(path) as doc_file:
+ doc_file.seek(-16, 2)
+ tag = base64.urlsafe_b64encode(doc_file.read())
+ return defer.succeed(tag)
+ except Exception as e:
+ return defer.fail(e)
+
+ return _get_tag()
@defer.inlineCallbacks
def _get_disk_usage(self, start_path):
@@ -341,12 +371,13 @@ class FilesystemBlobsBackend(object):
@defer.inlineCallbacks
def exists(self, user, blob_id, namespace):
path = self._get_path(user, blob_id, namespace)
- fslock = self._fslock(path)
- try:
- yield fslock.deferUntilLocked()
+
+ @isolated(path)
+ @defer.inlineCallbacks
+ def _exists():
defer.returnValue(os.path.isfile(path))
- finally:
- fslock.unlock()
+
+ return _exists()
def _get_path(self, user, blob_id='', namespace=''):
parts = [user]