summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-07-02 12:22:25 -0300
committerdrebs <drebs@leap.se>2014-07-02 12:41:02 -0300
commit222c229d29b9b8e6ed72897b1c96d23f0d8de80e (patch)
tree081527150a859909290d70dccadd3218346de908 /common/src/leap/soledad
parentc3870a4315acf30893679e4cd11c990a4338e47b (diff)
Split sync_exchange into many requests (#5517).
Diffstat (limited to 'common/src/leap/soledad')
-rw-r--r--common/src/leap/soledad/common/couch.py45
-rw-r--r--common/src/leap/soledad/common/ddocs/syncs/updates/put.js118
2 files changed, 147 insertions, 16 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index b51b32f3..c0adfc70 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -1106,7 +1106,8 @@ class CouchDatabase(CommonBackend):
)
def _set_replica_gen_and_trans_id(self, other_replica_uid,
- other_generation, other_transaction_id):
+ other_generation, other_transaction_id,
+ number_of_docs=None, sync_id=None):
"""
Set the last-known generation and transaction id for the other
database replica.
@@ -1122,12 +1123,19 @@ class CouchDatabase(CommonBackend):
:param other_transaction_id: The transaction id associated with the
generation.
:type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
"""
self._do_set_replica_gen_and_trans_id(
- other_replica_uid, other_generation, other_transaction_id)
+ other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=number_of_docs, sync_id=sync_id)
def _do_set_replica_gen_and_trans_id(
- self, other_replica_uid, other_generation, other_transaction_id):
+ self, other_replica_uid, other_generation, other_transaction_id,
+ number_of_docs=None, sync_id=None):
"""
Set the last-known generation and transaction id for the other
database replica.
@@ -1143,6 +1151,11 @@ class CouchDatabase(CommonBackend):
:param other_transaction_id: The transaction id associated with the
generation.
:type other_transaction_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
:raise MissingDesignDocError: Raised when tried to access a missing
design document.
@@ -1163,12 +1176,17 @@ class CouchDatabase(CommonBackend):
res = self._database.resource(*ddoc_path)
try:
with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
+ body={
+ 'other_replica_uid': other_replica_uid,
+ 'other_generation': other_generation,
+ 'other_transaction_id': other_transaction_id,
+ }
+ if sync_id is not None:
+ body['sync_id'] = sync_id
+ if number_of_docs is not None:
+ body['number_of_docs'] = number_of_docs
res.put_json(
- body={
- 'other_replica_uid': other_replica_uid,
- 'other_generation': other_generation,
- 'other_transaction_id': other_transaction_id,
- },
+ body=body,
headers={'content-type': 'application/json'})
except ResourceNotFound as e:
raise_missing_design_doc_error(e, ddoc_path)
@@ -1306,7 +1324,8 @@ class CouchDatabase(CommonBackend):
doc.set_conflicts(cur_doc.get_conflicts())
def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
- replica_trans_id=''):
+ replica_trans_id='', number_of_docs=None,
+ sync_id=None):
"""
Insert/update document into the database with a given revision.
@@ -1339,6 +1358,11 @@ class CouchDatabase(CommonBackend):
:param replica_trans_id: The transaction_id associated with the
generation.
:type replica_trans_id: str
+ :param number_of_docs: The total amount of documents sent on this sync
+ session.
+ :type number_of_docs: int
+ :param sync_id: The id of the current sync session.
+ :type sync_id: str
:return: (state, at_gen) - If we don't have doc_id already, or if
doc_rev supersedes the existing document revision, then the
@@ -1398,7 +1422,8 @@ class CouchDatabase(CommonBackend):
self._force_doc_sync_conflict(doc)
if replica_uid is not None and replica_gen is not None:
self._set_replica_gen_and_trans_id(
- replica_uid, replica_gen, replica_trans_id)
+ replica_uid, replica_gen, replica_trans_id,
+ number_of_docs=number_of_docs, sync_id=sync_id)
# update info
old_doc.rev = doc.rev
if doc.is_tombstone():
diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
index 722f695a..d754faaa 100644
--- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
+++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
@@ -1,22 +1,128 @@
+/**
+ * The u1db_sync_log document stores both the actual sync log and a list of
+ * pending updates to the log, in case we receive incoming documents out of
+ * the correct order (i.e. if there are parallel PUTs during the sync
+ * process).
+ *
+ * The structure of the document is the following:
+ *
+ * {
+ * 'syncs': [
+ * ['<replica_uid>', <gen>, '<trans_id>'],
+ * ...
+ * ],
+ * 'pending': {
+ * 'other_replica_uid': {
+ * 'sync_id': '<sync_id>',
+ * 'log': [[<gen>, '<trans_id>'], ...]
+ * },
+ * ...
+ * }
+ * }
+ *
+ * The update function below does the following:
+ *
+ * 0. If we do not receive a sync_id, we just update the 'syncs' list with
+ * the incoming info about the source replica state.
+ *
+ * 1. Otherwise, if the incoming sync_id differs from current stored
+ * sync_id, then we assume that the previous sync session for that source
+ * replica was interrupted and discard all pending data.
+ *
+ * 2. Then we append incoming info as pending data for that source replica
+ * and current sync_id, and sort the pending data by generation.
+ *
+ * 3. Then we go through pending data and find the most recent generation
+ * that we can use to update the actual sync log.
+ *
+ * 4. Finally, we insert the most up to date information into the sync log.
+ */
function(doc, req){
+
+ // create the document if it doesn't exist
if (!doc) {
doc = {}
doc['_id'] = 'u1db_sync_log';
doc['syncs'] = [];
}
- body = JSON.parse(req.body);
+
+ // get and validate incoming info
+ var body = JSON.parse(req.body);
+ var other_replica_uid = body['other_replica_uid'];
+ var other_generation = parseInt(body['other_generation']);
+ var other_transaction_id = body['other_transaction_id']
+ var sync_id = body['sync_id'];
+ var number_of_docs = body['number_of_docs'];
+ if (number_of_docs != null)
+ number_of_docs = parseInt(number_of_docs);
+
+ if (other_replica_uid == null
+ || other_generation == null
+ || other_transaction_id == null)
+ return [null, 'invalid data'];
+
+ // create slot for pending logs
+ if (doc['pending'] == null)
+ doc['pending'] = {};
+
+ // these are the values that will be actually inserted
+ var current_gen = other_generation;
+ var current_trans_id = other_transaction_id;
+
+ /*------------ Wait for end of sync session before storing ------------*/
+
+ // we just try to obtain pending log if we received a sync_id
+ if (sync_id != null) {
+
+ // create slot for current source and sync_id pending log
+ if (doc['pending'][other_replica_uid] == null
+ || doc['pending'][other_replica_uid]['sync_id'] != sync_id) {
+ doc['pending'][other_replica_uid] = {
+ 'sync_id': sync_id,
+ 'log': [],
+ }
+ }
+
+ // append incoming data to pending log
+ doc['pending'][other_replica_uid]['log'].push([
+ other_generation,
+ other_transaction_id
+ ])
+
+ // leave the sync log untouched if we still did not receive all docs
+ if (doc['pending'][other_replica_uid]['log'].length < number_of_docs)
+ return [doc, 'ok'];
+
+ // otherwise, sort pending log according to generation
+ doc['pending'][other_replica_uid]['log'].sort(function(a, b) {
+ return a[0] - b[0];
+ });
+
+ // get most up-to-date information from pending log
+ pending = doc['pending'][other_replica_uid]['log'].pop()
+ current_gen = pending[0];
+ current_trans_id = pending[1];
+
+ // and remove all pending data from that replica
+ delete doc['pending'][other_replica_uid]
+ }
+
+ /*--------------- Store source replica info on sync log ---------------*/
+
// remove outdated info
doc['syncs'] = doc['syncs'].filter(
function (entry) {
- return entry[0] != body['other_replica_uid'];
+ return entry[0] != other_replica_uid;
}
);
- // store u1db rev
+
+ // store in log
doc['syncs'].push([
- body['other_replica_uid'],
- body['other_generation'],
- body['other_transaction_id']
+ other_replica_uid,
+ current_gen,
+ current_trans_id
]);
+
return [doc, 'ok'];
}