From 30a707786dfe9fb68855fad489817efed0d0b329 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 15 Sep 2017 17:36:36 -0300 Subject: [feature] concurrent blob download/upload -- Related: #8932 --- src/leap/soledad/client/_db/blobs.py | 23 ++++++++++++++++++----- tests/blobs/test_blob_manager.py | 3 +-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 60c369b2..ab9de8dd 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -185,6 +185,7 @@ class BlobManager(object): in local and remote storages. """ max_retries = 3 + concurrency_limit = 3 def __init__( self, local_path, remote, key, secret, user, token=None, @@ -286,9 +287,17 @@ class BlobManager(object): missing = [b_id for b_id in our_blobs if b_id not in server_blobs] logger.info("Amount of documents missing on server: %s" % len(missing)) # TODO: Send concurrently when we are able to stream directly from db - for blob_id in missing: - fd = yield self.local.get(blob_id, namespace) + while missing: + deferreds = [] + for _ in range(min(self.concurrency_limit, len(missing))): + blob_id = missing.pop() + deferreds.append(self.__send_one(blob_id, namespace)) + yield defer.gatherResults(deferreds) + + @defer.inlineCallbacks + def __send_one(self, blob_id, namespace): logger.info("Upload local blob: %s" % blob_id) + fd = yield self.local.get(blob_id, namespace) try: yield self._encrypt_and_upload(blob_id, fd) yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) @@ -316,9 +325,13 @@ class BlobManager(object): docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs] logger.info("Fetching new docs from server: %s" % len(docs_we_want)) # TODO: Fetch concurrently when we are able to stream directly into db - for blob_id in docs_we_want: - logger.info("Fetching new doc: %s" % blob_id) - yield self.get(blob_id, namespace) + while docs_we_want: + deferreds = [] + for _ in range(min(self.concurrency_limit, len(docs_we_want))): + blob_id = docs_we_want.pop() + logger.info("Fetching new doc: %s" % blob_id) + deferreds.append(self.get(blob_id, namespace)) + yield defer.DeferredList(deferreds) @defer.inlineCallbacks def put(self, doc, size, namespace=''): diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py index 7d985768..995a1989 100644 --- a/tests/blobs/test_blob_manager.py +++ b/tests/blobs/test_blob_manager.py @@ -19,7 +19,6 @@ Tests for BlobManager. """ from twisted.trial import unittest from twisted.internet import defer -from twisted.web.error import SchemeNotSupported from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV from leap.soledad.client._db.blobs import BlobAlreadyExistsError from leap.soledad.client._db.blobs import SyncStatus @@ -170,7 +169,7 @@ class BlobManagerTestCase(unittest.TestCase): with pytest.raises(Exception): yield self.manager.put(doc1, len(content)) for _ in range(self.manager.max_retries + 1): - with pytest.raises(SchemeNotSupported): + with pytest.raises(defer.FirstError): yield self.manager.send_missing() failed_upload = SyncStatus.FAILED_UPLOAD local_list = yield self.manager.local_list(sync_status=failed_upload) -- cgit v1.2.3