diff options
Diffstat (limited to 'src/leap/soledad')
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 4edb77f4..e01078e4 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -220,7 +220,8 @@ class BlobManager(object): in local and remote storages. """ max_retries = 3 - concurrency_limit = 3 + concurrent_transfers_limit = 3 + concurrent_writes_limit = 100 def __init__( self, local_path, remote, key, secret, user, token=None, @@ -248,6 +249,7 @@ class BlobManager(object): self.secret = secret self.user = user self._client = HTTPClient(user, token, cert_file) + self.semaphore = defer.DeferredSemaphore(self.concurrent_writes_limit) def close(self): if hasattr(self, 'local') and self.local: @@ -334,7 +336,7 @@ class BlobManager(object): total = len(missing) logger.info("Will send %d blobs to server." % total) deferreds = [] - semaphore = defer.DeferredSemaphore(self.concurrency_limit) + semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit) def release(result): semaphore.release() @@ -383,7 +385,7 @@ class BlobManager(object): total = len(docs_we_want) logger.info("Will fetch %d blobs from server." % total) deferreds = [] - semaphore = defer.DeferredSemaphore(self.concurrency_limit) + semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit) def release(result): semaphore.release() @@ -407,7 +409,6 @@ class BlobManager(object): except defer.FirstError as e: e.subFailure.raiseException() - @defer.inlineCallbacks def put(self, doc, size, namespace=''): """ Put a blob in local storage and upload it to server. @@ -420,6 +421,10 @@ class BlobManager(object): Optional parameter to restrict operation to a given namespace. :type namespace: str """ + return self.semaphore.run(self._put, doc, size, namespace) + + @defer.inlineCallbacks + def _put(self, doc, size, namespace): if (yield self.local.exists(doc.blob_id, namespace=namespace)): error_message = "Blob already exists: %s" % doc.blob_id raise BlobAlreadyExistsError(error_message) @@ -433,7 +438,6 @@ class BlobManager(object): yield self._encrypt_and_upload(doc.blob_id, fd, namespace=namespace) yield self.local.update_sync_status(doc.blob_id, SyncStatus.SYNCED) - @defer.inlineCallbacks def set_flags(self, blob_id, flags, namespace=''): """ Set flags for a given blob_id. @@ -450,6 +454,10 @@ class BlobManager(object): :return: A deferred that fires when the operation finishes. :rtype: twisted.internet.defer.Deferred """ + return self.semaphore.run(self._set_flags, blob_id, flags, namespace) + + @defer.inlineCallbacks + def _set_flags(self, blob_id, flags, namespace): params = {'namespace': namespace} if namespace else None flagsfd = BytesIO(json.dumps(flags)) uri = urljoin(self.remote, self.user + "/" + blob_id) @@ -584,7 +592,6 @@ class BlobManager(object): logger.info("Finished download: (%s, %d)" % (blob_id, size)) defer.returnValue((fd, size)) - @defer.inlineCallbacks def delete(self, blob_id, namespace=''): """ Delete a blob from local and remote storages. @@ -598,6 +605,10 @@ class BlobManager(object): :return: A deferred that fires when the operation finishes. :rtype: twisted.internet.defer.Deferred """ + return self.semaphore.run(self._delete, blob_id, namespace) + + @defer.inlineCallbacks + def _delete(self, blob_id, namespace): logger.info("Staring deletion of blob: %s" % blob_id) yield self._delete_from_remote(blob_id, namespace=namespace) if (yield self.local.exists(blob_id, namespace=namespace)): @@ -615,6 +626,8 @@ class BlobManager(object): class SQLiteBlobBackend(object): + concurrency_limit = 10 + def __init__(self, path, key=None, user=None): dbname = '%s_blobs.db' % (user or 'soledad') self.path = os.path.abspath( |