diff options
-rw-r--r-- | src/leap/soledad/client/_db/blobs.py | 30 | ||||
-rw-r--r-- | tests/server/test_blobs_server.py | 4 |
2 files changed, 19 insertions, 15 deletions
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py index 75f196c7..76746c71 100644 --- a/src/leap/soledad/client/_db/blobs.py +++ b/src/leap/soledad/client/_db/blobs.py @@ -275,13 +275,15 @@ class BlobManager(object): return self.local.list(namespace, sync_status) @defer.inlineCallbacks - def refresh_sync_status_from_server(self): - d1 = self.remote_list() - d2 = self.local_list() + def refresh_sync_status_from_server(self, namespace=''): + d1 = self.remote_list(namespace=namespace) + d2 = self.local_list(namespace) remote_list, local_list = yield defer.gatherResults([d1, d2]) pending_download_ids = tuple(set(remote_list) - set(local_list)) yield self.local.update_batch_sync_status( - pending_download_ids, sync_status=SyncStatus.PENDING_DOWNLOAD) + pending_download_ids, + sync_status=SyncStatus.PENDING_DOWNLOAD, + namespace=namespace) @defer.inlineCallbacks def send_missing(self, namespace=''): @@ -292,11 +294,9 @@ class BlobManager(object): Optional parameter to restrict operation to a given namespace. :type namespace: str """ - our_blobs = yield self.local_list(namespace) - server_blobs = yield self.remote_list(namespace=namespace) - missing = [b_id for b_id in our_blobs if b_id not in server_blobs] + d = self.local_list(namespace, SyncStatus.PENDING_UPLOAD) + missing = yield d logger.info("Amount of documents missing on server: %s" % len(missing)) - # TODO: Send concurrently when we are able to stream directly from db while missing: deferreds = [] for _ in range(min(self.concurrency_limit, len(missing))): @@ -330,18 +330,22 @@ class BlobManager(object): :type namespace: str """ # TODO: Use something to prioritize user requests over general new docs - our_blobs = yield self.local_list(namespace) - server_blobs = yield self.remote_list(namespace=namespace) - docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs] + d = self.local_list(namespace, SyncStatus.PENDING_DOWNLOAD) + docs_we_want = yield d logger.info("Fetching new docs from server: %s" % len(docs_we_want)) - # TODO: Fetch concurrently when we are able to stream directly into db while docs_we_want: deferreds = [] for _ in range(min(self.concurrency_limit, len(docs_we_want))): blob_id = docs_we_want.pop() logger.info("Fetching new doc: %s" % blob_id) deferreds.append(self.get(blob_id, namespace)) - yield defer.DeferredList(deferreds) + yield defer.gatherResults(deferreds) + + @defer.inlineCallbacks + def sync(self, namespace=''): + yield self.refresh_sync_status_from_server(namespace) + yield self.fetch_missing(namespace) + yield self.send_missing(namespace) @defer.inlineCallbacks def put(self, doc, size, namespace=''): diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py index c628ae78..7e8533fe 100644 --- a/tests/server/test_blobs_server.py +++ b/tests/server/test_blobs_server.py @@ -257,13 +257,13 @@ class BlobServerTestCase(unittest.TestCase): @defer.inlineCallbacks @pytest.mark.usefixtures("method_tmpdir") - def test_fetch_missing(self): + def test_sync_fetch_missing(self): manager = BlobManager(self.tempdir, self.uri, self.secret, self.secret, uuid4().hex) self.addCleanup(manager.close) blob_id = 'remote_only_blob_id' yield manager._encrypt_and_upload(blob_id, BytesIO("X")) - yield manager.fetch_missing() + yield manager.sync() result = yield manager.local.get(blob_id) self.assertIsNotNone(result) self.assertEquals(result.getvalue(), "X") |