From 30a707786dfe9fb68855fad489817efed0d0b329 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 15 Sep 2017 17:36:36 -0300 Subject: [feature] concurrent blob download/upload -- Related: #8932 --- src/leap/soledad/client/_db/blobs.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) (limited to 'src/leap/soledad/client') diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 60c369b2..ab9de8dd 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -185,6 +185,7 @@ class BlobManager(object): in local and remote storages. """ max_retries = 3 + concurrency_limit = 3 def __init__( self, local_path, remote, key, secret, user, token=None, @@ -286,9 +287,17 @@ class BlobManager(object): missing = [b_id for b_id in our_blobs if b_id not in server_blobs] logger.info("Amount of documents missing on server: %s" % len(missing)) # TODO: Send concurrently when we are able to stream directly from db - for blob_id in missing: - fd = yield self.local.get(blob_id, namespace) + while missing: + deferreds = [] + for _ in range(min(self.concurrency_limit, len(missing))): + blob_id = missing.pop() + deferreds.append(self.__send_one(blob_id, namespace)) + yield defer.gatherResults(deferreds) + + @defer.inlineCallbacks + def __send_one(self, blob_id, namespace): logger.info("Upload local blob: %s" % blob_id) + fd = yield self.local.get(blob_id, namespace) try: yield self._encrypt_and_upload(blob_id, fd) yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) @@ -316,9 +325,13 @@ class BlobManager(object): docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs] logger.info("Fetching new docs from server: %s" % len(docs_we_want)) # TODO: Fetch concurrently when we are able to stream directly into db - for blob_id in docs_we_want: - logger.info("Fetching new doc: %s" % blob_id) - yield self.get(blob_id, namespace) + while docs_we_want: + deferreds = [] + for _ in range(min(self.concurrency_limit, len(docs_we_want))): + blob_id = docs_we_want.pop() + logger.info("Fetching new doc: %s" % blob_id) + deferreds.append(self.get(blob_id, namespace)) + yield defer.DeferredList(deferreds) @defer.inlineCallbacks def put(self, doc, size, namespace=''): -- cgit v1.2.3