diff options
| author | drebs <drebs@leap.se> | 2014-07-07 11:34:47 -0300 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2014-07-08 11:52:55 -0300 | 
| commit | 1e69bf4aceb2502a17dff98581acc7abcf41e168 (patch) | |
| tree | 7abae208f15faafe57616a26489419e52a56d92f | |
| parent | ba109986d55e008c7855d20538d84f2c69ca9271 (diff) | |
Update target sync with sequential info (#5869).
| -rw-r--r-- | client/src/leap/soledad/client/target.py | 8 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/couch.py | 24 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/ddocs/syncs/updates/put.js | 47 | ||||
| -rw-r--r-- | server/src/leap/soledad/server/sync.py | 13 | 
4 files changed, 66 insertions, 26 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index c459925d..7e563823 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -652,7 +652,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):          return self._response()      def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, -            id, rev, content, gen, trans_id, number_of_docs): +            id, rev, content, gen, trans_id, number_of_docs, doc_idx):          """          Put a sync document on server by means of a POST request. @@ -676,6 +676,8 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):          :param number_of_docs: The total amount of documents sent on this sync                                 session.          :type number_of_docs: int +        :param doc_idx: The index of the current document being sent. +        :type doc_idx: int          :return: The body and headers of the response.          :rtype: tuple @@ -694,7 +696,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):          size += self._prepare(              ',', entries,              id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, -            number_of_docs=number_of_docs) +            number_of_docs=number_of_docs, doc_idx=doc_idx)          entries.append('\r\n]')          size += len(entries[-1])          # send headers @@ -1189,7 +1191,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              t.doc_syncer.set_request_method(                  'put', sync_id, cur_target_gen, cur_target_trans_id,                  id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, -                trans_id=trans_id, number_of_docs=number_of_docs) +                trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1)              # set the success calback              def _success_callback(idx, total, response): diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index c0adfc70..5658f4ce 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1107,7 +1107,8 @@ class CouchDatabase(CommonBackend):      def _set_replica_gen_and_trans_id(self, other_replica_uid,                                        other_generation, other_transaction_id, -                                      number_of_docs=None, sync_id=None): +                                      number_of_docs=None, doc_idx=None, +                                      sync_id=None):          """          Set the last-known generation and transaction id for the other          database replica. @@ -1126,16 +1127,18 @@ class CouchDatabase(CommonBackend):          :param number_of_docs: The total amount of documents sent on this sync                                 session.          :type number_of_docs: int +        :param doc_idx: The index of the current document being sent. +        :type doc_idx: int          :param sync_id: The id of the current sync session.          :type sync_id: str          """          self._do_set_replica_gen_and_trans_id(              other_replica_uid, other_generation, other_transaction_id, -            number_of_docs=number_of_docs, sync_id=sync_id) +            number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)      def _do_set_replica_gen_and_trans_id(              self, other_replica_uid, other_generation, other_transaction_id, -            number_of_docs=None, sync_id=None): +            number_of_docs=None, doc_idx=None, sync_id=None):          """          Set the last-known generation and transaction id for the other          database replica. @@ -1154,6 +1157,8 @@ class CouchDatabase(CommonBackend):          :param number_of_docs: The total amount of documents sent on this sync                                 session.          :type number_of_docs: int +        :param doc_idx: The index of the current document being sent. +        :type doc_idx: int          :param sync_id: The id of the current sync session.          :type sync_id: str @@ -1181,10 +1186,12 @@ class CouchDatabase(CommonBackend):                      'other_generation': other_generation,                      'other_transaction_id': other_transaction_id,                  } -                if sync_id is not None: -                    body['sync_id'] = sync_id                  if number_of_docs is not None:                      body['number_of_docs'] = number_of_docs +                if doc_idx is not None: +                    body['doc_idx'] = doc_idx +                if sync_id is not None: +                    body['sync_id'] = sync_id                  res.put_json(                      body=body,                      headers={'content-type': 'application/json'}) @@ -1325,7 +1332,7 @@ class CouchDatabase(CommonBackend):      def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,                            replica_trans_id='', number_of_docs=None, -                          sync_id=None): +                          doc_idx=None, sync_id=None):          """          Insert/update document into the database with a given revision. @@ -1361,6 +1368,8 @@ class CouchDatabase(CommonBackend):          :param number_of_docs: The total amount of documents sent on this sync                                 session.          :type number_of_docs: int +        :param doc_idx: The index of the current document being sent. +        :type doc_idx: int          :param sync_id: The id of the current sync session.          :type sync_id: str @@ -1423,7 +1432,8 @@ class CouchDatabase(CommonBackend):          if replica_uid is not None and replica_gen is not None:              self._set_replica_gen_and_trans_id(                  replica_uid, replica_gen, replica_trans_id, -                number_of_docs=number_of_docs, sync_id=sync_id) +                number_of_docs=number_of_docs, doc_idx=doc_idx, +                sync_id=sync_id)          # update info          old_doc.rev = doc.rev          if doc.is_tombstone(): diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js index d754faaa..b0ae2de6 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -53,8 +53,13 @@ function(doc, req){      var other_transaction_id = body['other_transaction_id']      var sync_id = body['sync_id'];      var number_of_docs = body['number_of_docs']; +    var doc_idx = body['doc_idx']; + +    // parse integers      if (number_of_docs != null)          number_of_docs = parseInt(number_of_docs); +    if (doc_idx != null) +        doc_idx = parseInt(doc_idx);      if (other_replica_uid == null              || other_generation == null @@ -69,7 +74,7 @@ function(doc, req){      var current_gen = other_generation;      var current_trans_id = other_transaction_id; -    /*------------ Wait for end of sync session before storing ------------*/ +    /*------------- Wait for sequential values before storing -------------*/      // we just try to obtain pending log if we received a sync_id      if (sync_id != null) { @@ -80,31 +85,49 @@ function(doc, req){              doc['pending'][other_replica_uid] = {                  'sync_id': sync_id,                  'log': [], +                'last_doc_idx': 0,              }          }          // append incoming data to pending log          doc['pending'][other_replica_uid]['log'].push([              other_generation, -            other_transaction_id +            other_transaction_id, +            doc_idx,          ]) -        // leave the sync log untouched if we still did not receive all docs -        if (doc['pending'][other_replica_uid]['log'].length < number_of_docs) -            return [doc, 'ok']; - -        // otherwise, sort pending log according to generation +        // sort pending log according to generation          doc['pending'][other_replica_uid]['log'].sort(function(a, b) {              return a[0] - b[0];          });          // get most up-to-date information from pending log -        pending = doc['pending'][other_replica_uid]['log'].pop() -        current_gen = pending[0]; -        current_trans_id = pending[1]; +        var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx']; +        var pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; + +        current_gen = null; +        current_trans_id = null; + +        while (last_doc_idx + 1 == pending_idx) { +            pending = doc['pending'][other_replica_uid]['log'].shift() +            current_gen = pending[0]; +            current_trans_id = pending[1]; +            last_doc_idx = pending[2] +            if (doc['pending'][other_replica_uid]['log'].length == 0) +                break; +            pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; +        } + +        // leave the sync log untouched if we still did not receive enough docs +        if (current_gen == null) +            return [doc, 'ok']; + +        // update last index of received doc +        doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx; -        // and remove all pending data from that replica -        delete doc['pending'][other_replica_uid] +        // eventually remove all pending data from that replica +        if (last_doc_idx == number_of_docs) +            delete doc['pending'][other_replica_uid]      }      /*--------------- Store source replica info on sync log ---------------*/ diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3a1881fc..6dc99b5a 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -287,7 +287,7 @@ class SyncExchange(sync.SyncExchange):              return_doc_cb(doc, gen, trans_id)      def insert_doc_from_source(self, doc, source_gen, trans_id, -            number_of_docs=None, sync_id=None): +            number_of_docs=None, doc_idx=None, sync_id=None):          """Try to insert synced document from source.          Conflicting documents are not inserted but will be sent over @@ -308,13 +308,15 @@ class SyncExchange(sync.SyncExchange):          :param number_of_docs: The total amount of documents sent on this sync                                 session.          :type number_of_docs: int +        :param doc_idx: The index of the current document. +        :type doc_idx: int          :param sync_id: The id of the current sync session.          :type sync_id: str          """          state, at_gen = self._db._put_doc_if_newer(              doc, save_conflict=False, replica_uid=self.source_replica_uid,              replica_gen=source_gen, replica_trans_id=trans_id, -            number_of_docs=number_of_docs, sync_id=sync_id) +            number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)          if state == 'inserted':              self._sync_state.put_seen_id(doc.doc_id, at_gen)          elif state == 'converged': @@ -369,7 +371,8 @@ class SyncResource(http_app.SyncResource):          self._sync_id = sync_id      @http_app.http_method(content_as_args=True) -    def post_put(self, id, rev, content, gen, trans_id, number_of_docs): +    def post_put(self, id, rev, content, gen, trans_id, number_of_docs, +            doc_idx):          """          Put one incoming document into the server replica. @@ -388,11 +391,13 @@ class SyncResource(http_app.SyncResource):          :param number_of_docs: The total amount of documents sent on this sync                                 session.          :type number_of_docs: int +        :param doc_idx: The index of the current document. +        :type doc_idx: int          """          doc = Document(id, rev, content)          self.sync_exch.insert_doc_from_source(              doc, gen, trans_id, number_of_docs=number_of_docs, -            sync_id=self._sync_id) +            doc_idx=doc_idx, sync_id=self._sync_id)      @http_app.http_method(received=int, content_as_args=True)      def post_get(self, received):  | 
