summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/leap/soledad/client/_db/blobs.py55
-rw-r--r--tests/blobs/test_blob_manager.py7
2 files changed, 41 insertions, 21 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index afc92864..73f011c2 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -327,18 +327,27 @@ class BlobManager(object):
d = self.local_list(namespace=namespace,
sync_status=SyncStatus.PENDING_UPLOAD)
missing = yield d
- logger.info("Amount of documents missing on server: %s" % len(missing))
- while missing:
- deferreds = []
- for _ in range(min(self.concurrency_limit, len(missing))):
- blob_id = missing.pop()
- d = with_retry(self.__send_one, blob_id, namespace)
- deferreds.append(d)
- yield defer.gatherResults(deferreds)
+ total = len(missing)
+ logger.info("Will send %d blobs to server." % total)
+ deferreds = []
+ semaphore = defer.DeferredSemaphore(self.concurrency_limit)
+
+ def release(result):
+ semaphore.release()
+ return result
+
+ for i in xrange(total):
+ yield semaphore.acquire()
+ blob_id = missing.pop()
+ d = with_retry(self.__send_one, blob_id, namespace, i, total)
+ d.addCallbacks(release, release)
+ deferreds.append(d)
+ yield defer.gatherResults(deferreds)
@defer.inlineCallbacks
- def __send_one(self, blob_id, namespace):
- logger.info("Upload local blob: %s" % blob_id)
+ def __send_one(self, blob_id, namespace, i, total):
+ logger.info("Sending blob to server (%d/%d): %s"
+ % (i, total, blob_id))
fd = yield self.local.get(blob_id, namespace=namespace)
try:
yield self._encrypt_and_upload(blob_id, fd)
@@ -365,15 +374,23 @@ class BlobManager(object):
d = self.local_list(namespace=namespace,
sync_status=SyncStatus.PENDING_DOWNLOAD)
docs_we_want = yield d
- logger.info("Fetching new docs from server: %s" % len(docs_we_want))
- while docs_we_want:
- deferreds = []
- for _ in range(min(self.concurrency_limit, len(docs_we_want))):
- blob_id = docs_we_want.pop()
- logger.info("Fetching new doc: %s" % blob_id)
- d = with_retry(self.get, blob_id, namespace)
- deferreds.append(d)
- yield defer.gatherResults(deferreds)
+ total = len(docs_we_want)
+ logger.info("Will fetch %d blobs from server." % total)
+ deferreds = []
+ semaphore = defer.DeferredSemaphore(self.concurrency_limit)
+
+ def release(result):
+ semaphore.release()
+ return result
+
+ for i in xrange(len(docs_we_want)):
+ yield semaphore.acquire()
+ blob_id = docs_we_want.pop()
+ logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id))
+ d = with_retry(self.get, blob_id, namespace)
+ d.addCallbacks(release, release)
+ deferreds.append(d)
+ yield defer.gatherResults(deferreds)
@defer.inlineCallbacks
def sync(self, namespace=''):
diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py
index 81379c73..c6f84e29 100644
--- a/tests/blobs/test_blob_manager.py
+++ b/tests/blobs/test_blob_manager.py
@@ -19,6 +19,7 @@ Tests for BlobManager.
"""
from twisted.trial import unittest
from twisted.internet import defer
+from twisted.web.error import SchemeNotSupported
from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV
from leap.soledad.client._db.blobs import BlobAlreadyExistsError
from leap.soledad.client._db.blobs import SyncStatus
@@ -154,7 +155,8 @@ class BlobManagerTestCase(unittest.TestCase):
self.manager._encrypt_and_upload = Mock(return_value=upload_failure)
content, blob_id = "Blob content", uuid4().hex
doc1 = BlobDoc(BytesIO(content), blob_id)
- with pytest.raises(Exception):
+ with pytest.raises(SchemeNotSupported):
+ # should fail because manager URL is invalid
yield self.manager.put(doc1, len(content))
pending_upload = SyncStatus.PENDING_UPLOAD
local_list = yield self.manager.local_list(sync_status=pending_upload)
@@ -166,7 +168,8 @@ class BlobManagerTestCase(unittest.TestCase):
self.manager.remote_list = Mock(return_value=[])
content, blob_id = "Blob content", uuid4().hex
doc1 = BlobDoc(BytesIO(content), blob_id)
- with pytest.raises(Exception):
+ with pytest.raises(SchemeNotSupported):
+ # should fail because manager URL is invalid
yield self.manager.put(doc1, len(content))
for _ in range(self.manager.max_retries + 1):
with pytest.raises(defer.FirstError):