From 1e69bf4aceb2502a17dff98581acc7abcf41e168 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 7 Jul 2014 11:34:47 -0300 Subject: Update target sync with sequential info (#5869). --- client/src/leap/soledad/client/target.py | 8 ++-- common/src/leap/soledad/common/couch.py | 24 +++++++---- .../leap/soledad/common/ddocs/syncs/updates/put.js | 47 ++++++++++++++++------ server/src/leap/soledad/server/sync.py | 13 ++++-- 4 files changed, 66 insertions(+), 26 deletions(-) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index c459925d..7e563823 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -652,7 +652,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): return self._response() def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id, number_of_docs): + id, rev, content, gen, trans_id, number_of_docs, doc_idx): """ Put a sync document on server by means of a POST request. @@ -676,6 +676,8 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): :param number_of_docs: The total amount of documents sent on this sync session. :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int :return: The body and headers of the response. :rtype: tuple @@ -694,7 +696,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): size += self._prepare( ',', entries, id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, - number_of_docs=number_of_docs) + number_of_docs=number_of_docs, doc_idx=doc_idx) entries.append('\r\n]') size += len(entries[-1]) # send headers @@ -1189,7 +1191,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.doc_syncer.set_request_method( 'put', sync_id, cur_target_gen, cur_target_trans_id, id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs) + trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1) # set the success calback def _success_callback(idx, total, response): diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index c0adfc70..5658f4ce 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1107,7 +1107,8 @@ class CouchDatabase(CommonBackend): def _set_replica_gen_and_trans_id(self, other_replica_uid, other_generation, other_transaction_id, - number_of_docs=None, sync_id=None): + number_of_docs=None, doc_idx=None, + sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1126,16 +1127,18 @@ class CouchDatabase(CommonBackend): :param number_of_docs: The total amount of documents sent on this sync session. :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int :param sync_id: The id of the current sync session. :type sync_id: str """ self._do_set_replica_gen_and_trans_id( other_replica_uid, other_generation, other_transaction_id, - number_of_docs=number_of_docs, sync_id=sync_id) + number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id) def _do_set_replica_gen_and_trans_id( self, other_replica_uid, other_generation, other_transaction_id, - number_of_docs=None, sync_id=None): + number_of_docs=None, doc_idx=None, sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1154,6 +1157,8 @@ class CouchDatabase(CommonBackend): :param number_of_docs: The total amount of documents sent on this sync session. :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int :param sync_id: The id of the current sync session. :type sync_id: str @@ -1181,10 +1186,12 @@ class CouchDatabase(CommonBackend): 'other_generation': other_generation, 'other_transaction_id': other_transaction_id, } - if sync_id is not None: - body['sync_id'] = sync_id if number_of_docs is not None: body['number_of_docs'] = number_of_docs + if doc_idx is not None: + body['doc_idx'] = doc_idx + if sync_id is not None: + body['sync_id'] = sync_id res.put_json( body=body, headers={'content-type': 'application/json'}) @@ -1325,7 +1332,7 @@ class CouchDatabase(CommonBackend): def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, replica_trans_id='', number_of_docs=None, - sync_id=None): + doc_idx=None, sync_id=None): """ Insert/update document into the database with a given revision. @@ -1361,6 +1368,8 @@ class CouchDatabase(CommonBackend): :param number_of_docs: The total amount of documents sent on this sync session. :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int :param sync_id: The id of the current sync session. :type sync_id: str @@ -1423,7 +1432,8 @@ class CouchDatabase(CommonBackend): if replica_uid is not None and replica_gen is not None: self._set_replica_gen_and_trans_id( replica_uid, replica_gen, replica_trans_id, - number_of_docs=number_of_docs, sync_id=sync_id) + number_of_docs=number_of_docs, doc_idx=doc_idx, + sync_id=sync_id) # update info old_doc.rev = doc.rev if doc.is_tombstone(): diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js index d754faaa..b0ae2de6 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -53,8 +53,13 @@ function(doc, req){ var other_transaction_id = body['other_transaction_id'] var sync_id = body['sync_id']; var number_of_docs = body['number_of_docs']; + var doc_idx = body['doc_idx']; + + // parse integers if (number_of_docs != null) number_of_docs = parseInt(number_of_docs); + if (doc_idx != null) + doc_idx = parseInt(doc_idx); if (other_replica_uid == null || other_generation == null @@ -69,7 +74,7 @@ function(doc, req){ var current_gen = other_generation; var current_trans_id = other_transaction_id; - /*------------ Wait for end of sync session before storing ------------*/ + /*------------- Wait for sequential values before storing -------------*/ // we just try to obtain pending log if we received a sync_id if (sync_id != null) { @@ -80,31 +85,49 @@ function(doc, req){ doc['pending'][other_replica_uid] = { 'sync_id': sync_id, 'log': [], + 'last_doc_idx': 0, } } // append incoming data to pending log doc['pending'][other_replica_uid]['log'].push([ other_generation, - other_transaction_id + other_transaction_id, + doc_idx, ]) - // leave the sync log untouched if we still did not receive all docs - if (doc['pending'][other_replica_uid]['log'].length < number_of_docs) - return [doc, 'ok']; - - // otherwise, sort pending log according to generation + // sort pending log according to generation doc['pending'][other_replica_uid]['log'].sort(function(a, b) { return a[0] - b[0]; }); // get most up-to-date information from pending log - pending = doc['pending'][other_replica_uid]['log'].pop() - current_gen = pending[0]; - current_trans_id = pending[1]; + var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx']; + var pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; + + current_gen = null; + current_trans_id = null; + + while (last_doc_idx + 1 == pending_idx) { + pending = doc['pending'][other_replica_uid]['log'].shift() + current_gen = pending[0]; + current_trans_id = pending[1]; + last_doc_idx = pending[2] + if (doc['pending'][other_replica_uid]['log'].length == 0) + break; + pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; + } + + // leave the sync log untouched if we still did not receive enough docs + if (current_gen == null) + return [doc, 'ok']; + + // update last index of received doc + doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx; - // and remove all pending data from that replica - delete doc['pending'][other_replica_uid] + // eventually remove all pending data from that replica + if (last_doc_idx == number_of_docs) + delete doc['pending'][other_replica_uid] } /*--------------- Store source replica info on sync log ---------------*/ diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3a1881fc..6dc99b5a 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -287,7 +287,7 @@ class SyncExchange(sync.SyncExchange): return_doc_cb(doc, gen, trans_id) def insert_doc_from_source(self, doc, source_gen, trans_id, - number_of_docs=None, sync_id=None): + number_of_docs=None, doc_idx=None, sync_id=None): """Try to insert synced document from source. Conflicting documents are not inserted but will be sent over @@ -308,13 +308,15 @@ class SyncExchange(sync.SyncExchange): :param number_of_docs: The total amount of documents sent on this sync session. :type number_of_docs: int + :param doc_idx: The index of the current document. + :type doc_idx: int :param sync_id: The id of the current sync session. :type sync_id: str """ state, at_gen = self._db._put_doc_if_newer( doc, save_conflict=False, replica_uid=self.source_replica_uid, replica_gen=source_gen, replica_trans_id=trans_id, - number_of_docs=number_of_docs, sync_id=sync_id) + number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id) if state == 'inserted': self._sync_state.put_seen_id(doc.doc_id, at_gen) elif state == 'converged': @@ -369,7 +371,8 @@ class SyncResource(http_app.SyncResource): self._sync_id = sync_id @http_app.http_method(content_as_args=True) - def post_put(self, id, rev, content, gen, trans_id, number_of_docs): + def post_put(self, id, rev, content, gen, trans_id, number_of_docs, + doc_idx): """ Put one incoming document into the server replica. @@ -388,11 +391,13 @@ class SyncResource(http_app.SyncResource): :param number_of_docs: The total amount of documents sent on this sync session. :type number_of_docs: int + :param doc_idx: The index of the current document. + :type doc_idx: int """ doc = Document(id, rev, content) self.sync_exch.insert_doc_from_source( doc, gen, trans_id, number_of_docs=number_of_docs, - sync_id=self._sync_id) + doc_idx=doc_idx, sync_id=self._sync_id) @http_app.http_method(received=int, content_as_args=True) def post_get(self, received): -- cgit v1.2.3