diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 55 |
1 files changed, 36 insertions, 19 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index afc92864..73f011c2 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -327,18 +327,27 @@ class BlobManager(object): d = self.local_list(namespace=namespace, sync_status=SyncStatus.PENDING_UPLOAD) missing = yield d - logger.info("Amount of documents missing on server: %s" % len(missing)) - while missing: - deferreds = [] - for _ in range(min(self.concurrency_limit, len(missing))): - blob_id = missing.pop() - d = with_retry(self.__send_one, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds) + total = len(missing) + logger.info("Will send %d blobs to server." % total) + deferreds = [] + semaphore = defer.DeferredSemaphore(self.concurrency_limit) + + def release(result): + semaphore.release() + return result + + for i in xrange(total): + yield semaphore.acquire() + blob_id = missing.pop() + d = with_retry(self.__send_one, blob_id, namespace, i, total) + d.addCallbacks(release, release) + deferreds.append(d) + yield defer.gatherResults(deferreds) @defer.inlineCallbacks - def __send_one(self, blob_id, namespace): - logger.info("Upload local blob: %s" % blob_id) + def __send_one(self, blob_id, namespace, i, total): + logger.info("Sending blob to server (%d/%d): %s" + % (i, total, blob_id)) fd = yield self.local.get(blob_id, namespace=namespace) try: yield self._encrypt_and_upload(blob_id, fd) @@ -365,15 +374,23 @@ class BlobManager(object): d = self.local_list(namespace=namespace, sync_status=SyncStatus.PENDING_DOWNLOAD) docs_we_want = yield d - logger.info("Fetching new docs from server: %s" % len(docs_we_want)) - while docs_we_want: - deferreds = [] - for _ in range(min(self.concurrency_limit, len(docs_we_want))): - blob_id = docs_we_want.pop() - logger.info("Fetching new doc: %s" % blob_id) - d = with_retry(self.get, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds) + total = len(docs_we_want) + logger.info("Will fetch %d blobs from server." % total) + deferreds = [] + semaphore = defer.DeferredSemaphore(self.concurrency_limit) + + def release(result): + semaphore.release() + return result + + for i in xrange(len(docs_we_want)): + yield semaphore.acquire() + blob_id = docs_we_want.pop() + logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id)) + d = with_retry(self.get, blob_id, namespace) + d.addCallbacks(release, release) + deferreds.append(d) + yield defer.gatherResults(deferreds) @defer.inlineCallbacks def sync(self, namespace=''): |