From 1d8b32abc821116da4f3b4f192bc06a931d30076 Mon Sep 17 00:00:00 2001
From: drebs
Date: Wed, 29 Nov 2017 11:42:29 -0200
Subject: [feature] add priorities for blob transfers

Closes: #8691
---
 src/leap/soledad/client/_db/blobs/sync.py | 106 ++++++++++++++++++++----------
 1 file changed, 71 insertions(+), 35 deletions(-)

(limited to 'src/leap/soledad/client/_db/blobs/sync.py')

diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py
index 3ee60305..e6397ede 100644
--- a/src/leap/soledad/client/_db/blobs/sync.py
+++ b/src/leap/soledad/client/_db/blobs/sync.py
@@ -17,12 +17,15 @@
 """
 Synchronization between blobs client/server
 """
+from collections import defaultdict
 from twisted.internet import defer
 from twisted.internet import reactor
 from twisted.logger import Logger
 from twisted.internet import error
 from .sql import SyncStatus
 from .errors import RetriableTransferError
+
+
 logger = Logger()
@@ -56,6 +59,9 @@ def with_retry(func, *args, **kwargs):
 
 class BlobsSynchronizer(object):
 
+    def __init__(self):
+        self.locks = defaultdict(defer.DeferredLock)
+
     @defer.inlineCallbacks
     def refresh_sync_status_from_server(self, namespace=''):
         d1 = self.remote_list(namespace=namespace)
@@ -82,7 +88,6 @@ class BlobsSynchronizer(object):
             SyncStatus.SYNCED,
             namespace=namespace)
 
-    @defer.inlineCallbacks
     def send_missing(self, namespace=''):
         """
         Compare local and remote blobs and send what's missing in server.
@@ -90,30 +95,41 @@ class BlobsSynchronizer(object):
         :param namespace: Optional parameter to restrict operation to a given
             namespace.
         :type namespace: str
+
+        :return: A deferred that fires when all local blobs were sent to
+            server.
+        :rtype: twisted.internet.defer.Deferred
         """
-        missing = yield self.local.list_status(
-            SyncStatus.PENDING_UPLOAD, namespace)
-        total = len(missing)
-        logger.info("Will send %d blobs to server." % total)
-        deferreds = []
-        semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
-
-        for i in xrange(total):
-            blob_id = missing.pop()
-            d = semaphore.run(
-                with_retry, self.__send_one, blob_id, namespace, i, total)
-            deferreds.append(d)
-        yield defer.gatherResults(deferreds, consumeErrors=True)
+        lock = self.locks['send_missing']
+        d = lock.run(self._send_missing, namespace)
+        return d
 
     @defer.inlineCallbacks
-    def __send_one(self, blob_id, namespace, i, total):
-        logger.info("Sending blob to server (%d/%d): %s"
-                    % (i, total, blob_id))
-        fd = yield self.local.get(blob_id, namespace=namespace)
-        yield self._encrypt_and_upload(blob_id, fd)
-        yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
+    def _send_missing(self, namespace):
+        max_transfers = self.concurrent_transfers_limit
+        semaphore = defer.DeferredSemaphore(max_transfers)
+        # the list of blobs should be refreshed often, so we run as many
+        # concurrent transfers as we can and then refresh the list
+        while True:
+            d = self.local_list_status(SyncStatus.PENDING_UPLOAD, namespace)
+            missing = yield d
+
+            if not missing:
+                break
+
+            total = len(missing)
+            now = min(total, max_transfers)
+            logger.info("There are %d pending blob uploads." % total)
+            logger.info("Will send %d blobs to server now." % now)
+            missing = missing[:now]
+            deferreds = []
+            for i in xrange(now):
+                blob_id = missing.pop(0)
+                d = semaphore.run(
+                    with_retry, self._send, blob_id, namespace, i, total)
+                deferreds.append(d)
+            yield defer.gatherResults(deferreds, consumeErrors=True)
 
-    @defer.inlineCallbacks
     def fetch_missing(self, namespace=''):
         """
         Compare local and remote blobs and fetch what's missing in local
@@ -122,21 +138,41 @@ class BlobsSynchronizer(object):
         :param namespace: Optional parameter to restrict operation to a given
             namespace.
         :type namespace: str
+
+        :return: A deferred that fires when all remote blobs were received from
+            server.
+        :rtype: twisted.internet.defer.Deferred
         """
-        # TODO: Use something to prioritize user requests over general new docs
-        d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
-        docs_we_want = yield d
-        total = len(docs_we_want)
-        logger.info("Will fetch %d blobs from server." % total)
-        deferreds = []
-        semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
-
-        for i in xrange(len(docs_we_want)):
-            blob_id = docs_we_want.pop()
-            logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id))
-            d = semaphore.run(with_retry, self.get, blob_id, namespace)
-            deferreds.append(d)
-        yield defer.gatherResults(deferreds, consumeErrors=True)
+        lock = self.locks['fetch_missing']
+        d = lock.run(self._fetch_missing, namespace)
+        return d
+
+    @defer.inlineCallbacks
+    def _fetch_missing(self, namespace=''):
+        max_transfers = self.concurrent_transfers_limit
+        semaphore = defer.DeferredSemaphore(max_transfers)
+        # in order to make sure that transfer priorities will be met, the list
+        # of blobs to transfer should be refreshed often. What we do is run as
+        # many concurrent transfers as we can and then refresh the list
+        while True:
+            d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
+            docs_we_want = yield d
+
+            if not docs_we_want:
+                break
+
+            total = len(docs_we_want)
+            now = min(total, max_transfers)
+            logger.info("There are %d pending blob downloads." % total)
+            logger.info("Will fetch %d blobs from server now." % now)
+            docs_we_want = docs_we_want[:now]
+            deferreds = []
+            for i in xrange(now):
+                blob_id = docs_we_want.pop(0)
+                logger.info("Fetching blob (%d/%d): %s" % (i, now, blob_id))
+                d = semaphore.run(with_retry, self._fetch, blob_id, namespace)
+                deferreds.append(d)
+            yield defer.gatherResults(deferreds, consumeErrors=True)
 
     @defer.inlineCallbacks
     def sync(self, namespace=''):
-- 
cgit v1.2.3
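
The refresh-between-batches pattern used by _send_missing() and _fetch_missing() above can be illustrated with a small standalone Twisted sketch. The example below is illustrative only and not part of the patch: BatchedTransferrer, _list_pending() and _transfer_one() are hypothetical stand-ins for BlobsSynchronizer, local_list_status() and the per-blob transfer methods.

from collections import defaultdict

from twisted.internet import defer, reactor, task


class BatchedTransferrer(object):

    concurrent_transfers_limit = 3

    def __init__(self):
        # one DeferredLock per operation, so overlapping calls to the same
        # operation run one after the other instead of racing (as in the
        # patch's self.locks = defaultdict(defer.DeferredLock))
        self.locks = defaultdict(defer.DeferredLock)
        # hypothetical stand-in for the PENDING_UPLOAD/PENDING_DOWNLOAD queue
        self.pending = ['blob-%d' % i for i in range(10)]

    def transfer_all(self):
        # serialize whole runs behind the lock, mirroring send_missing()
        return self.locks['transfer'].run(self._transfer_all)

    @defer.inlineCallbacks
    def _transfer_all(self):
        limit = self.concurrent_transfers_limit
        semaphore = defer.DeferredSemaphore(limit)
        while True:
            # re-read the pending list before every batch so changes made
            # while the previous batch ran (e.g. priority reordering) are
            # picked up on the next iteration
            missing = yield self._list_pending()
            if not missing:
                break
            batch = missing[:limit]
            deferreds = [semaphore.run(self._transfer_one, blob_id)
                         for blob_id in batch]
            yield defer.gatherResults(deferreds, consumeErrors=True)

    def _list_pending(self):
        # stand-in for local_list_status(SyncStatus.PENDING_*, namespace)
        return defer.succeed(list(self.pending))

    @defer.inlineCallbacks
    def _transfer_one(self, blob_id):
        # stand-in for the real encrypt-and-upload / fetch-and-decrypt work
        yield task.deferLater(reactor, 0.1, lambda: None)
        self.pending.remove(blob_id)
        print('transferred %s' % blob_id)


if __name__ == '__main__':
    transferrer = BatchedTransferrer()
    d = transferrer.transfer_all()
    d.addBoth(lambda _: reactor.stop())
    reactor.run()

Because the pending list is re-read before every batch, any reordering done between batches (which is how the commit's priorities take effect) is honored on the next round, while the per-operation lock keeps overlapping send_missing()/fetch_missing() calls from competing for the same blobs.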