summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad/client')
-rw-r--r--src/leap/soledad/client/_db/blobs.py30
1 files changed, 17 insertions, 13 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 75f196c7..76746c71 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -275,13 +275,15 @@ class BlobManager(object):
return self.local.list(namespace, sync_status)
@defer.inlineCallbacks
- def refresh_sync_status_from_server(self):
- d1 = self.remote_list()
- d2 = self.local_list()
+ def refresh_sync_status_from_server(self, namespace=''):
+ d1 = self.remote_list(namespace=namespace)
+ d2 = self.local_list(namespace)
remote_list, local_list = yield defer.gatherResults([d1, d2])
pending_download_ids = tuple(set(remote_list) - set(local_list))
yield self.local.update_batch_sync_status(
- pending_download_ids, sync_status=SyncStatus.PENDING_DOWNLOAD)
+ pending_download_ids,
+ sync_status=SyncStatus.PENDING_DOWNLOAD,
+ namespace=namespace)
@defer.inlineCallbacks
def send_missing(self, namespace=''):
@@ -292,11 +294,9 @@ class BlobManager(object):
Optional parameter to restrict operation to a given namespace.
:type namespace: str
"""
- our_blobs = yield self.local_list(namespace)
- server_blobs = yield self.remote_list(namespace=namespace)
- missing = [b_id for b_id in our_blobs if b_id not in server_blobs]
+ d = self.local_list(namespace, SyncStatus.PENDING_UPLOAD)
+ missing = yield d
logger.info("Amount of documents missing on server: %s" % len(missing))
- # TODO: Send concurrently when we are able to stream directly from db
while missing:
deferreds = []
for _ in range(min(self.concurrency_limit, len(missing))):
@@ -330,18 +330,22 @@ class BlobManager(object):
:type namespace: str
"""
# TODO: Use something to prioritize user requests over general new docs
- our_blobs = yield self.local_list(namespace)
- server_blobs = yield self.remote_list(namespace=namespace)
- docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs]
+ d = self.local_list(namespace, SyncStatus.PENDING_DOWNLOAD)
+ docs_we_want = yield d
logger.info("Fetching new docs from server: %s" % len(docs_we_want))
- # TODO: Fetch concurrently when we are able to stream directly into db
while docs_we_want:
deferreds = []
for _ in range(min(self.concurrency_limit, len(docs_we_want))):
blob_id = docs_we_want.pop()
logger.info("Fetching new doc: %s" % blob_id)
deferreds.append(self.get(blob_id, namespace))
- yield defer.DeferredList(deferreds)
+ yield defer.gatherResults(deferreds)
+
+ @defer.inlineCallbacks
+ def sync(self, namespace=''):
+ yield self.refresh_sync_status_from_server(namespace)
+ yield self.fetch_missing(namespace)
+ yield self.send_missing(namespace)
@defer.inlineCallbacks
def put(self, doc, size, namespace=''):