summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/target.py8
-rw-r--r--common/src/leap/soledad/common/couch.py24
-rw-r--r--common/src/leap/soledad/common/ddocs/syncs/updates/put.js47
-rw-r--r--server/src/leap/soledad/server/sync.py13
4 files changed, 66 insertions, 26 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index c459925d..7e563823 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -652,7 +652,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
return self._response()
def _put_doc(self, sync_id, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id, number_of_docs):
+ id, rev, content, gen, trans_id, number_of_docs, doc_idx):
"""
Put a sync document on server by means of a POST request.
@@ -676,6 +676,8 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
:param number_of_docs: The total amount of documents sent on this sync
session.
:type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
:return: The body and headers of the response.
:rtype: tuple
@@ -694,7 +696,7 @@ class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
size += self._prepare(
',', entries,
id=id, rev=rev, content=content, gen=gen, trans_id=trans_id,
- number_of_docs=number_of_docs)
+ number_of_docs=number_of_docs, doc_idx=doc_idx)
entries.append('\r\n]')
size += len(entries[-1])
# send headers
@@ -1189,7 +1191,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.doc_syncer.set_request_method(
'put', sync_id, cur_target_gen, cur_target_trans_id,
id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs)
+ trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1)
# set the success calback
def _success_callback(idx, total, response):
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index c0adfc70..5658f4ce 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -1107,7 +1107,8 @@ class CouchDatabase(CommonBackend):
def _set_replica_gen_and_trans_id(self, other_replica_uid,
other_generation, other_transaction_id,
- number_of_docs=None, sync_id=None):
+ number_of_docs=None, doc_idx=None,
+ sync_id=None):
"""
Set the last-known generation and transaction id for the other
database replica.
@@ -1126,16 +1127,18 @@ class CouchDatabase(CommonBackend):
:param number_of_docs: The total amount of documents sent on this sync
session.
:type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
:param sync_id: The id of the current sync session.
:type sync_id: str
"""
self._do_set_replica_gen_and_trans_id(
other_replica_uid, other_generation, other_transaction_id,
- number_of_docs=number_of_docs, sync_id=sync_id)
+ number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)
def _do_set_replica_gen_and_trans_id(
self, other_replica_uid, other_generation, other_transaction_id,
- number_of_docs=None, sync_id=None):
+ number_of_docs=None, doc_idx=None, sync_id=None):
"""
Set the last-known generation and transaction id for the other
database replica.
@@ -1154,6 +1157,8 @@ class CouchDatabase(CommonBackend):
:param number_of_docs: The total amount of documents sent on this sync
session.
:type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
:param sync_id: The id of the current sync session.
:type sync_id: str
@@ -1181,10 +1186,12 @@ class CouchDatabase(CommonBackend):
'other_generation': other_generation,
'other_transaction_id': other_transaction_id,
}
- if sync_id is not None:
- body['sync_id'] = sync_id
if number_of_docs is not None:
body['number_of_docs'] = number_of_docs
+ if doc_idx is not None:
+ body['doc_idx'] = doc_idx
+ if sync_id is not None:
+ body['sync_id'] = sync_id
res.put_json(
body=body,
headers={'content-type': 'application/json'})
@@ -1325,7 +1332,7 @@ class CouchDatabase(CommonBackend):
def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen,
replica_trans_id='', number_of_docs=None,
- sync_id=None):
+ doc_idx=None, sync_id=None):
"""
Insert/update document into the database with a given revision.
@@ -1361,6 +1368,8 @@ class CouchDatabase(CommonBackend):
:param number_of_docs: The total amount of documents sent on this sync
session.
:type number_of_docs: int
+ :param doc_idx: The index of the current document being sent.
+ :type doc_idx: int
:param sync_id: The id of the current sync session.
:type sync_id: str
@@ -1423,7 +1432,8 @@ class CouchDatabase(CommonBackend):
if replica_uid is not None and replica_gen is not None:
self._set_replica_gen_and_trans_id(
replica_uid, replica_gen, replica_trans_id,
- number_of_docs=number_of_docs, sync_id=sync_id)
+ number_of_docs=number_of_docs, doc_idx=doc_idx,
+ sync_id=sync_id)
# update info
old_doc.rev = doc.rev
if doc.is_tombstone():
diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
index d754faaa..b0ae2de6 100644
--- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
+++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
@@ -53,8 +53,13 @@ function(doc, req){
var other_transaction_id = body['other_transaction_id']
var sync_id = body['sync_id'];
var number_of_docs = body['number_of_docs'];
+ var doc_idx = body['doc_idx'];
+
+ // parse integers
if (number_of_docs != null)
number_of_docs = parseInt(number_of_docs);
+ if (doc_idx != null)
+ doc_idx = parseInt(doc_idx);
if (other_replica_uid == null
|| other_generation == null
@@ -69,7 +74,7 @@ function(doc, req){
var current_gen = other_generation;
var current_trans_id = other_transaction_id;
- /*------------ Wait for end of sync session before storing ------------*/
+ /*------------- Wait for sequential values before storing -------------*/
// we just try to obtain pending log if we received a sync_id
if (sync_id != null) {
@@ -80,31 +85,49 @@ function(doc, req){
doc['pending'][other_replica_uid] = {
'sync_id': sync_id,
'log': [],
+ 'last_doc_idx': 0,
}
}
// append incoming data to pending log
doc['pending'][other_replica_uid]['log'].push([
other_generation,
- other_transaction_id
+ other_transaction_id,
+ doc_idx,
])
- // leave the sync log untouched if we still did not receive all docs
- if (doc['pending'][other_replica_uid]['log'].length < number_of_docs)
- return [doc, 'ok'];
-
- // otherwise, sort pending log according to generation
+ // sort pending log according to generation
doc['pending'][other_replica_uid]['log'].sort(function(a, b) {
return a[0] - b[0];
});
// get most up-to-date information from pending log
- pending = doc['pending'][other_replica_uid]['log'].pop()
- current_gen = pending[0];
- current_trans_id = pending[1];
+ var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx'];
+ var pending_idx = doc['pending'][other_replica_uid]['log'][0][2];
+
+ current_gen = null;
+ current_trans_id = null;
+
+ while (last_doc_idx + 1 == pending_idx) {
+ pending = doc['pending'][other_replica_uid]['log'].shift()
+ current_gen = pending[0];
+ current_trans_id = pending[1];
+ last_doc_idx = pending[2]
+ if (doc['pending'][other_replica_uid]['log'].length == 0)
+ break;
+ pending_idx = doc['pending'][other_replica_uid]['log'][0][2];
+ }
+
+ // leave the sync log untouched if we still did not receive enough docs
+ if (current_gen == null)
+ return [doc, 'ok'];
+
+ // update last index of received doc
+ doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx;
- // and remove all pending data from that replica
- delete doc['pending'][other_replica_uid]
+ // eventually remove all pending data from that replica
+ if (last_doc_idx == number_of_docs)
+ delete doc['pending'][other_replica_uid]
}
/*--------------- Store source replica info on sync log ---------------*/
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 3a1881fc..6dc99b5a 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -287,7 +287,7 @@ class SyncExchange(sync.SyncExchange):
return_doc_cb(doc, gen, trans_id)
def insert_doc_from_source(self, doc, source_gen, trans_id,
- number_of_docs=None, sync_id=None):
+ number_of_docs=None, doc_idx=None, sync_id=None):
"""Try to insert synced document from source.
Conflicting documents are not inserted but will be sent over
@@ -308,13 +308,15 @@ class SyncExchange(sync.SyncExchange):
:param number_of_docs: The total amount of documents sent on this sync
session.
:type number_of_docs: int
+ :param doc_idx: The index of the current document.
+ :type doc_idx: int
:param sync_id: The id of the current sync session.
:type sync_id: str
"""
state, at_gen = self._db._put_doc_if_newer(
doc, save_conflict=False, replica_uid=self.source_replica_uid,
replica_gen=source_gen, replica_trans_id=trans_id,
- number_of_docs=number_of_docs, sync_id=sync_id)
+ number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)
if state == 'inserted':
self._sync_state.put_seen_id(doc.doc_id, at_gen)
elif state == 'converged':
@@ -369,7 +371,8 @@ class SyncResource(http_app.SyncResource):
self._sync_id = sync_id
@http_app.http_method(content_as_args=True)
- def post_put(self, id, rev, content, gen, trans_id, number_of_docs):
+ def post_put(self, id, rev, content, gen, trans_id, number_of_docs,
+ doc_idx):
"""
Put one incoming document into the server replica.
@@ -388,11 +391,13 @@ class SyncResource(http_app.SyncResource):
:param number_of_docs: The total amount of documents sent on this sync
session.
:type number_of_docs: int
+ :param doc_idx: The index of the current document.
+ :type doc_idx: int
"""
doc = Document(id, rev, content)
self.sync_exch.insert_doc_from_source(
doc, gen, trans_id, number_of_docs=number_of_docs,
- sync_id=self._sync_id)
+ doc_idx=doc_idx, sync_id=self._sync_id)
@http_app.http_method(received=int, content_as_args=True)
def post_get(self, received):