summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client/_db/blobs/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/client/_db/blobs/sync.py')
-rw-r--r--src/leap/soledad/client/_db/blobs/sync.py106
1 files changed, 71 insertions, 35 deletions
diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py
index 3ee60305..e6397ede 100644
--- a/src/leap/soledad/client/_db/blobs/sync.py
+++ b/src/leap/soledad/client/_db/blobs/sync.py
@@ -17,12 +17,15 @@
"""
Synchronization between blobs client/server
"""
+from collections import defaultdict
from twisted.internet import defer
from twisted.internet import reactor
from twisted.logger import Logger
from twisted.internet import error
from .sql import SyncStatus
from .errors import RetriableTransferError
+
+
logger = Logger()
@@ -56,6 +59,9 @@ def with_retry(func, *args, **kwargs):
class BlobsSynchronizer(object):
+ def __init__(self):
+ self.locks = defaultdict(defer.DeferredLock)
+
@defer.inlineCallbacks
def refresh_sync_status_from_server(self, namespace=''):
d1 = self.remote_list(namespace=namespace)
@@ -82,7 +88,6 @@ class BlobsSynchronizer(object):
SyncStatus.SYNCED,
namespace=namespace)
- @defer.inlineCallbacks
def send_missing(self, namespace=''):
"""
Compare local and remote blobs and send what's missing in server.
@@ -90,30 +95,41 @@ class BlobsSynchronizer(object):
:param namespace:
Optional parameter to restrict operation to a given namespace.
:type namespace: str
+
+ :return: A deferred that fires when all local blobs were sent to
+ server.
+ :rtype: twisted.internet.defer.Deferred
"""
- missing = yield self.local.list_status(
- SyncStatus.PENDING_UPLOAD, namespace)
- total = len(missing)
- logger.info("Will send %d blobs to server." % total)
- deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
-
- for i in xrange(total):
- blob_id = missing.pop()
- d = semaphore.run(
- with_retry, self.__send_one, blob_id, namespace, i, total)
- deferreds.append(d)
- yield defer.gatherResults(deferreds, consumeErrors=True)
+ lock = self.locks['send_missing']
+ d = lock.run(self._send_missing, namespace)
+ return d
@defer.inlineCallbacks
- def __send_one(self, blob_id, namespace, i, total):
- logger.info("Sending blob to server (%d/%d): %s"
- % (i, total, blob_id))
- fd = yield self.local.get(blob_id, namespace=namespace)
- yield self._encrypt_and_upload(blob_id, fd)
- yield self.local.update_sync_status(blob_id, SyncStatus.SYNCED)
+ def _send_missing(self, namespace):
+ max_transfers = self.concurrent_transfers_limit
+ semaphore = defer.DeferredSemaphore(max_transfers)
+ # the list of blobs should be refreshed often, so we run as many
+ # concurrent transfers as we can and then refresh the list
+ while True:
+ d = self.local_list_status(SyncStatus.PENDING_UPLOAD, namespace)
+ missing = yield d
+
+ if not missing:
+ break
+
+ total = len(missing)
+ now = min(total, max_transfers)
+ logger.info("There are %d pending blob uploads." % total)
+ logger.info("Will send %d blobs to server now." % now)
+ missing = missing[:now]
+ deferreds = []
+ for i in xrange(now):
+ blob_id = missing.pop(0)
+ d = semaphore.run(
+ with_retry, self._send, blob_id, namespace, i, total)
+ deferreds.append(d)
+ yield defer.gatherResults(deferreds, consumeErrors=True)
- @defer.inlineCallbacks
def fetch_missing(self, namespace=''):
"""
Compare local and remote blobs and fetch what's missing in local
@@ -122,21 +138,41 @@ class BlobsSynchronizer(object):
:param namespace:
Optional parameter to restrict operation to a given namespace.
:type namespace: str
+
+ :return: A deferred that fires when all remote blobs were received from
+ server.
+ :rtype: twisted.internet.defer.Deferred
"""
- # TODO: Use something to prioritize user requests over general new docs
- d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
- docs_we_want = yield d
- total = len(docs_we_want)
- logger.info("Will fetch %d blobs from server." % total)
- deferreds = []
- semaphore = defer.DeferredSemaphore(self.concurrent_transfers_limit)
-
- for i in xrange(len(docs_we_want)):
- blob_id = docs_we_want.pop()
- logger.info("Fetching blob (%d/%d): %s" % (i, total, blob_id))
- d = semaphore.run(with_retry, self.get, blob_id, namespace)
- deferreds.append(d)
- yield defer.gatherResults(deferreds, consumeErrors=True)
+ lock = self.locks['fetch_missing']
+ d = lock.run(self._fetch_missing, namespace)
+ return d
+
+ @defer.inlineCallbacks
+ def _fetch_missing(self, namespace=''):
+ max_transfers = self.concurrent_transfers_limit
+ semaphore = defer.DeferredSemaphore(max_transfers)
+ # in order to make sure that transfer priorities will be met, the list
+ # of blobs to transfer should be refreshed often. What we do is run as
+ # many concurrent transfers as we can and then refresh the list
+ while True:
+ d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
+ docs_we_want = yield d
+
+ if not docs_we_want:
+ break
+
+ total = len(docs_we_want)
+ now = min(total, max_transfers)
+ logger.info("There are %d pending blob downloads." % total)
+ logger.info("Will fetch %d blobs from server now." % now)
+ docs_we_want = docs_we_want[:now]
+ deferreds = []
+ for i in xrange(now):
+ blob_id = docs_we_want.pop(0)
+ logger.info("Fetching blob (%d/%d): %s" % (i, now, blob_id))
+ d = semaphore.run(with_retry, self._fetch, blob_id, namespace)
+ deferreds.append(d)
+ yield defer.gatherResults(deferreds, consumeErrors=True)
@defer.inlineCallbacks
def sync(self, namespace=''):