summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/client/_db/blobs.py25
1 files changed, 19 insertions, 6 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 4edb77f4..e01078e4 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -220,7 +220,8 @@ class BlobManager(object):
in local and remote storages.
"""
max_retries = 3
- concurrency_limit = 3
+ concurrent_transfers_limit = 3
+ concurrent_writes_limit = 100
def __init__(
self, local_path, remote, key, secret, user, token=None,
@@ -248,6 +249,7 @@ class BlobManager(object):
self.secret = secret
self.user = user
self._client = HTTPClient(user, token, cert_file)
+ self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit)
def close(self):
if hasattr(self, 'local') and self.local:
@@ -334,7 +336,7 @@ class BlobManager(object):
total = len(missing)
logger.info("Will send %d blobs to server." % total)
deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrency_limit)
+ semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
def release(result):
semaphore.release()
@@ -383,7 +385,7 @@ class BlobManager(object):
total = len(docs_we_want)
logger.info("Will fetch %d blobs from server." % total)
deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrency_limit)
+ semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
def release(result):
semaphore.release()
@@ -407,7 +409,6 @@ class BlobManager(object):
except defer.FirstError as e:
e.subFailure.raiseException()
- @defer.inlineCallbacks
def put(self, doc, size, namespace=''):
"""
Put a blob in local storage and upload it to server.
@@ -420,6 +421,10 @@ class BlobManager(object):
Optional parameter to restrict operation to a given namespace.
:type namespace: str
"""
+ return self.semaphore.run(self._put, doc, size, namespace)
+
+ @defer.inlineCallbacks
+ def _put(self, doc, size, namespace):
if (yield self.local.exists(doc.blob_id, namespace=namespace)):
error_message = "Blob already exists: %s" % doc.blob_id
raise BlobAlreadyExistsError(error_message)
@@ -433,7 +438,6 @@ class BlobManager(object):
yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace)
yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED)
- @defer.inlineCallbacks
def set_flags(self, blob_id, flags, namespace=''):
"""
Set flags for a given blob_id.
@@ -450,6 +454,10 @@ class BlobManager(object):
:return: A deferred that fires when the operation finishes.
:rtype: twisted.internet.defer.Deferred
"""
+ return self.semaphore.run(self._set_flags, blob_id, flags, namespace)
+
+ @defer.inlineCallbacks
+ def _set_flags(self, blob_id, flags, namespace):
params = {'namespace': namespace} if namespace else None
flagsfd = BytesIO(json.dumps(flags))
uri = urljoin(self.remote, self.user + "/" + blob_id)
@@ -584,7 +592,6 @@ class BlobManager(object):
logger.info("Finished download: (%s, %d)" % (blob_id, size))
defer.returnValue((fd, size))
- @defer.inlineCallbacks
def delete(self, blob_id, namespace=''):
"""
Delete a blob from local and remote storages.
@@ -598,6 +605,10 @@ class BlobManager(object):
:return: A deferred that fires when the operation finishes.
:rtype: twisted.internet.defer.Deferred
"""
+ return self.semaphore.run(self._delete, blob_id, namespace)
+
+ @defer.inlineCallbacks
+ def _delete(self, blob_id, namespace):
logger.info("Staring deletion of blob: %s" % blob_id)
yield self._delete_from_remote(blob_id, namespace=namespace)
if (yield self.local.exists(blob_id, namespace=namespace)):
@@ -615,6 +626,8 @@ class BlobManager(object):
class SQLiteBlobBackend(object):
+ concurrency_limit = 10
+
def __init__(self, path, key=None, user=None):
dbname = '%s_blobs.db' % (user or 'soledad')
self.path = os.path.abspath(