From 1e69bf4aceb2502a17dff98581acc7abcf41e168 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 7 Jul 2014 11:34:47 -0300 Subject: Update target sync with sequential info (#5869). --- common/src/leap/soledad/common/couch.py | 24 +++++++---- .../leap/soledad/common/ddocs/syncs/updates/put.js | 47 ++++++++++++++++------ 2 files changed, 52 insertions(+), 19 deletions(-) (limited to 'common/src') diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index c0adfc70..5658f4ce 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1107,7 +1107,8 @@ class CouchDatabase(CommonBackend): def _set_replica_gen_and_trans_id(self, other_replica_uid, other_generation, other_transaction_id, - number_of_docs=None, sync_id=None): + number_of_docs=None, doc_idx=None, + sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1126,16 +1127,18 @@ class CouchDatabase(CommonBackend): :param number_of_docs: The total amount of documents sent on this sync session. :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int :param sync_id: The id of the current sync session. :type sync_id: str """ self._do_set_replica_gen_and_trans_id( other_replica_uid, other_generation, other_transaction_id, - number_of_docs=number_of_docs, sync_id=sync_id) + number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id) def _do_set_replica_gen_and_trans_id( self, other_replica_uid, other_generation, other_transaction_id, - number_of_docs=None, sync_id=None): + number_of_docs=None, doc_idx=None, sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1154,6 +1157,8 @@ class CouchDatabase(CommonBackend): :param number_of_docs: The total amount of documents sent on this sync session. :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int :param sync_id: The id of the current sync session. :type sync_id: str @@ -1181,10 +1186,12 @@ class CouchDatabase(CommonBackend): 'other_generation': other_generation, 'other_transaction_id': other_transaction_id, } - if sync_id is not None: - body['sync_id'] = sync_id if number_of_docs is not None: body['number_of_docs'] = number_of_docs + if doc_idx is not None: + body['doc_idx'] = doc_idx + if sync_id is not None: + body['sync_id'] = sync_id res.put_json( body=body, headers={'content-type': 'application/json'}) @@ -1325,7 +1332,7 @@ class CouchDatabase(CommonBackend): def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, replica_trans_id='', number_of_docs=None, - sync_id=None): + doc_idx=None, sync_id=None): """ Insert/update document into the database with a given revision. @@ -1361,6 +1368,8 @@ class CouchDatabase(CommonBackend): :param number_of_docs: The total amount of documents sent on this sync session. :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int :param sync_id: The id of the current sync session. :type sync_id: str @@ -1423,7 +1432,8 @@ class CouchDatabase(CommonBackend): if replica_uid is not None and replica_gen is not None: self._set_replica_gen_and_trans_id( replica_uid, replica_gen, replica_trans_id, - number_of_docs=number_of_docs, sync_id=sync_id) + number_of_docs=number_of_docs, doc_idx=doc_idx, + sync_id=sync_id) # update info old_doc.rev = doc.rev if doc.is_tombstone(): diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js index d754faaa..b0ae2de6 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -53,8 +53,13 @@ function(doc, req){ var other_transaction_id = body['other_transaction_id'] var sync_id = body['sync_id']; var number_of_docs = body['number_of_docs']; + var doc_idx = body['doc_idx']; + + // parse integers if (number_of_docs != null) number_of_docs = parseInt(number_of_docs); + if (doc_idx != null) + doc_idx = parseInt(doc_idx); if (other_replica_uid == null || other_generation == null @@ -69,7 +74,7 @@ function(doc, req){ var current_gen = other_generation; var current_trans_id = other_transaction_id; - /*------------ Wait for end of sync session before storing ------------*/ + /*------------- Wait for sequential values before storing -------------*/ // we just try to obtain pending log if we received a sync_id if (sync_id != null) { @@ -80,31 +85,49 @@ function(doc, req){ doc['pending'][other_replica_uid] = { 'sync_id': sync_id, 'log': [], + 'last_doc_idx': 0, } } // append incoming data to pending log doc['pending'][other_replica_uid]['log'].push([ other_generation, - other_transaction_id + other_transaction_id, + doc_idx, ]) - // leave the sync log untouched if we still did not receive all docs - if (doc['pending'][other_replica_uid]['log'].length < number_of_docs) - return [doc, 'ok']; - - // otherwise, sort pending log according to generation + // sort pending log according to generation doc['pending'][other_replica_uid]['log'].sort(function(a, b) { return a[0] - b[0]; }); // get most up-to-date information from pending log - pending = doc['pending'][other_replica_uid]['log'].pop() - current_gen = pending[0]; - current_trans_id = pending[1]; + var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx']; + var pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; + + current_gen = null; + current_trans_id = null; + + while (last_doc_idx + 1 == pending_idx) { + pending = doc['pending'][other_replica_uid]['log'].shift() + current_gen = pending[0]; + current_trans_id = pending[1]; + last_doc_idx = pending[2] + if (doc['pending'][other_replica_uid]['log'].length == 0) + break; + pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; + } + + // leave the sync log untouched if we still did not receive enough docs + if (current_gen == null) + return [doc, 'ok']; + + // update last index of received doc + doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx; - // and remove all pending data from that replica - delete doc['pending'][other_replica_uid] + // eventually remove all pending data from that replica + if (last_doc_idx == number_of_docs) + delete doc['pending'][other_replica_uid] } /*--------------- Store source replica info on sync log ---------------*/ -- cgit v1.2.3