diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/src/leap/soledad/common/couch.py | 45 | ||||
-rw-r--r-- | common/src/leap/soledad/common/ddocs/syncs/updates/put.js | 118 |
2 files changed, 147 insertions, 16 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index b51b32f3..c0adfc70 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1106,7 +1106,8 @@ class CouchDatabase(CommonBackend): ) def _set_replica_gen_and_trans_id(self, other_replica_uid, - other_generation, other_transaction_id): + other_generation, other_transaction_id, + number_of_docs=None, sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1122,12 +1123,19 @@ class CouchDatabase(CommonBackend): :param other_transaction_id: The transaction id associated with the generation. :type other_transaction_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param sync_id: The id of the current sync session. + :type sync_id: str """ self._do_set_replica_gen_and_trans_id( - other_replica_uid, other_generation, other_transaction_id) + other_replica_uid, other_generation, other_transaction_id, + number_of_docs=number_of_docs, sync_id=sync_id) def _do_set_replica_gen_and_trans_id( - self, other_replica_uid, other_generation, other_transaction_id): + self, other_replica_uid, other_generation, other_transaction_id, + number_of_docs=None, sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1143,6 +1151,11 @@ class CouchDatabase(CommonBackend): :param other_transaction_id: The transaction id associated with the generation. :type other_transaction_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param sync_id: The id of the current sync session. + :type sync_id: str :raise MissingDesignDocError: Raised when tried to access a missing design document. @@ -1163,12 +1176,17 @@ class CouchDatabase(CommonBackend): res = self._database.resource(*ddoc_path) try: with CouchDatabase.update_handler_lock[self._get_replica_uid()]: + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + } + if sync_id is not None: + body['sync_id'] = sync_id + if number_of_docs is not None: + body['number_of_docs'] = number_of_docs res.put_json( - body={ - 'other_replica_uid': other_replica_uid, - 'other_generation': other_generation, - 'other_transaction_id': other_transaction_id, - }, + body=body, headers={'content-type': 'application/json'}) except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) @@ -1306,7 +1324,8 @@ class CouchDatabase(CommonBackend): doc.set_conflicts(cur_doc.get_conflicts()) def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, - replica_trans_id=''): + replica_trans_id='', number_of_docs=None, + sync_id=None): """ Insert/update document into the database with a given revision. @@ -1339,6 +1358,11 @@ class CouchDatabase(CommonBackend): :param replica_trans_id: The transaction_id associated with the generation. :type replica_trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param sync_id: The id of the current sync session. + :type sync_id: str :return: (state, at_gen) - If we don't have doc_id already, or if doc_rev supersedes the existing document revision, then the @@ -1398,7 +1422,8 @@ class CouchDatabase(CommonBackend): self._force_doc_sync_conflict(doc) if replica_uid is not None and replica_gen is not None: self._set_replica_gen_and_trans_id( - replica_uid, replica_gen, replica_trans_id) + replica_uid, replica_gen, replica_trans_id, + number_of_docs=number_of_docs, sync_id=sync_id) # update info old_doc.rev = doc.rev if doc.is_tombstone(): diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js index 722f695a..d754faaa 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -1,22 +1,128 @@ +/** + * The u1db_sync_log document stores both the actual sync log and a list of + * pending updates to the log, in case we receive incoming documents out of + * the correct order (i.e. if there are parallel PUTs during the sync + * process). + * + * The structure of the document is the following: + * + * { + * 'syncs': [ + * ['<replica_uid>', <gen>, '<trans_id>'], + * ... + * ], + * 'pending': { + * 'other_replica_uid': { + * 'sync_id': '<sync_id>', + * 'log': [[<gen>, '<trans_id>'], ...] + * }, + * ... + * } + * } + * + * The update function below does the following: + * + * 0. If we do not receive a sync_id, we just update the 'syncs' list with + * the incoming info about the source replica state. + * + * 1. Otherwise, if the incoming sync_id differs from current stored + * sync_id, then we assume that the previous sync session for that source + * replica was interrupted and discard all pending data. + * + * 2. Then we append incoming info as pending data for that source replica + * and current sync_id, and sort the pending data by generation. + * + * 3. Then we go through pending data and find the most recent generation + * that we can use to update the actual sync log. + * + * 4. Finally, we insert the most up to date information into the sync log. + */ function(doc, req){ + + // create the document if it doesn't exist if (!doc) { doc = {} doc['_id'] = 'u1db_sync_log'; doc['syncs'] = []; } - body = JSON.parse(req.body); + + // get and validate incoming info + var body = JSON.parse(req.body); + var other_replica_uid = body['other_replica_uid']; + var other_generation = parseInt(body['other_generation']); + var other_transaction_id = body['other_transaction_id'] + var sync_id = body['sync_id']; + var number_of_docs = body['number_of_docs']; + if (number_of_docs != null) + number_of_docs = parseInt(number_of_docs); + + if (other_replica_uid == null + || other_generation == null + || other_transaction_id == null) + return [null, 'invalid data']; + + // create slot for pending logs + if (doc['pending'] == null) + doc['pending'] = {}; + + // these are the values that will be actually inserted + var current_gen = other_generation; + var current_trans_id = other_transaction_id; + + /*------------ Wait for end of sync session before storing ------------*/ + + // we just try to obtain pending log if we received a sync_id + if (sync_id != null) { + + // create slot for current source and sync_id pending log + if (doc['pending'][other_replica_uid] == null + || doc['pending'][other_replica_uid]['sync_id'] != sync_id) { + doc['pending'][other_replica_uid] = { + 'sync_id': sync_id, + 'log': [], + } + } + + // append incoming data to pending log + doc['pending'][other_replica_uid]['log'].push([ + other_generation, + other_transaction_id + ]) + + // leave the sync log untouched if we still did not receive all docs + if (doc['pending'][other_replica_uid]['log'].length < number_of_docs) + return [doc, 'ok']; + + // otherwise, sort pending log according to generation + doc['pending'][other_replica_uid]['log'].sort(function(a, b) { + return a[0] - b[0]; + }); + + // get most up-to-date information from pending log + pending = doc['pending'][other_replica_uid]['log'].pop() + current_gen = pending[0]; + current_trans_id = pending[1]; + + // and remove all pending data from that replica + delete doc['pending'][other_replica_uid] + } + + /*--------------- Store source replica info on sync log ---------------*/ + // remove outdated info doc['syncs'] = doc['syncs'].filter( function (entry) { - return entry[0] != body['other_replica_uid']; + return entry[0] != other_replica_uid; } ); - // store u1db rev + + // store in log doc['syncs'].push([ - body['other_replica_uid'], - body['other_generation'], - body['other_transaction_id'] + other_replica_uid, + current_gen, + current_trans_id ]); + return [doc, 'ok']; } |