summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/src/leap/soledad/common/couch.py63
-rw-r--r--common/src/leap/soledad/common/ddocs/syncs/updates/put.js151
-rw-r--r--common/src/leap/soledad/common/ddocs/syncs/views/log/map.js12
-rw-r--r--common/src/leap/soledad/common/tests/test_couch.py8
-rw-r--r--common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py53
5 files changed, 42 insertions, 245 deletions
diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py
index 8c152353..83f542ab 100644
--- a/common/src/leap/soledad/common/couch.py
+++ b/common/src/leap/soledad/common/couch.py
@@ -1071,14 +1071,20 @@ class CouchDatabase(CommonBackend):
"""
if other_replica_uid in self.cache:
return self.cache[other_replica_uid]
- # query a couch view
- result = self._database.view('syncs/log')
- if len(result[other_replica_uid].rows) == 0:
- return (0, '')
- return (
- result[other_replica_uid].rows[0]['value']['known_generation'],
- result[other_replica_uid].rows[0]['value']['known_transaction_id']
- )
+
+ doc_id = 'u1db_sync_%s' % other_replica_uid
+ try:
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {
+ '_id': doc_id,
+ 'generation': 0,
+ 'transaction_id': '',
+ }
+ self._database.save(doc)
+ result = doc['generation'], doc['transaction_id']
+ self.cache[other_replica_uid] = result
+ return result
def _set_replica_gen_and_trans_id(self, other_replica_uid,
other_generation, other_transaction_id,
@@ -1138,43 +1144,16 @@ class CouchDatabase(CommonBackend):
:type doc_idx: int
:param sync_id: The id of the current sync session.
:type sync_id: str
-
- :raise MissingDesignDocError: Raised when tried to access a missing
- design document.
- :raise MissingDesignDocListFunctionError: Raised when trying to access
- a missing list function on a
- design document.
- :raise MissingDesignDocNamedViewError: Raised when trying to access a
- missing named view on a design
- document.
- :raise MissingDesignDocDeletedError: Raised when trying to access a
- deleted design document.
- :raise MissingDesignDocUnknownError: Raised when failed to access a
- design document for an yet
- unknown reason.
"""
self.cache[other_replica_uid] = (other_generation, other_transaction_id)
- # query a couch update function
- ddoc_path = ['_design', 'syncs', '_update', 'put', 'u1db_sync_log']
- res = self._database.resource(*ddoc_path)
+ doc_id = 'u1db_sync_%s' % other_replica_uid
try:
- with CouchDatabase.update_handler_lock[self._get_replica_uid()]:
- body = {
- 'other_replica_uid': other_replica_uid,
- 'other_generation': other_generation,
- 'other_transaction_id': other_transaction_id,
- }
- if number_of_docs is not None:
- body['number_of_docs'] = number_of_docs
- if doc_idx is not None:
- body['doc_idx'] = doc_idx
- if sync_id is not None:
- body['sync_id'] = sync_id
- res.put_json(
- body=body,
- headers={'content-type': 'application/json'})
- except ResourceNotFound as e:
- raise_missing_design_doc_error(e, ddoc_path)
+ doc = self._database[doc_id]
+ except ResourceNotFound:
+ doc = {'_id': doc_id}
+ doc['generation'] = other_generation
+ doc['transaction_id'] = other_transaction_id
+ self._database.save(doc)
def _force_doc_sync_conflict(self, doc):
"""
diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
deleted file mode 100644
index b0ae2de6..00000000
--- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * The u1db_sync_log document stores both the actual sync log and a list of
- * pending updates to the log, in case we receive incoming documents out of
- * the correct order (i.e. if there are parallel PUTs during the sync
- * process).
- *
- * The structure of the document is the following:
- *
- * {
- * 'syncs': [
- * ['<replica_uid>', <gen>, '<trans_id>'],
- * ...
- * ],
- * 'pending': {
- * 'other_replica_uid': {
- * 'sync_id': '<sync_id>',
- * 'log': [[<gen>, '<trans_id>'], ...]
- * },
- * ...
- * }
- * }
- *
- * The update function below does the following:
- *
- * 0. If we do not receive a sync_id, we just update the 'syncs' list with
- * the incoming info about the source replica state.
- *
- * 1. Otherwise, if the incoming sync_id differs from current stored
- * sync_id, then we assume that the previous sync session for that source
- * replica was interrupted and discard all pending data.
- *
- * 2. Then we append incoming info as pending data for that source replica
- * and current sync_id, and sort the pending data by generation.
- *
- * 3. Then we go through pending data and find the most recent generation
- * that we can use to update the actual sync log.
- *
- * 4. Finally, we insert the most up to date information into the sync log.
- */
-function(doc, req){
-
- // create the document if it doesn't exist
- if (!doc) {
- doc = {}
- doc['_id'] = 'u1db_sync_log';
- doc['syncs'] = [];
- }
-
- // get and validate incoming info
- var body = JSON.parse(req.body);
- var other_replica_uid = body['other_replica_uid'];
- var other_generation = parseInt(body['other_generation']);
- var other_transaction_id = body['other_transaction_id']
- var sync_id = body['sync_id'];
- var number_of_docs = body['number_of_docs'];
- var doc_idx = body['doc_idx'];
-
- // parse integers
- if (number_of_docs != null)
- number_of_docs = parseInt(number_of_docs);
- if (doc_idx != null)
- doc_idx = parseInt(doc_idx);
-
- if (other_replica_uid == null
- || other_generation == null
- || other_transaction_id == null)
- return [null, 'invalid data'];
-
- // create slot for pending logs
- if (doc['pending'] == null)
- doc['pending'] = {};
-
- // these are the values that will be actually inserted
- var current_gen = other_generation;
- var current_trans_id = other_transaction_id;
-
- /*------------- Wait for sequential values before storing -------------*/
-
- // we just try to obtain pending log if we received a sync_id
- if (sync_id != null) {
-
- // create slot for current source and sync_id pending log
- if (doc['pending'][other_replica_uid] == null
- || doc['pending'][other_replica_uid]['sync_id'] != sync_id) {
- doc['pending'][other_replica_uid] = {
- 'sync_id': sync_id,
- 'log': [],
- 'last_doc_idx': 0,
- }
- }
-
- // append incoming data to pending log
- doc['pending'][other_replica_uid]['log'].push([
- other_generation,
- other_transaction_id,
- doc_idx,
- ])
-
- // sort pending log according to generation
- doc['pending'][other_replica_uid]['log'].sort(function(a, b) {
- return a[0] - b[0];
- });
-
- // get most up-to-date information from pending log
- var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx'];
- var pending_idx = doc['pending'][other_replica_uid]['log'][0][2];
-
- current_gen = null;
- current_trans_id = null;
-
- while (last_doc_idx + 1 == pending_idx) {
- pending = doc['pending'][other_replica_uid]['log'].shift()
- current_gen = pending[0];
- current_trans_id = pending[1];
- last_doc_idx = pending[2]
- if (doc['pending'][other_replica_uid]['log'].length == 0)
- break;
- pending_idx = doc['pending'][other_replica_uid]['log'][0][2];
- }
-
- // leave the sync log untouched if we still did not receive enough docs
- if (current_gen == null)
- return [doc, 'ok'];
-
- // update last index of received doc
- doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx;
-
- // eventually remove all pending data from that replica
- if (last_doc_idx == number_of_docs)
- delete doc['pending'][other_replica_uid]
- }
-
- /*--------------- Store source replica info on sync log ---------------*/
-
- // remove outdated info
- doc['syncs'] = doc['syncs'].filter(
- function (entry) {
- return entry[0] != other_replica_uid;
- }
- );
-
- // store in log
- doc['syncs'].push([
- other_replica_uid,
- current_gen,
- current_trans_id
- ]);
-
- return [doc, 'ok'];
-}
-
diff --git a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js b/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js
deleted file mode 100644
index a63c7cf4..00000000
--- a/common/src/leap/soledad/common/ddocs/syncs/views/log/map.js
+++ /dev/null
@@ -1,12 +0,0 @@
-function(doc) {
- if (doc._id == 'u1db_sync_log') {
- if (doc.syncs)
- doc.syncs.forEach(function (entry) {
- emit(entry[0],
- {
- 'known_generation': entry[1],
- 'known_transaction_id': entry[2]
- });
- });
- }
-}
diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py
index a08ffd16..c8d13667 100644
--- a/common/src/leap/soledad/common/tests/test_couch.py
+++ b/common/src/leap/soledad/common/tests/test_couch.py
@@ -1359,10 +1359,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
self.assertRaises(
errors.MissingDesignDocError,
self.db.whats_changed)
- # _do_set_replica_gen_and_trans_id()
- self.assertRaises(
- errors.MissingDesignDocError,
- self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
def test_missing_design_doc_functions_raises(self):
"""
@@ -1489,10 +1485,6 @@ class CouchDatabaseExceptionsTests(CouchDBTestCase):
self.assertRaises(
errors.MissingDesignDocDeletedError,
self.db.whats_changed)
- # _do_set_replica_gen_and_trans_id()
- self.assertRaises(
- errors.MissingDesignDocDeletedError,
- self.db._do_set_replica_gen_and_trans_id, 1, 2, 3)
def test_ensure_ddoc_independently(self):
"""
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
index 25f709ca..3e8e8cce 100644
--- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -163,6 +163,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
2,
len(filter(lambda t: t[0] == doc_id, transaction_log)))
+ @defer.inlineCallbacks
def test_correct_sync_log_after_sequential_syncs(self):
"""
Assert that the sync_log increases accordingly with sequential syncs.
@@ -170,21 +171,21 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
sol = self._soledad_instance(
auth_token='auth-token',
server_url=self.getURL())
+ source_replica_uid = sol._dbpool.replica_uid
- def _create_docs(results):
+ def _create_docs():
deferreds = []
for i in xrange(0, REPEAT_TIMES):
deferreds.append(sol.create_doc({}))
- return defer.DeferredList(deferreds)
+ return defer.gatherResults(deferreds)
def _assert_transaction_and_sync_logs(results, sync_idx):
# assert sizes of transaction and sync logs
self.assertEqual(
sync_idx * REPEAT_TIMES,
len(self.db._get_transaction_log()))
- self.assertEqual(
- 1 if sync_idx > 0 else 0,
- len(self.db._database.view('syncs/log').rows))
+ gen, _ = self.db._get_replica_gen_and_trans_id(source_replica_uid)
+ self.assertEqual(sync_idx * REPEAT_TIMES, gen)
def _assert_sync(results, sync_idx):
gen, docs = results
@@ -193,40 +194,28 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):
# assert sizes of transaction and sync logs
self.assertEqual((sync_idx + 1) * REPEAT_TIMES,
len(self.db._get_transaction_log()))
- sync_log_rows = self.db._database.view('syncs/log').rows
- sync_log = sync_log_rows[0].value
- replica_uid = sync_log_rows[0].key
- known_gen = sync_log['known_generation']
- known_trans_id = sync_log['known_transaction_id']
- # assert sync_log has exactly 1 row
- self.assertEqual(1, len(sync_log_rows))
- # assert it has the correct replica_uid, gen and trans_id
- self.assertEqual(sol._dbpool.replica_uid, replica_uid)
+ target_known_gen, target_known_trans_id = \
+ self.db._get_replica_gen_and_trans_id(source_replica_uid)
+ # assert it has the correct gen and trans_id
conn_key = sol._dbpool._u1dbconnections.keys().pop()
conn = sol._dbpool._u1dbconnections[conn_key]
sol_gen, sol_trans_id = conn._get_generation_info()
- self.assertEqual(sol_gen, known_gen)
- self.assertEqual(sol_trans_id, known_trans_id)
-
- # create some documents
- d = _create_docs(None)
+ self.assertEqual(sol_gen, target_known_gen)
+ self.assertEqual(sol_trans_id, target_known_trans_id)
# sync first time and assert success
- d.addCallback(_assert_transaction_and_sync_logs, 0)
- d.addCallback(lambda _: sol.sync())
- d.addCallback(lambda _: sol.get_all_docs())
- d.addCallback(_assert_sync, 0)
+ results = yield _create_docs()
+ _assert_transaction_and_sync_logs(results, 0)
+ yield sol.sync()
+ results = yield sol.get_all_docs()
+ _assert_sync(results, 0)
# create more docs, sync second time and assert success
- d.addCallback(_create_docs)
- d.addCallback(_assert_transaction_and_sync_logs, 1)
- d.addCallback(lambda _: sol.sync())
- d.addCallback(lambda _: sol.get_all_docs())
- d.addCallback(_assert_sync, 1)
-
- d.addCallback(lambda _: sol.close())
-
- return d
+ results = yield _create_docs()
+ _assert_transaction_and_sync_logs(results, 1)
+ yield sol.sync()
+ results = yield sol.get_all_docs()
+ _assert_sync(results, 1)
#
# Concurrency tests