From 64a08c836374bceb6e3d0813fb918dfd3f570852 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 4 Oct 2017 15:42:29 -0300 Subject: [bug] ensure maximum concurrency on blobs transfer The way in which the concurrency limit was being enforced was such that transfer attempts were being spawned in groups of 3, and all of them had to finish before a new group could be spawned. This modification allows for use of the maximum concurrency level at all times. --- src/leap/soledad/client/_db/blobs.py | 55 +++++++++++++++++++++++------------- tests/blobs/test_blob_manager.py | 7 +++-- 2 files changed, 41 insertions(+), 21 deletions(-) diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index afc92864..73f011c2 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -327,18 +327,27 @@ class BlobManager(object): d = self.local_list(namespace=namespace, sync_status=SyncStatus.PENDING_UPLOAD) missing = yield d - logger.info("Amount of documents missing on server: %s" % len(missing)) - while missing: - deferreds = [] - for _ in range(min(self.concurrency_limit, len(missing))): - blob_id = missing.pop() - d = with_retry(self.__send_one, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds) + total = len(missing) + logger.info("Will send %d blobs to server." 
% total) + deferreds = [] + semaphore = defer.DeferredSemaphore(self.concurrency_limit) + + def release(result): + semaphore.release() + return result + + for i in xrange(total): + yield semaphore.acquire() + blob_id = missing.pop() + d = with_retry(self.__send_one, blob_id, namespace, i, total) + d.addCallbacks(release, release) + deferreds.append(d) + yield defer.gatherResults(deferreds) @defer.inlineCallbacks - def __send_one(self, blob_id, namespace): - logger.info("Upload local blob: %s" % blob_id) + def __send_one(self, blob_id, namespace, i, total): + logger.info("Sending blob to server (%d/%d): %s" + % (i, total, blob_id)) fd = yield self.local.get(blob_id, namespace=namespace) try: yield self._encrypt_and_upload(blob_id, fd) @@ -365,15 +374,23 @@ class BlobManager(object): d = self.local_list(namespace=namespace, sync_status=SyncStatus.PENDING_DOWNLOAD) docs_we_want = yield d - logger.info("Fetching new docs from server: %s" % len(docs_we_want)) - while docs_we_want: - deferreds = [] - for _ in range(min(self.concurrency_limit, len(docs_we_want))): - blob_id = docs_we_want.pop() - logger.info("Fetching new doc: %s" % blob_id) - d = with_retry(self.get, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds) + total = len(docs_we_want) + logger.info("Will fetch %d blobs from server." 
% total) + deferreds = [] + semaphore = defer.DeferredSemaphore(self.concurrency_limit) + + def release(result): + semaphore.release() + return result + + for i in xrange(len(docs_we_want)): + yield semaphore.acquire() + blob_id = docs_we_want.pop() + logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id)) + d = with_retry(self.get, blob_id, namespace) + d.addCallbacks(release, release) + deferreds.append(d) + yield defer.gatherResults(deferreds) @defer.inlineCallbacks def sync(self, namespace=''): diff --git a/tests/blobs/test_blob_manager.py b/tests/blobs/test_blob_manager.py index 81379c73..c6f84e29 100644 --- a/tests/blobs/test_blob_manager.py +++ b/tests/blobs/test_blob_manager.py @@ -19,6 +19,7 @@ Tests for BlobManager. """ from twisted.trial import unittest from twisted.internet import defer +from twisted.web.error import SchemeNotSupported from leap.soledad.client._db.blobs import BlobManager, BlobDoc, FIXED_REV from leap.soledad.client._db.blobs import BlobAlreadyExistsError from leap.soledad.client._db.blobs import SyncStatus @@ -154,7 +155,8 @@ class BlobManagerTestCase(unittest.TestCase): self.manager._encrypt_and_upload = Mock(return_value=upload_failure) content, blob_id = "Blob content", uuid4().hex doc1 = BlobDoc(BytesIO(content), blob_id) - with pytest.raises(Exception): + with pytest.raises(SchemeNotSupported): + # should fail because manager URL is invalid yield self.manager.put(doc1, len(content)) pending_upload = SyncStatus.PENDING_UPLOAD local_list = yield self.manager.local_list(sync_status=pending_upload) @@ -166,7 +168,8 @@ class BlobManagerTestCase(unittest.TestCase): self.manager.remote_list = Mock(return_value=[]) content, blob_id = "Blob content", uuid4().hex doc1 = BlobDoc(BytesIO(content), blob_id) - with pytest.raises(Exception): + with pytest.raises(SchemeNotSupported): + # should fail because manager URL is invalid yield self.manager.put(doc1, len(content)) for _ in range(self.manager.max_retries + 1): with 
pytest.raises(defer.FirstError): -- cgit v1.2.3