summaryrefslogtreecommitdiff
path: root/server/src/leap/soledad
diff options
context:
space:
mode:
authorVictor Shyba <victor.shyba@gmail.com>2016-09-16 04:12:10 -0300
committerdrebs <drebs@leap.se>2016-12-12 09:11:58 -0200
commit4fde2537564ee298b967184bfdbe48cb963a8bd6 (patch)
treefd263dd67fcb2ae50dc1cffe53e3af0a668659ee /server/src/leap/soledad
parent5b2e52200eb815b598e7ed74bb7b4e21387d9c08 (diff)
[feature] revert sync download into straming (server)
Instead of concurrent download, we are going to download a stream. This commit modifies server to support it.
Diffstat (limited to 'server/src/leap/soledad')
-rw-r--r--server/src/leap/soledad/server/sync.py45
1 files changed, 23 insertions, 22 deletions
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 3f5c4aba..a0324a27 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -21,6 +21,7 @@ from leap.soledad.common.l2db import sync, Document
from leap.soledad.common.l2db.remote import http_app
from leap.soledad.server.caching import get_cache_for
from leap.soledad.server.state import ServerSyncState
+from itertools import izip
MAX_REQUEST_SIZE = 200 # in Mb
@@ -78,38 +79,38 @@ class SyncExchange(sync.SyncExchange):
self._trace('after whats_changed')
seen_ids = self._sync_state.seen_ids()
# changed docs that weren't superseded by or converged with
- changes_to_return = [
+ self.changes_to_return = [
(doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
# there was a subsequent update
if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
self._sync_state.put_changes_to_return(
- new_gen, new_trans_id, changes_to_return)
- number_of_changes = len(changes_to_return)
- # query server for stored changes
- _, _, next_change_to_return = \
- self._sync_state.next_change_to_return(received)
+ new_gen, new_trans_id, self.changes_to_return)
+ number_of_changes = len(self.changes_to_return)
self.new_gen = new_gen
self.new_trans_id = new_trans_id
- # and append one change
- self.change_to_return = next_change_to_return
return self.new_gen, number_of_changes
- def return_one_doc(self, return_doc_cb):
- """
- Return one changed document and its last change generation to the
- source syncing replica by invoking the callback return_doc_cb.
+ def return_docs(self, return_doc_cb):
+ """Return the changed documents and their last change generation
+ repeatedly invoking the callback return_doc_cb.
- This is called once for each document to be transferred from target to
- source.
+ The final step of a sync exchange.
- :param return_doc_cb: is a callback used to return the documents with
- their last change generation to the target
- replica.
- :type return_doc_cb: callable(doc, gen, trans_id)
+ :param: return_doc_cb(doc, gen, trans_id): is a callback
+ used to return the documents with their last change generation
+ to the target replica.
+ :return: None
"""
- if self.change_to_return is not None:
- changed_doc_id, gen, trans_id = self.change_to_return
- doc = self._db.get_doc(changed_doc_id, include_deleted=True)
+ changes_to_return = self.changes_to_return
+ # return docs, including conflicts
+ changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return]
+ docs = self._db.get_docs(
+ changed_doc_ids, check_for_conflicts=False, include_deleted=True)
+
+ docs_by_gen = izip(
+ docs, (gen for _, gen, _ in changes_to_return),
+ (trans_id for _, _, trans_id in changes_to_return))
+ for doc, gen, trans_id in docs_by_gen:
return_doc_cb(doc, gen, trans_id)
def batched_insert_from_source(self, entries, sync_id):
@@ -264,7 +265,7 @@ class SyncResource(http_app.SyncResource):
if self.replica_uid is not None:
header['replica_uid'] = self.replica_uid
self.responder.stream_entry(header)
- self.sync_exch.return_one_doc(send_doc)
+ self.sync_exch.return_docs(send_doc)
self.responder.end_stream()
self.responder.finish_response()