diff options
Diffstat (limited to 'src/leap/soledad/client/_db/blobs.py')
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 75f196c7..76746c71 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -275,13 +275,15 @@ class BlobManager(object): return self.local.list(namespace, sync_status) @defer.inlineCallbacks - def refresh_sync_status_from_server(self): - d1 = self.remote_list() - d2 = self.local_list() + def refresh_sync_status_from_server(self, namespace=''): + d1 = self.remote_list(namespace=namespace) + d2 = self.local_list(namespace) remote_list, local_list = yield defer.gatherResults([d1, d2]) pending_download_ids = tuple(set(remote_list) - set(local_list)) yield self.local.update_batch_sync_status( - pending_download_ids, sync_status=SyncStatus.PENDING_DOWNLOAD) + pending_download_ids, + sync_status=SyncStatus.PENDING_DOWNLOAD, + namespace=namespace) @defer.inlineCallbacks def send_missing(self, namespace=''): @@ -292,11 +294,9 @@ class BlobManager(object): Optional parameter to restrict operation to a given namespace. :type namespace: str """ - our_blobs = yield self.local_list(namespace) - server_blobs = yield self.remote_list(namespace=namespace) - missing = [b_id for b_id in our_blobs if b_id not in server_blobs] + d = self.local_list(namespace, SyncStatus.PENDING_UPLOAD) + missing = yield d logger.info("Amount of documents missing on server: %s" % len(missing)) - # TODO: Send concurrently when we are able to stream directly from db while missing: deferreds = [] for _ in range(min(self.concurrency_limit, len(missing))): @@ -330,18 +330,22 @@ class BlobManager(object): :type namespace: str """ # TODO: Use something to prioritize user requests over general new docs - our_blobs = yield self.local_list(namespace) - server_blobs = yield self.remote_list(namespace=namespace) - docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs] + d = self.local_list(namespace, SyncStatus.PENDING_DOWNLOAD) + docs_we_want = yield d logger.info("Fetching new docs from server: %s" % len(docs_we_want)) - # TODO: Fetch concurrently when we are able to stream directly into db while docs_we_want: deferreds = [] for _ in range(min(self.concurrency_limit, len(docs_we_want))): blob_id = docs_we_want.pop() logger.info("Fetching new doc: %s" % blob_id) deferreds.append(self.get(blob_id, namespace)) - yield defer.DeferredList(deferreds) + yield defer.gatherResults(deferreds) + + @defer.inlineCallbacks + def sync(self, namespace=''): + yield self.refresh_sync_status_from_server(namespace) + yield self.fetch_missing(namespace) + yield self.send_missing(namespace) @defer.inlineCallbacks def put(self, doc, size, namespace=''): |