From d7894eddd3a1c5241c4a79f58c227a7d30610b9a Mon Sep 17 00:00:00 2001 From: drebs Date: Tue, 5 Dec 2017 11:48:20 -0200 Subject: [bug] transfer maximum allowed number of blobs -- Closes: #9004 --- src/leap/soledad/client/_db/blobs/__init__.py | 10 ++-- src/leap/soledad/client/_db/blobs/sync.py | 81 +++++++++++++++------------ 2 files changed, 48 insertions(+), 43 deletions(-) (limited to 'src/leap/soledad') diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py index 86e45381..95814d46 100644 --- a/src/leap/soledad/client/_db/blobs/__init__.py +++ b/src/leap/soledad/client/_db/blobs/__init__.py @@ -309,17 +309,15 @@ class BlobManager(BlobsSynchronizer): yield self.local.update_sync_status( doc.blob_id, SyncStatus.PENDING_UPLOAD, namespace=namespace, priority=priority) - yield self._send(doc.blob_id, namespace, 1, 1) + yield self._send(doc.blob_id, namespace) - def _send(self, blob_id, namespace, i, total): + def _send(self, blob_id, namespace): lock = self.locks[blob_id] - d = lock.run(self.__send, blob_id, namespace, i, total) + d = lock.run(self.__send, blob_id, namespace) return d @defer.inlineCallbacks - def __send(self, blob_id, namespace, i, total): - logger.info("Sending blob to server (%d/%d): %s" - % (i, total, blob_id)) + def __send(self, blob_id, namespace): # In fact, some kind of pipe is needed here, where each write on db # handle gets forwarded into a write on the connection handle fd = yield self.local.get(blob_id, namespace=namespace) diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py index e6397ede..9309d9da 100644 --- a/src/leap/soledad/client/_db/blobs/sync.py +++ b/src/leap/soledad/client/_db/blobs/sync.py @@ -106,29 +106,33 @@ class BlobsSynchronizer(object): @defer.inlineCallbacks def _send_missing(self, namespace): + # the list of priorities must be refreshed every time a new blob will + # be transferred. To do that, we use a semaphore and get a new ordered + # list only when there are free slots for new transfers. max_transfers = self.concurrent_transfers_limit semaphore = defer.DeferredSemaphore(max_transfers) - # the list of blobs should be refreshed often, so we run as many - # concurrent transfers as we can and then refresh the list + scheduled = set() while True: - d = self.local_list_status(SyncStatus.PENDING_UPLOAD, namespace) - missing = yield d - - if not missing: + d = semaphore.run(self._send_next, namespace, scheduled) + success = yield d + if not success: break - total = len(missing) - now = min(total, max_transfers) - logger.info("There are %d pending blob uploads." % total) - logger.info("Will send %d blobs to server now." % now) - missing = missing[:now] - deferreds = [] - for i in xrange(now): - blob_id = missing.pop(0) - d = semaphore.run( - with_retry, self._send, blob_id, namespace, i, total) - deferreds.append(d) - yield defer.gatherResults(deferreds, consumeErrors=True) + @defer.inlineCallbacks + def _send_next(self, namespace, scheduled): + status = SyncStatus.PENDING_UPLOAD + pending = yield self.local_list_status(status, namespace) + pending = [x for x in pending if x not in scheduled] + logger.info("There are %d pending blob uploads." % len(pending)) + + if not pending: + # we are finished, indicate that to our caller + defer.returnValue(False) + + blob_id = pending[0] + logger.info("Sending blob: %s" % (blob_id,)) + yield with_retry(self._send, blob_id, namespace) + defer.returnValue(True) def fetch_missing(self, namespace=''): """ @@ -149,30 +153,33 @@ class BlobsSynchronizer(object): @defer.inlineCallbacks def _fetch_missing(self, namespace=''): + # the list of priorities must be refreshed every time a new blob will + # be transferred. To do that, we use a semaphore and get a new ordered + # list only when there are free slots for new transfers. max_transfers = self.concurrent_transfers_limit semaphore = defer.DeferredSemaphore(max_transfers) - # in order to make sure that transfer priorities will be met, the list - # of blobs to transfer should be refreshed often. What we do is run as - # many concurrent transfers as we can and then refresh the list + scheduled = set() while True: - d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace) - docs_we_want = yield d - - if not docs_we_want: + d = semaphore.run(self._fetch_next, namespace, scheduled) + success = yield d + if not success: break - total = len(docs_we_want) - now = min(total, max_transfers) - logger.info("There are %d pending blob downloads." % total) - logger.info("Will fetch %d blobs from server now." % now) - docs_we_want = docs_we_want[:now] - deferreds = [] - for i in xrange(now): - blob_id = docs_we_want.pop(0) - logger.info("Fetching blob (%d/%d): %s" % (i, now, blob_id)) - d = semaphore.run(with_retry, self._fetch, blob_id, namespace) - deferreds.append(d) - yield defer.gatherResults(deferreds, consumeErrors=True) + @defer.inlineCallbacks + def _fetch_next(self, namespace, scheduled): + status = SyncStatus.PENDING_DOWNLOAD + pending = yield self.local_list_status(status, namespace) + pending = [x for x in pending if x not in scheduled] + logger.info("There are %d pending blob downloads." % len(pending)) + + if not pending: + # we are finished, indicate that to our caller + defer.returnValue(False) + + blob_id = pending[0] + logger.info("Fetching blob: %s" % (blob_id,)) + yield with_retry(self._fetch, blob_id, namespace) + defer.returnValue(True) @defer.inlineCallbacks def sync(self, namespace=''): -- cgit v1.2.3