diff options
Diffstat (limited to 'common/src')
| -rw-r--r-- | common/src/leap/soledad/common/couch.py | 45 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/ddocs/syncs/updates/put.js | 118 | 
2 files changed, 147 insertions, 16 deletions
| diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index b51b32f3..c0adfc70 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1106,7 +1106,8 @@ class CouchDatabase(CommonBackend):          )      def _set_replica_gen_and_trans_id(self, other_replica_uid, -                                      other_generation, other_transaction_id): +                                      other_generation, other_transaction_id, +                                      number_of_docs=None, sync_id=None):          """          Set the last-known generation and transaction id for the other          database replica. @@ -1122,12 +1123,19 @@ class CouchDatabase(CommonBackend):          :param other_transaction_id: The transaction id associated with the              generation.          :type other_transaction_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. +        :type number_of_docs: int +        :param sync_id: The id of the current sync session. +        :type sync_id: str          """          self._do_set_replica_gen_and_trans_id( -            other_replica_uid, other_generation, other_transaction_id) +            other_replica_uid, other_generation, other_transaction_id, +            number_of_docs=number_of_docs, sync_id=sync_id)      def _do_set_replica_gen_and_trans_id( -            self, other_replica_uid, other_generation, other_transaction_id): +            self, other_replica_uid, other_generation, other_transaction_id, +            number_of_docs=None, sync_id=None):          """          Set the last-known generation and transaction id for the other          database replica. @@ -1143,6 +1151,11 @@ class CouchDatabase(CommonBackend):          :param other_transaction_id: The transaction id associated with the                                       generation.          :type other_transaction_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. +        :type number_of_docs: int +        :param sync_id: The id of the current sync session. +        :type sync_id: str          :raise MissingDesignDocError: Raised when tried to access a missing                                        design document. @@ -1163,12 +1176,17 @@ class CouchDatabase(CommonBackend):          res = self._database.resource(*ddoc_path)          try:              with CouchDatabase.update_handler_lock[self._get_replica_uid()]: +                body={ +                    'other_replica_uid': other_replica_uid, +                    'other_generation': other_generation, +                    'other_transaction_id': other_transaction_id, +                } +                if sync_id is not None: +                    body['sync_id'] = sync_id +                if number_of_docs is not None: +                    body['number_of_docs'] = number_of_docs                  res.put_json( -                    body={ -                        'other_replica_uid': other_replica_uid, -                        'other_generation': other_generation, -                        'other_transaction_id': other_transaction_id, -                    }, +                    body=body,                      headers={'content-type': 'application/json'})          except ResourceNotFound as e:              raise_missing_design_doc_error(e, ddoc_path) @@ -1306,7 +1324,8 @@ class CouchDatabase(CommonBackend):              doc.set_conflicts(cur_doc.get_conflicts())      def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, -                          replica_trans_id=''): +                          replica_trans_id='', number_of_docs=None, +                          sync_id=None):          """          Insert/update document into the database with a given revision. @@ -1339,6 +1358,11 @@ class CouchDatabase(CommonBackend):          :param replica_trans_id: The transaction_id associated with the                                   generation.          :type replica_trans_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. +        :type number_of_docs: int +        :param sync_id: The id of the current sync session. +        :type sync_id: str          :return: (state, at_gen) -  If we don't have doc_id already, or if                   doc_rev supersedes the existing document revision, then the @@ -1398,7 +1422,8 @@ class CouchDatabase(CommonBackend):                  self._force_doc_sync_conflict(doc)          if replica_uid is not None and replica_gen is not None:              self._set_replica_gen_and_trans_id( -                replica_uid, replica_gen, replica_trans_id) +                replica_uid, replica_gen, replica_trans_id, +                number_of_docs=number_of_docs, sync_id=sync_id)          # update info          old_doc.rev = doc.rev          if doc.is_tombstone(): diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js index 722f695a..d754faaa 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -1,22 +1,128 @@ +/** + * The u1db_sync_log document stores both the actual sync log and a list of + * pending updates to the log, in case we receive incoming documents out of + * the correct order (i.e. if there are parallel PUTs during the sync + * process). + * + * The structure of the document is the following: + * + *     { + *         'syncs': [ + *             ['<replica_uid>', <gen>, '<trans_id>'], + *             ...  + *         ], + *         'pending': { + *             'other_replica_uid': { + *                 'sync_id': '<sync_id>', + *                 'log': [[<gen>, '<trans_id>'], ...] + *             }, + *             ... + *         } + *     } + * + * The update function below does the following: + * + *   0. If we do not receive a sync_id, we just update the 'syncs' list with + *      the incoming info about the source replica state. + * + *   1. Otherwise, if the incoming sync_id differs from current stored + *      sync_id, then we assume that the previous sync session for that source + *      replica was interrupted and discard all pending data. + * + *   2. Then we append incoming info as pending data for that source replica + *      and current sync_id, and sort the pending data by generation. + * + *   3. Then we go through pending data and find the most recent generation + *      that we can use to update the actual sync log. + * + *   4. Finally, we insert the most up to date information into the sync log. + */  function(doc, req){ + +    // create the document if it doesn't exist      if (!doc) {          doc = {}          doc['_id'] = 'u1db_sync_log';          doc['syncs'] = [];      } -    body = JSON.parse(req.body); + +    // get and validate incoming info +    var body = JSON.parse(req.body); +    var other_replica_uid = body['other_replica_uid']; +    var other_generation = parseInt(body['other_generation']); +    var other_transaction_id = body['other_transaction_id'] +    var sync_id = body['sync_id']; +    var number_of_docs = body['number_of_docs']; +    if (number_of_docs != null) +        number_of_docs = parseInt(number_of_docs); + +    if (other_replica_uid == null +            || other_generation == null +            || other_transaction_id == null) +        return [null, 'invalid data']; + +    // create slot for pending logs +    if (doc['pending'] == null) +        doc['pending'] = {}; + +    // these are the values that will be actually inserted +    var current_gen = other_generation; +    var current_trans_id = other_transaction_id; + +    /*------------ Wait for end of sync session before storing ------------*/ + +    // we just try to obtain pending log if we received a sync_id +    if (sync_id != null) { + +        // create slot for current source and sync_id pending log +        if (doc['pending'][other_replica_uid] == null +                || doc['pending'][other_replica_uid]['sync_id'] != sync_id) { +            doc['pending'][other_replica_uid] = { +                'sync_id': sync_id, +                'log': [], +            } +        } + +        // append incoming data to pending log +        doc['pending'][other_replica_uid]['log'].push([ +            other_generation, +            other_transaction_id +        ]) + +        // leave the sync log untouched if we still did not receive all docs +        if (doc['pending'][other_replica_uid]['log'].length < number_of_docs) +            return [doc, 'ok']; + +        // otherwise, sort pending log according to generation +        doc['pending'][other_replica_uid]['log'].sort(function(a, b) { +            return a[0] - b[0]; +        }); + +        // get most up-to-date information from pending log +        pending = doc['pending'][other_replica_uid]['log'].pop() +        current_gen = pending[0]; +        current_trans_id = pending[1]; + +        // and remove all pending data from that replica +        delete doc['pending'][other_replica_uid] +    } + +    /*--------------- Store source replica info on sync log ---------------*/ +      // remove outdated info      doc['syncs'] = doc['syncs'].filter(          function (entry) { -            return entry[0] != body['other_replica_uid']; +            return entry[0] != other_replica_uid;          }      ); -    // store u1db rev + +    // store in log      doc['syncs'].push([ -        body['other_replica_uid'], -        body['other_generation'], -        body['other_transaction_id'] +        other_replica_uid, +        current_gen, +        current_trans_id       ]); +      return [doc, 'ok'];  } | 
