diff options
author | Victor Shyba <victor.shyba@gmail.com> | 2016-09-16 04:12:10 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2016-12-12 09:11:58 -0200 |
commit | 4fde2537564ee298b967184bfdbe48cb963a8bd6 (patch) | |
tree | fd263dd67fcb2ae50dc1cffe53e3af0a668659ee /server/src/leap | |
parent | 5b2e52200eb815b598e7ed74bb7b4e21387d9c08 (diff) |
[feature] revert sync download into straming (server)
Instead of concurrent download, we are going to download a stream. This
commit modifies server to support it.
Diffstat (limited to 'server/src/leap')
-rw-r--r-- | server/src/leap/soledad/server/sync.py | 45 |
1 files changed, 23 insertions, 22 deletions
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3f5c4aba..a0324a27 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -21,6 +21,7 @@ from leap.soledad.common.l2db import sync, Document from leap.soledad.common.l2db.remote import http_app from leap.soledad.server.caching import get_cache_for from leap.soledad.server.state import ServerSyncState +from itertools import izip MAX_REQUEST_SIZE = 200 # in Mb @@ -78,38 +79,38 @@ class SyncExchange(sync.SyncExchange): self._trace('after whats_changed') seen_ids = self._sync_state.seen_ids() # changed docs that weren't superseded by or converged with - changes_to_return = [ + self.changes_to_return = [ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes # there was a subsequent update if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] self._sync_state.put_changes_to_return( - new_gen, new_trans_id, changes_to_return) - number_of_changes = len(changes_to_return) - # query server for stored changes - _, _, next_change_to_return = \ - self._sync_state.next_change_to_return(received) + new_gen, new_trans_id, self.changes_to_return) + number_of_changes = len(self.changes_to_return) self.new_gen = new_gen self.new_trans_id = new_trans_id - # and append one change - self.change_to_return = next_change_to_return return self.new_gen, number_of_changes - def return_one_doc(self, return_doc_cb): - """ - Return one changed document and its last change generation to the - source syncing replica by invoking the callback return_doc_cb. + def return_docs(self, return_doc_cb): + """Return the changed documents and their last change generation + repeatedly invoking the callback return_doc_cb. - This is called once for each document to be transferred from target to - source. + The final step of a sync exchange. - :param return_doc_cb: is a callback used to return the documents with - their last change generation to the target - replica. - :type return_doc_cb: callable(doc, gen, trans_id) + :param: return_doc_cb(doc, gen, trans_id): is a callback + used to return the documents with their last change generation + to the target replica. + :return: None """ - if self.change_to_return is not None: - changed_doc_id, gen, trans_id = self.change_to_return - doc = self._db.get_doc(changed_doc_id, include_deleted=True) + changes_to_return = self.changes_to_return + # return docs, including conflicts + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + for doc, gen, trans_id in docs_by_gen: return_doc_cb(doc, gen, trans_id) def batched_insert_from_source(self, entries, sync_id): @@ -264,7 +265,7 @@ class SyncResource(http_app.SyncResource): if self.replica_uid is not None: header['replica_uid'] = self.replica_uid self.responder.stream_entry(header) - self.sync_exch.return_one_doc(send_doc) + self.sync_exch.return_docs(send_doc) self.responder.end_stream() self.responder.finish_response() |