summaryrefslogtreecommitdiff
path: root/src/leap/soledad/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/server')
-rw-r--r--src/leap/soledad/server/_blobs/fs_backend.py205
-rw-r--r--src/leap/soledad/server/interfaces.py15
2 files changed, 134 insertions, 86 deletions
diff --git a/src/leap/soledad/server/_blobs/fs_backend.py b/src/leap/soledad/server/_blobs/fs_backend.py
index 79bbc4d7..769ba47b 100644
--- a/src/leap/soledad/server/_blobs/fs_backend.py
+++ b/src/leap/soledad/server/_blobs/fs_backend.py
@@ -100,75 +100,103 @@ class FilesystemBlobsBackend(object):
def __touch(self, path):
open(path, 'a')
+ def _fslock(self, path):
+ dirname, _ = os.path.split(path)
+ mkdir_p(dirname)
+ name = path + '.lock'
+ # TODO: evaluate the need to replace this for a readers-writer lock.
+ return defer.DeferredFilesystemLock(name)
+
@defer.inlineCallbacks
def read_blob(self, user, blob_id, consumer, namespace='', range=None):
- logger.info('reading blob: %s - %s@%s' % (user, blob_id, namespace))
path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
raise BlobNotFound((user, blob_id))
- logger.debug('blob path: %s' % path)
- with open(path) as fd:
- if range is None:
- producer = NoRangeProducer(consumer, fd)
- else:
- start, end = range
- offset = start
- size = end - start
- args = (consumer, fd, offset, size)
- producer = SingleRangeProducer(*args)
- yield producer.start()
+ fslock = self._fslock(path)
+ try:
+ yield fslock.deferUntilLocked()
+
+ logger.info('reading blob: %s - %s@%s'
+ % (user, blob_id, namespace))
+ logger.debug('blob path: %s' % path)
+ with open(path) as fd:
+ if range is None:
+ producer = NoRangeProducer(consumer, fd)
+ else:
+ start, end = range
+ offset = start
+ size = end - start
+ args = (consumer, fd, offset, size)
+ producer = SingleRangeProducer(*args)
+ yield producer.start()
+ finally:
+ fslock.unlock()
+ @defer.inlineCallbacks
def get_flags(self, user, blob_id, namespace=''):
- try:
- path = self._get_path(user, blob_id, namespace)
- except Exception as e:
- return defer.fail(e)
+ path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- return defer.fail(BlobNotFound((user, blob_id)))
+ raise BlobNotFound((user, blob_id))
if not os.path.isfile(path + '.flags'):
- return defer.succeed([])
- with open(path + '.flags', 'r') as flags_file:
- flags = json.loads(flags_file.read())
- return defer.succeed(flags)
+ defer.returnValue([])
+ fslock = self._fslock(path)
+ try:
+ yield fslock.deferUntilLocked()
+ with open(path + '.flags', 'r') as flags_file:
+ flags = json.loads(flags_file.read())
+ defer.returnValue(flags)
+ finally:
+ fslock.unlock()
+
+ @defer.inlineCallbacks
def set_flags(self, user, blob_id, flags, namespace=''):
- try:
- path = self._get_path(user, blob_id, namespace)
- except Exception as e:
- return defer.fail(e)
+ path = self._get_path(user, blob_id, namespace)
if not os.path.isfile(path):
- return defer.fail(BlobNotFound((user, blob_id)))
- for flag in flags:
- if flag not in ACCEPTED_FLAGS:
- return defer.fail(InvalidFlag(flag))
- with open(path + '.flags', 'w') as flags_file:
- raw_flags = json.dumps(flags)
- flags_file.write(raw_flags)
- return defer.succeed(None)
+ raise BlobNotFound((user, blob_id))
+ fslock = self._fslock(path)
+ try:
+ yield fslock.deferUntilLocked()
+ for flag in flags:
+ if flag not in ACCEPTED_FLAGS:
+ raise InvalidFlag(flag)
+ with open(path + '.flags', 'w') as flags_file:
+ raw_flags = json.dumps(flags)
+ flags_file.write(raw_flags)
+ finally:
+ fslock.unlock()
@defer.inlineCallbacks
def write_blob(self, user, blob_id, producer, namespace=''):
- # limit the number of concurrent writes to disk
- yield self.semaphore.acquire()
+ path = self._get_path(user, blob_id, namespace)
+ if os.path.isfile(path):
+ raise BlobExists
+ fslock = self._fslock(path)
try:
- path = self._get_path(user, blob_id, namespace)
+ yield fslock.deferUntilLocked()
+
try:
- mkdir_p(os.path.split(path)[0])
- except OSError as e:
- logger.warn("Got exception trying to create directory: %r" % e)
- if os.path.isfile(path):
- raise BlobExists
- used = yield self.get_total_storage(user)
- length = producer.length / 1024.0 # original length is in bytes
- if used + length > self.quota:
- raise QuotaExceeded
- logger.info('writing blob: %s - %s' % (user, blob_id))
- with open(path, 'wb') as blobfile:
- yield producer.startProducing(blobfile)
- used += length
- yield self._update_usage(user, used)
+ # limit the number of concurrent writes to disk
+ yield self.semaphore.acquire()
+
+ try:
+ mkdir_p(os.path.split(path)[0])
+ except OSError as e:
+ logger.warn(
+ "Got exception trying to create directory: %r" % e)
+ used = yield self.get_total_storage(user)
+ length = producer.length / 1024.0
+ if used + length > self.quota:
+ raise QuotaExceeded
+ logger.info('writing blob: %s - %s' % (user, blob_id))
+ with open(path, 'wb') as blobfile:
+ yield producer.startProducing(blobfile)
+ used += length
+ yield self._update_usage(user, used)
+ finally:
+ self.semaphore.release()
finally:
- self.semaphore.release()
+ fslock.unlock()
@defer.inlineCallbacks
def _update_usage(self, user, used):
@@ -180,30 +208,35 @@ class FilesystemBlobsBackend(object):
finally:
lock.release()
+ @defer.inlineCallbacks
def delete_blob(self, user, blob_id, namespace=''):
+ path = self._get_path(user, blob_id, namespace)
+ if not os.path.isfile(path):
+ raise BlobNotFound((user, blob_id))
+ fslock = self._fslock(path)
try:
- blob_path = self._get_path(user, blob_id, namespace)
- except Exception as e:
- return defer.fail(e)
- if not os.path.isfile(blob_path):
- return defer.fail(BlobNotFound((user, blob_id)))
- self.__touch(blob_path + '.deleted')
- os.unlink(blob_path)
- try:
- os.unlink(blob_path + '.flags')
- except Exception:
- pass
- return defer.succeed(None)
+ yield fslock.deferUntilLocked()
+ self.__touch(path + '.deleted')
+ os.unlink(path)
+ try:
+ os.unlink(path + '.flags')
+ except Exception:
+ pass
+ finally:
+ fslock.unlock()
+ @defer.inlineCallbacks
def get_blob_size(self, user, blob_id, namespace=''):
+ path = self._get_path(user, blob_id, namespace)
+ if not os.path.isfile(path):
+ raise BlobNotFound((user, blob_id))
+ fslock = self._fslock(path)
try:
- blob_path = self._get_path(user, blob_id, namespace)
- except Exception as e:
- return defer.fail(e)
- if not os.path.isfile(blob_path):
- return defer.fail(BlobNotFound((user, blob_id)))
- size = os.stat(blob_path).st_size
- return defer.succeed(size)
+ yield fslock.deferUntilLocked()
+ size = os.stat(path).st_size
+ defer.returnValue(size)
+ finally:
+ fslock.unlock()
def count(self, user, namespace=''):
try:
@@ -268,17 +301,20 @@ class FilesystemBlobsBackend(object):
finally:
lock.release()
+ @defer.inlineCallbacks
def get_tag(self, user, blob_id, namespace=''):
+ path = self._get_path(user, blob_id, namespace)
+ if not os.path.isfile(path):
+ raise BlobNotFound((user, blob_id))
+ fslock = self._fslock(path)
try:
- blob_path = self._get_path(user, blob_id, namespace)
- except Exception as e:
- return defer.fail(e)
- if not os.path.isfile(blob_path):
- return defer.fail(BlobNotFound((user, blob_id)))
- with open(blob_path) as doc_file:
- doc_file.seek(-16, 2)
- tag = base64.urlsafe_b64encode(doc_file.read())
- return defer.succeed(tag)
+ yield fslock.deferUntilLocked()
+ with open(path) as doc_file:
+ doc_file.seek(-16, 2)
+ tag = base64.urlsafe_b64encode(doc_file.read())
+ defer.returnValue(tag)
+ finally:
+ fslock.unlock()
@defer.inlineCallbacks
def _get_disk_usage(self, start_path):
@@ -302,12 +338,15 @@ class FilesystemBlobsBackend(object):
raise Exception(err)
return desired_path
+ @defer.inlineCallbacks
def exists(self, user, blob_id, namespace):
+ path = self._get_path(user, blob_id, namespace)
+ fslock = self._fslock(path)
try:
- path = self._get_path(user, blob_id=blob_id, namespace=namespace)
- except Exception as e:
- return defer.fail(e)
- return os.path.isfile(path)
+ yield fslock.deferUntilLocked()
+ defer.returnValue(os.path.isfile(path))
+ finally:
+ fslock.unlock()
def _get_path(self, user, blob_id='', namespace=''):
parts = [user]
diff --git a/src/leap/soledad/server/interfaces.py b/src/leap/soledad/server/interfaces.py
index 96fa2f94..deab2027 100644
--- a/src/leap/soledad/server/interfaces.py
+++ b/src/leap/soledad/server/interfaces.py
@@ -23,6 +23,10 @@ class IBlobsBackend(Interface):
"""
An interface for a backend that can store blobs.
+
+ There might be concurrent calls to methods that modify the same blob, so
+ it's the backend implementation's responsibility to ensure isolation of
+ such actions.
"""
def read_blob(user, blob_id, consumer, namespace='', range=None):
@@ -66,8 +70,11 @@ class IBlobsBackend(Interface):
backend storage.
:rtype: twisted.internet.defer.Deferred
- :raise BlobExists: Raised when a blob with that id already exists.
- :raise QuotaExceeded: Raised when the quota for that user was exceeded.
+ :raise BlobExists: Raised (asynchronously) when a blob with that id
+ already exists.
+
+ :raise QuotaExceeded: Raised (asynchronously) when the quota for that
+ user was exceeded.
"""
def delete_blob(user, blob_id, namespace=''):
@@ -218,5 +225,7 @@ class IBlobsBackend(Interface):
:raise BlobNotFound: Raised (asynchronously) when the blob was not
found in the backend.
- :raise InvalidFlag: Raised when one of the flags passed is invalid.
+
+ :raise InvalidFlag: Raised (asynchronously) when one of the flags
+ passed is invalid.
"""