summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client/_db/blobs
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2017-12-05 11:48:20 -0200
committerdrebs <drebs@leap.se>2017-12-05 11:48:25 -0200
commitd7894eddd3a1c5241c4a79f58c227a7d30610b9a (patch)
treec8282034d62482bd58eecd783f9a17baf0382747 /src/leap/soledad/client/_db/blobs
parent7f2eec3df3ee5e0eecdb25f064353992d1ec2436 (diff)
[bug] transfer maximum allowed number of blobs
-- Closes: #9004
Diffstat (limited to 'src/leap/soledad/client/_db/blobs')
-rw-r--r--src/leap/soledad/client/_db/blobs/__init__.py10
-rw-r--r--src/leap/soledad/client/_db/blobs/sync.py81
2 files changed, 48 insertions, 43 deletions
diff --git a/src/leap/soledad/client/_db/blobs/__init__.py b/src/leap/soledad/client/_db/blobs/__init__.py
index 86e45381..95814d46 100644
--- a/src/leap/soledad/client/_db/blobs/__init__.py
+++ b/src/leap/soledad/client/_db/blobs/__init__.py
@@ -309,17 +309,15 @@ class BlobManager(BlobsSynchronizer):
yield self.local.update_sync_status(
doc.blob_id, SyncStatus.PENDING_UPLOAD, namespace=namespace,
priority=priority)
- yield self._send(doc.blob_id, namespace, 1, 1)
+ yield self._send(doc.blob_id, namespace)
- def _send(self, blob_id, namespace, i, total):
+ def _send(self, blob_id, namespace):
lock = self.locks[blob_id]
- d = lock.run(self.__send, blob_id, namespace, i, total)
+ d = lock.run(self.__send, blob_id, namespace)
return d
@defer.inlineCallbacks
- def __send(self, blob_id, namespace, i, total):
- logger.info("Sending blob to server (%d/%d): %s"
- % (i, total, blob_id))
+ def __send(self, blob_id, namespace):
# In fact, some kind of pipe is needed here, where each write on db
# handle gets forwarded into a write on the connection handle
fd = yield self.local.get(blob_id, namespace=namespace)
diff --git a/src/leap/soledad/client/_db/blobs/sync.py b/src/leap/soledad/client/_db/blobs/sync.py
index e6397ede..9309d9da 100644
--- a/src/leap/soledad/client/_db/blobs/sync.py
+++ b/src/leap/soledad/client/_db/blobs/sync.py
@@ -106,29 +106,33 @@ class BlobsSynchronizer(object):
@defer.inlineCallbacks
def _send_missing(self, namespace):
+ # the list of priorities must be refreshed every time a new blob will
+ # be transferred. To do that, we use a semaphore and get a new ordered
+ # list only when there are free slots for new transfers.
max_transfers = self.concurrent_transfers_limit
semaphore = defer.DeferredSemaphore(max_transfers)
- # the list of blobs should be refreshed often, so we run as many
- # concurrent transfers as we can and then refresh the list
+ scheduled = set()
while True:
- d = self.local_list_status(SyncStatus.PENDING_UPLOAD, namespace)
- missing = yield d
-
- if not missing:
+ d = semaphore.run(self._send_next, namespace, scheduled)
+ success = yield d
+ if not success:
break
- total = len(missing)
- now = min(total, max_transfers)
- logger.info("There are %d pending blob uploads." % total)
- logger.info("Will send %d blobs to server now." % now)
- missing = missing[:now]
- deferreds = []
- for i in xrange(now):
- blob_id = missing.pop(0)
- d = semaphore.run(
- with_retry, self._send, blob_id, namespace, i, total)
- deferreds.append(d)
- yield defer.gatherResults(deferreds, consumeErrors=True)
+ @defer.inlineCallbacks
+ def _send_next(self, namespace, scheduled):
+ status = SyncStatus.PENDING_UPLOAD
+ pending = yield self.local_list_status(status, namespace)
+ pending = [x for x in pending if x not in scheduled]
+ logger.info("There are %d pending blob uploads." % len(pending))
+
+ if not pending:
+ # we are finished, indicate that to our caller
+ defer.returnValue(False)
+
+ blob_id = pending[0]
+ logger.info("Sending blob: %s" % (blob_id,))
+ yield with_retry(self._send, blob_id, namespace)
+ defer.returnValue(True)
def fetch_missing(self, namespace=''):
"""
@@ -149,30 +153,33 @@ class BlobsSynchronizer(object):
@defer.inlineCallbacks
def _fetch_missing(self, namespace=''):
+ # the list of priorities must be refreshed every time a new blob will
+ # be transferred. To do that, we use a semaphore and get a new ordered
+ # list only when there are free slots for new transfers.
max_transfers = self.concurrent_transfers_limit
semaphore = defer.DeferredSemaphore(max_transfers)
- # in order to make sure that transfer priorities will be met, the list
- # of blobs to transfer should be refreshed often. What we do is run as
- # many concurrent transfers as we can and then refresh the list
+ scheduled = set()
while True:
- d = self.local_list_status(SyncStatus.PENDING_DOWNLOAD, namespace)
- docs_we_want = yield d
-
- if not docs_we_want:
+ d = semaphore.run(self._fetch_next, namespace, scheduled)
+ success = yield d
+ if not success:
break
- total = len(docs_we_want)
- now = min(total, max_transfers)
- logger.info("There are %d pending blob downloads." % total)
- logger.info("Will fetch %d blobs from server now." % now)
- docs_we_want = docs_we_want[:now]
- deferreds = []
- for i in xrange(now):
- blob_id = docs_we_want.pop(0)
- logger.info("Fetching blob (%d/%d): %s" % (i, now, blob_id))
- d = semaphore.run(with_retry, self._fetch, blob_id, namespace)
- deferreds.append(d)
- yield defer.gatherResults(deferreds, consumeErrors=True)
+ @defer.inlineCallbacks
+ def _fetch_next(self, namespace, scheduled):
+ status = SyncStatus.PENDING_DOWNLOAD
+ pending = yield self.local_list_status(status, namespace)
+ pending = [x for x in pending if x not in scheduled]
+ logger.info("There are %d pending blob downloads." % len(pending))
+
+ if not pending:
+ # we are finished, indicate that to our caller
+ defer.returnValue(False)
+
+ blob_id = pending[0]
+ logger.info("Fetching blob: %s" % (blob_id,))
+ yield with_retry(self._fetch, blob_id, namespace)
+ defer.returnValue(True)
@defer.inlineCallbacks
def sync(self, namespace=''):