diff options
| -rw-r--r-- | server/src/leap/soledad/server/sync.py | 45 | 
1 files changed, 23 insertions, 22 deletions
| diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3f5c4aba..a0324a27 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -21,6 +21,7 @@ from leap.soledad.common.l2db import sync, Document  from leap.soledad.common.l2db.remote import http_app  from leap.soledad.server.caching import get_cache_for  from leap.soledad.server.state import ServerSyncState +from itertools import izip  MAX_REQUEST_SIZE = 200  # in Mb @@ -78,38 +79,38 @@ class SyncExchange(sync.SyncExchange):              self._trace('after whats_changed')              seen_ids = self._sync_state.seen_ids()              # changed docs that weren't superseded by or converged with -            changes_to_return = [ +            self.changes_to_return = [                  (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes                  # there was a subsequent update                  if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]              self._sync_state.put_changes_to_return( -                new_gen, new_trans_id, changes_to_return) -            number_of_changes = len(changes_to_return) -        # query server for stored changes -        _, _, next_change_to_return = \ -            self._sync_state.next_change_to_return(received) +                new_gen, new_trans_id, self.changes_to_return) +            number_of_changes = len(self.changes_to_return)          self.new_gen = new_gen          self.new_trans_id = new_trans_id -        # and append one change -        self.change_to_return = next_change_to_return          return self.new_gen, number_of_changes -    def return_one_doc(self, return_doc_cb): -        """ -        Return one changed document and its last change generation to the -        source syncing replica by invoking the callback return_doc_cb. +    def return_docs(self, return_doc_cb): +        """Return the changed documents and their last change generation +        repeatedly invoking the callback return_doc_cb. -        This is called once for each document to be transferred from target to -        source. +        The final step of a sync exchange. -        :param return_doc_cb: is a callback used to return the documents with -                              their last change generation to the target -                              replica. -        :type return_doc_cb: callable(doc, gen, trans_id) +        :param: return_doc_cb(doc, gen, trans_id): is a callback +                used to return the documents with their last change generation +                to the target replica. +        :return: None          """ -        if self.change_to_return is not None: -            changed_doc_id, gen, trans_id = self.change_to_return -            doc = self._db.get_doc(changed_doc_id, include_deleted=True) +        changes_to_return = self.changes_to_return +        # return docs, including conflicts +        changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] +        docs = self._db.get_docs( +            changed_doc_ids, check_for_conflicts=False, include_deleted=True) + +        docs_by_gen = izip( +            docs, (gen for _, gen, _ in changes_to_return), +            (trans_id for _, _, trans_id in changes_to_return)) +        for doc, gen, trans_id in docs_by_gen:              return_doc_cb(doc, gen, trans_id)      def batched_insert_from_source(self, entries, sync_id): @@ -264,7 +265,7 @@ class SyncResource(http_app.SyncResource):          if self.replica_uid is not None:              header['replica_uid'] = self.replica_uid          self.responder.stream_entry(header) -        self.sync_exch.return_one_doc(send_doc) +        self.sync_exch.return_docs(send_doc)          self.responder.end_stream()          self.responder.finish_response() | 
