summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Victor Shyba <victor1984@riseup.net> 2017-09-15 17:36:36 -0300
committer: Victor Shyba <victor1984@riseup.net> 2017-10-05 05:41:40 -0300
commit: 30a707786dfe9fb68855fad489817efed0d0b329 (patch)
tree: 8b15aff5eae7485e508d0fcab3e3d466c60a93c9
parent: c442a81a451543d4cbbdee44d9711ac47cc91f56 (diff)
[feature] concurrent blob download/upload
-- Related: #8932
-rw-r--r-- src/leap/soledad/client/_db/blobs.py 23
-rw-r--r-- tests/blobs/test_blob_manager.py 3
2 files changed, 19 insertions, 7 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 60c369b2..ab9de8dd 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -185,6 +185,7 @@ class BlobManager(object):
in local and remote storages.
"""
max_retries = 3
+ concurrency_limit = 3
def __init__(
self, local_path, remote, key, secret, user, token=None,
@@ -286,9 +287,17 @@ class BlobManager(object):
missing = [b_id for b_id in our_blobs if b_id not in server_blobs]
logger.info("Amount of documents missing on server: %s" % len(missing))
# TODO: Send concurrently when we are able to stream directly from db
- for blob_id in missing:
- fd = yield self.local.get(blob_id, namespace)
+ while missing:
+ deferreds = []
+ for _ in range(min(self.concurrency_limit, len(missing))):
+ blob_id = missing.pop()
+ deferreds.append(self.__send_one(blob_id, namespace))
+ yield defer.gatherResults(deferreds)
+
+ @defer.inlineCallbacks
+ def __send_one(self, blob_id, namespace):
logger.info("Upload local blob: %s" % blob_id)
+ fd = yield self.local.get(blob_id, namespace)
try:
yield self._encrypt_and_upload(blob_id, fd)
yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
@@ -316,9 +325,13 @@ class BlobManager(object):
docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs]
logger.info("Fetching new docs from server: %s" % len(docs_we_want))
# TODO: Fetch concurrently when we are able to stream directly into db
- for blob_id in docs_we_want:
- logger.info("Fetching new doc: %s" % blob_id)
- yield self.get(blob_id, namespace)
+ while docs_we_want:
+ deferreds = []
+ for _ in range(min(self.concurrency_limit, len(docs_we_want))):
+ blob_id = docs_we_want.pop()
+ logger.info("Fetching new doc: %s" % blob_id)
+ deferreds.append(self.get(blob_id, namespace))
+ yield defer.DeferredList(deferreds)
@defer.inlineCallbacks
def put(self, doc, size, namespace=''):
diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py
index 7d985768..995a1989 100644
--- a/tests/blobs/test_blob_manager.py
+++ b/tests/blobs/test_blob_manager.py
@@ -19,7 +19,6 @@ Tests for BlobManager.
"""
from twisted.trial import unittest
from twisted.internet import defer
-from twisted.web.error import SchemeNotSupported
from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV
from leap.soledad.client._db.blobs import BlobAlreadyExistsError
from leap.soledad.client._db.blobs import SyncStatus
@@ -170,7 +169,7 @@ class BlobManagerTestCase(unittest.TestCase):
with pytest.raises(Exception):
yield self.manager.put(doc1, len(content))
for _ in range(self.manager.max_retries + 1):
- with pytest.raises(SchemeNotSupported):
+ with pytest.raises(defer.FirstError):
yield self.manager.send_missing()
failed_upload = SyncStatus.FAILED_UPLOAD
local_list = yield self.manager.local_list(sync_status=failed_upload)