diff options
author | Victor Shyba <victor1984@riseup.net> | 2017-09-15 17:36:36 -0300 |
---|---|---|
committer | Victor Shyba <victor1984@riseup.net> | 2017-10-05 05:41:40 -0300 |
commit | 30a707786dfe9fb68855fad489817efed0d0b329 (patch) | |
tree | 8b15aff5eae7485e508d0fcab3e3d466c60a93c9 /src/leap | |
parent | c442a81a451543d4cbbdee44d9711ac47cc91f56 (diff) |
[feature] concurrent blob download/upload
-- Related: #8932
Diffstat (limited to 'src/leap')
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 23 |
1 files changed, 18 insertions, 5 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 60c369b2..ab9de8dd 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -185,6 +185,7 @@ class BlobManager(object): in local and remote storages. """ max_retries = 3 + concurrency_limit = 3 def __init__( self, local_path, remote, key, secret, user, token=None, @@ -286,9 +287,17 @@ class BlobManager(object): missing = [b_id for b_id in our_blobs if b_id not in server_blobs] logger.info("Amount of documents missing on server: %s" % len(missing)) # TODO: Send concurrently when we are able to stream directly from db - for blob_id in missing: - fd = yield self.local.get(blob_id, namespace) + while missing: + deferreds = [] + for _ in range(min(self.concurrency_limit, len(missing))): + blob_id = missing.pop() + deferreds.append(self.__send_one(blob_id, namespace)) + yield defer.gatherResults(deferreds) + + @defer.inlineCallbacks + def __send_one(self, blob_id, namespace): logger.info("Upload local blob: %s" % blob_id) + fd = yield self.local.get(blob_id, namespace) try: yield self._encrypt_and_upload(blob_id, fd) yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) @@ -316,9 +325,13 @@ class BlobManager(object): docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs] logger.info("Fetching new docs from server: %s" % len(docs_we_want)) # TODO: Fetch concurrently when we are able to stream directly into db - for blob_id in docs_we_want: - logger.info("Fetching new doc: %s" % blob_id) - yield self.get(blob_id, namespace) + while docs_we_want: + deferreds = [] + for _ in range(min(self.concurrency_limit, len(docs_we_want))): + blob_id = docs_we_want.pop() + logger.info("Fetching new doc: %s" % blob_id) + deferreds.append(self.get(blob_id, namespace)) + yield defer.DeferredList(deferreds) @defer.inlineCallbacks def put(self, doc, size, namespace=''): |