diff options
| -rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 23 | ||||
| -rw-r--r-- | tests/blobs/test_blob_manager.py | 3 | 
2 files changed, 19 insertions, 7 deletions
| diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 60c369b2..ab9de8dd 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -185,6 +185,7 @@ class BlobManager(object):      in local and remote storages.      """      max_retries = 3 +    concurrency_limit = 3      def __init__(              self, local_path, remote, key, secret, user, token=None, @@ -286,9 +287,17 @@ class BlobManager(object):          missing = [b_id for b_id in our_blobs if b_id not in server_blobs]          logger.info("Amount of documents missing on server: %s" % len(missing))          # TODO: Send concurrently when we are able to stream directly from db -        for blob_id in missing: -            fd = yield self.local.get(blob_id, namespace) +        while missing: +            deferreds = [] +            for _ in range(min(self.concurrency_limit, len(missing))): +                blob_id = missing.pop() +                deferreds.append(self.__send_one(blob_id, namespace)) +            yield defer.gatherResults(deferreds) + +    @defer.inlineCallbacks +    def __send_one(self, blob_id, namespace):              logger.info("Upload local blob: %s" % blob_id) +            fd = yield self.local.get(blob_id, namespace)              try:                  yield self._encrypt_and_upload(blob_id, fd)                  yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED) @@ -316,9 +325,13 @@ class BlobManager(object):          docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs]          logger.info("Fetching new docs from server: %s" % len(docs_we_want))          # TODO: Fetch concurrently when we are able to stream directly into db -        for blob_id in docs_we_want: -            logger.info("Fetching new doc: %s" % blob_id) -            yield self.get(blob_id, namespace) +        while docs_we_want: +            deferreds = [] +            for _ in range(min(self.concurrency_limit, len(docs_we_want))): +                blob_id = docs_we_want.pop() +                logger.info("Fetching new doc: %s" % blob_id) +                deferreds.append(self.get(blob_id, namespace)) +            yield defer.DeferredList(deferreds)      @defer.inlineCallbacks      def put(self, doc, size, namespace=''): diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py index 7d985768..995a1989 100644 --- a/tests/blobs/test_blob_manager.py +++ b/tests/blobs/test_blob_manager.py @@ -19,7 +19,6 @@ Tests for BlobManager.  """  from twisted.trial import unittest  from twisted.internet import defer -from twisted.web.error import SchemeNotSupported  from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV  from leap.soledad.client._db.blobs import BlobAlreadyExistsError  from leap.soledad.client._db.blobs import SyncStatus @@ -170,7 +169,7 @@ class BlobManagerTestCase(unittest.TestCase):          with pytest.raises(Exception):              yield self.manager.put(doc1, len(content))          for _ in range(self.manager.max_retries + 1): -            with pytest.raises(SchemeNotSupported): +            with pytest.raises(defer.FirstError):                  yield self.manager.send_missing()          failed_upload = SyncStatus.FAILED_UPLOAD          local_list = yield self.manager.local_list(sync_status=failed_upload) | 
