From 4fde2537564ee298b967184bfdbe48cb963a8bd6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 16 Sep 2016 04:12:10 -0300 Subject: [feature] revert sync download into straming (server) Instead of concurrent download, we are going to download a stream. This commit modifies server to support it. --- server/src/leap/soledad/server/sync.py | 45 +++++++++++++++++----------------- 1 file changed, 23 insertions(+), 22 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3f5c4aba..a0324a27 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -21,6 +21,7 @@ from leap.soledad.common.l2db import sync, Document from leap.soledad.common.l2db.remote import http_app from leap.soledad.server.caching import get_cache_for from leap.soledad.server.state import ServerSyncState +from itertools import izip MAX_REQUEST_SIZE = 200 # in Mb @@ -78,38 +79,38 @@ class SyncExchange(sync.SyncExchange): self._trace('after whats_changed') seen_ids = self._sync_state.seen_ids() # changed docs that weren't superseded by or converged with - changes_to_return = [ + self.changes_to_return = [ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes # there was a subsequent update if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] self._sync_state.put_changes_to_return( - new_gen, new_trans_id, changes_to_return) - number_of_changes = len(changes_to_return) - # query server for stored changes - _, _, next_change_to_return = \ - self._sync_state.next_change_to_return(received) + new_gen, new_trans_id, self.changes_to_return) + number_of_changes = len(self.changes_to_return) self.new_gen = new_gen self.new_trans_id = new_trans_id - # and append one change - self.change_to_return = next_change_to_return return self.new_gen, number_of_changes - def return_one_doc(self, return_doc_cb): - """ - Return one changed document and its last change generation to the - source syncing replica by invoking the callback return_doc_cb. + def return_docs(self, return_doc_cb): + """Return the changed documents and their last change generation + repeatedly invoking the callback return_doc_cb. - This is called once for each document to be transferred from target to - source. + The final step of a sync exchange. - :param return_doc_cb: is a callback used to return the documents with - their last change generation to the target - replica. - :type return_doc_cb: callable(doc, gen, trans_id) + :param: return_doc_cb(doc, gen, trans_id): is a callback + used to return the documents with their last change generation + to the target replica. + :return: None """ - if self.change_to_return is not None: - changed_doc_id, gen, trans_id = self.change_to_return - doc = self._db.get_doc(changed_doc_id, include_deleted=True) + changes_to_return = self.changes_to_return + # return docs, including conflicts + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + for doc, gen, trans_id in docs_by_gen: return_doc_cb(doc, gen, trans_id) def batched_insert_from_source(self, entries, sync_id): @@ -264,7 +265,7 @@ class SyncResource(http_app.SyncResource): if self.replica_uid is not None: header['replica_uid'] = self.replica_uid self.responder.stream_entry(header) - self.sync_exch.return_one_doc(send_doc) + self.sync_exch.return_docs(send_doc) self.responder.end_stream() self.responder.finish_response() -- cgit v1.2.3