diff options
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 55 | ||||
-rw-r--r-- | tests/blobs/test_blob_manager.py | 7 |
2 files changed, 41 insertions, 21 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index afc92864..73f011c2 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -327,18 +327,27 @@ class BlobManager(object): d = self.local_list(namespace=namespace, sync_status=SyncStatus.PENDING_UPLOAD) missing = yield d - logger.info("Amount of documents missing on server: %s" % len(missing)) - while missing: - deferreds = [] - for _ in range(min(self.concurrency_limit, len(missing))): - blob_id = missing.pop() - d = with_retry(self.__send_one, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds) + total = len(missing) + logger.info("Will send %d blobs to server." % total) + deferreds = [] + semaphore = defer.DeferredSemaphore(self.concurrency_limit) + + def release(result): + semaphore.release() + return result + + for i in xrange(total): + yield semaphore.acquire() + blob_id = missing.pop() + d = with_retry(self.__send_one, blob_id, namespace, i, total) + d.addCallbacks(release, release) + deferreds.append(d) + yield defer.gatherResults(deferreds) @defer.inlineCallbacks - def __send_one(self, blob_id, namespace): - logger.info("Upload local blob: %s" % blob_id) + def __send_one(self, blob_id, namespace, i, total): + logger.info("Sending blob to server (%d/%d): %s" + % (i, total, blob_id)) fd = yield self.local.get(blob_id, namespace=namespace) try: yield self._encrypt_and_upload(blob_id, fd) @@ -365,15 +374,23 @@ class BlobManager(object): d = self.local_list(namespace=namespace, sync_status=SyncStatus.PENDING_DOWNLOAD) docs_we_want = yield d - logger.info("Fetching new docs from server: %s" % len(docs_we_want)) - while docs_we_want: - deferreds = [] - for _ in range(min(self.concurrency_limit, len(docs_we_want))): - blob_id = docs_we_want.pop() - logger.info("Fetching new doc: %s" % blob_id) - d = with_retry(self.get, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds) + total = len(docs_we_want) + logger.info("Will fetch %d blobs from server." % total) + deferreds = [] + semaphore = defer.DeferredSemaphore(self.concurrency_limit) + + def release(result): + semaphore.release() + return result + + for i in xrange(len(docs_we_want)): + yield semaphore.acquire() + blob_id = docs_we_want.pop() + logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id)) + d = with_retry(self.get, blob_id, namespace) + d.addCallbacks(release, release) + deferreds.append(d) + yield defer.gatherResults(deferreds) @defer.inlineCallbacks def sync(self, namespace=''): diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py index 81379c73..c6f84e29 100644 --- a/tests/blobs/test_blob_manager.py +++ b/tests/blobs/test_blob_manager.py @@ -19,6 +19,7 @@ Tests for BlobManager. """ from twisted.trial import unittest from twisted.internet import defer +from twisted.web.error import SchemeNotSupported from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV from leap.soledad.client._db.blobs import BlobAlreadyExistsError from leap.soledad.client._db.blobs import SyncStatus @@ -154,7 +155,8 @@ class BlobManagerTestCase(unittest.TestCase): self.manager._encrypt_and_upload = Mock(return_value=upload_failure) content, blob_id = "Blob content", uuid4().hex doc1 = BlobDoc(BytesIO(content), blob_id) - with pytest.raises(Exception): + with pytest.raises(SchemeNotSupported): + # should fail because manager URL is invalid yield self.manager.put(doc1, len(content)) pending_upload = SyncStatus.PENDING_UPLOAD local_list = yield self.manager.local_list(sync_status=pending_upload) @@ -166,7 +168,8 @@ class BlobManagerTestCase(unittest.TestCase): self.manager.remote_list = Mock(return_value=[]) content, blob_id = "Blob content", uuid4().hex doc1 = BlobDoc(BytesIO(content), blob_id) - with pytest.raises(Exception): + with pytest.raises(SchemeNotSupported): + # should fail because manager URL is invalid yield self.manager.put(doc1, len(content)) for _ in range(self.manager.max_retries + 1): with pytest.raises(defer.FirstError): |