 src/leap/soledad/client/_db/blobs.py | 30 +++++++++++++++++++-------------
 tests/server/test_blobs_server.py    |  4 ++--
 2 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py
index 75f196c7..76746c71 100644
--- a/src/leap/soledad/client/_db/blobs.py
+++ b/src/leap/soledad/client/_db/blobs.py
@@ -275,13 +275,15 @@ class BlobManager(object):
         return self.local.list(namespace, sync_status)
 
     @defer.inlineCallbacks
-    def refresh_sync_status_from_server(self):
-        d1 = self.remote_list()
-        d2 = self.local_list()
+    def refresh_sync_status_from_server(self, namespace=''):
+        d1 = self.remote_list(namespace=namespace)
+        d2 = self.local_list(namespace)
         remote_list, local_list = yield defer.gatherResults([d1, d2])
         pending_download_ids = tuple(set(remote_list) - set(local_list))
         yield self.local.update_batch_sync_status(
-            pending_download_ids, sync_status=SyncStatus.PENDING_DOWNLOAD)
+            pending_download_ids,
+            sync_status=SyncStatus.PENDING_DOWNLOAD,
+            namespace=namespace)
 
     @defer.inlineCallbacks
     def send_missing(self, namespace=''):
@@ -292,11 +294,9 @@ class BlobManager(object):
             Optional parameter to restrict operation to a given namespace.
         :type namespace: str
         """
-        our_blobs = yield self.local_list(namespace)
-        server_blobs = yield self.remote_list(namespace=namespace)
-        missing = [b_id for b_id in our_blobs if b_id not in server_blobs]
+        d = self.local_list(namespace, SyncStatus.PENDING_UPLOAD)
+        missing = yield d
         logger.info("Amount of documents missing on server: %s" % len(missing))
-        # TODO: Send concurrently when we are able to stream directly from db
         while missing:
             deferreds = []
             for _ in range(min(self.concurrency_limit, len(missing))):
@@ -330,18 +330,22 @@ class BlobManager(object):
         :type namespace: str
         """
         # TODO: Use something to prioritize user requests over general new docs
-        our_blobs = yield self.local_list(namespace)
-        server_blobs = yield self.remote_list(namespace=namespace)
-        docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs]
+        d = self.local_list(namespace, SyncStatus.PENDING_DOWNLOAD)
+        docs_we_want = yield d
         logger.info("Fetching new docs from server: %s" % len(docs_we_want))
-        # TODO: Fetch concurrently when we are able to stream directly into db
         while docs_we_want:
             deferreds = []
             for _ in range(min(self.concurrency_limit, len(docs_we_want))):
                 blob_id = docs_we_want.pop()
                 logger.info("Fetching new doc: %s" % blob_id)
                 deferreds.append(self.get(blob_id, namespace))
-            yield defer.DeferredList(deferreds)
+            yield defer.gatherResults(deferreds)
+
+    @defer.inlineCallbacks
+    def sync(self, namespace=''):
+        yield self.refresh_sync_status_from_server(namespace)
+        yield self.fetch_missing(namespace)
+        yield self.send_missing(namespace)
 
     @defer.inlineCallbacks
     def put(self, doc, size, namespace=''):
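
With this refactor, fetch_missing drains the blobs that refresh_sync_status_from_server marked PENDING_DOWNLOAD, send_missing uploads whatever is already marked PENDING_UPLOAD locally, and the new sync() method chains the three steps for a namespace. A minimal usage sketch, reusing the BlobManager constructor arguments shown in the test below; the directory, server URL and secret here are placeholders, not values from this changeset:

    from uuid import uuid4
    from twisted.internet import defer
    from leap.soledad.client._db.blobs import BlobManager

    @defer.inlineCallbacks
    def sync_blobs(local_dir, server_url, secret):
        # local_dir, server_url and secret stand in for the local blobs
        # directory, the remote blobs server URL and the user's secret.
        manager = BlobManager(local_dir, server_url, secret, secret, uuid4().hex)
        try:
            # Refresh sync status against the server, download blobs marked
            # PENDING_DOWNLOAD, then upload blobs still marked PENDING_UPLOAD.
            yield manager.sync(namespace='')
        finally:
            yield defer.maybeDeferred(manager.close)
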
diff --git a/tests/server/test_blobs_server.py b/tests/server/test_blobs_server.py
index c628ae78..7e8533fe 100644
--- a/tests/server/test_blobs_server.py
+++ b/tests/server/test_blobs_server.py
@@ -257,13 +257,13 @@ class BlobServerTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     @pytest.mark.usefixtures("method_tmpdir")
-    def test_fetch_missing(self):
+    def test_sync_fetch_missing(self):
         manager = BlobManager(self.tempdir, self.uri, self.secret,
                               self.secret, uuid4().hex)
         self.addCleanup(manager.close)
         blob_id = 'remote_only_blob_id'
         yield manager._encrypt_and_upload(blob_id, BytesIO("X"))
-        yield manager.fetch_missing()
+        yield manager.sync()
         result = yield manager.local.get(blob_id)
         self.assertIsNotNone(result)
         self.assertEquals(result.getvalue(), "X")