diff options
author | drebs <drebs@riseup.net> | 2017-10-04 15:42:29 -0300 |
---|---|---|
committer | drebs <drebs@riseup.net> | 2017-10-05 09:43:40 -0300 |
commit | 64a08c836374bceb6e3d0813fb918dfd3f570852 (patch) | |
tree | 0e62c0d8d9d8df70a4eb9245902e4fe04cdeb55c /src/leap/soledad/client/_db | |
parent | 7ce706979c7f4d7bac4c26c026c6573b0c61bba3 (diff) |
[bug] ensure maximum concurrency on blobs transfer
The way in that concurrency limit was being enforced was such that
transfer attempts were being spawned in groups of 3, and all of them had
to finish before a new group could be spawned. This modification allows
for use of maximum concurrency level at all times.
Diffstat (limited to 'src/leap/soledad/client/_db')
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 55 |
1 files changed, 36 insertions, 19 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index afc92864..73f011c2 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -327,18 +327,27 @@ class BlobManager(object): d = self.local_list(namespace=namespace, sync_status=SyncStatus.PENDING_UPLOAD) missing = yield d - logger.info("Amount of documents missing on server: %s" % len(missing)) - while missing: - deferreds = [] - for _ in range(min(self.concurrency_limit, len(missing))): - blob_id = missing.pop() - d = with_retry(self.__send_one, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds) + total = len(missing) + logger.info("Will send %d blobs to server." % total) + deferreds = [] + semaphore = defer.DeferredSemaphore(self.concurrency_limit) + + def release(result): + semaphore.release() + return result + + for i in xrange(total): + yield semaphore.acquire() + blob_id = missing.pop() + d = with_retry(self.__send_one, blob_id, namespace, i, total) + d.addCallbacks(release, release) + deferreds.append(d) + yield defer.gatherResults(deferreds) @defer.inlineCallbacks - def __send_one(self, blob_id, namespace): - logger.info("Upload local blob: %s" % blob_id) + def __send_one(self, blob_id, namespace, i, total): + logger.info("Sending blob to server (%d/%d): %s" + % (i, total, blob_id)) fd = yield self.local.get(blob_id, namespace=namespace) try: yield self._encrypt_and_upload(blob_id, fd) @@ -365,15 +374,23 @@ class BlobManager(object): d = self.local_list(namespace=namespace, sync_status=SyncStatus.PENDING_DOWNLOAD) docs_we_want = yield d - logger.info("Fetching new docs from server: %s" % len(docs_we_want)) - while docs_we_want: - deferreds = [] - for _ in range(min(self.concurrency_limit, len(docs_we_want))): - blob_id = docs_we_want.pop() - logger.info("Fetching new doc: %s" % blob_id) - d = with_retry(self.get, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds) + total = len(docs_we_want) + logger.info("Will fetch %d blobs from server." % total) + deferreds = [] + semaphore = defer.DeferredSemaphore(self.concurrency_limit) + + def release(result): + semaphore.release() + return result + + for i in xrange(len(docs_we_want)): + yield semaphore.acquire() + blob_id = docs_we_want.pop() + logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id)) + d = with_retry(self.get, blob_id, namespace) + d.addCallbacks(release, release) + deferreds.append(d) + yield defer.gatherResults(deferreds) @defer.inlineCallbacks def sync(self, namespace=''): |