summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/leap/soledad/common/tests/test_sqlcipher_sync.py')
-rw-r--r--common/src/leap/soledad/common/tests/test_sqlcipher_sync.py635
1 files changed, 631 insertions, 4 deletions
diff --git a/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py b/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py
index dd66a58a..48e64d24 100644
--- a/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py
+++ b/common/src/leap/soledad/common/tests/test_sqlcipher_sync.py
@@ -20,10 +20,10 @@ Test sqlcipher backend sync.
import simplejson as json
-from u1db import (
- sync,
- vectorclock,
-)
+
+from u1db import sync
+from u1db import vectorclock
+from u1db import errors
from testscenarios import TestWithScenarios
from urlparse import urljoin
@@ -148,6 +148,32 @@ class SQLCipherDatabaseSyncTests(
return
self.assertEqual(expected, log)
+ def copy_database(self, db, sync_role=None):
+ # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES
+ # IS THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST
+ # THAT WE CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS
+ # RATHER THAN CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND
+ # NINJA TO YOUR HOUSE.
+ db_copy = tests.DatabaseBaseTests.copy_database(self, db)
+ name, orig_sync_role = self._use_tracking[db]
+ self._use_tracking[db_copy] = (name + '(copy)', sync_role or
+ orig_sync_role)
+ return db_copy
+
+ def test_sync_tracks_db_generation_of_other(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ self.assertEqual(0, self.sync(self.db1, self.db2))
+ self.assertEqual(
+ (0, ''), self.db1._get_replica_gen_and_trans_id('test2'))
+ self.assertEqual(
+ (0, ''), self.db2._get_replica_gen_and_trans_id('test1'))
+ self.assertLastExchangeLog(self.db2,
+ {'receive':
+ {'docs': [], 'last_known_gen': 0},
+ 'return':
+ {'docs': [], 'last_gen': 0}})
+
def test_sync_autoresolves(self):
"""
Test for sync autoresolve remote.
@@ -265,6 +291,214 @@ class SQLCipherDatabaseSyncTests(
# We use the same reasoning from the last test to suppress this one.
pass
+ def test_sync_pulling_doesnt_update_other_if_changed(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ doc = self.db2.create_doc_from_json(tests.simple_doc)
+ # After the local side has sent its list of docs, before we start
+ # receiving the "targets" response, we update the local database with a
+ # new record.
+ # When we finish synchronizing, we can notice that something locally
+ # was updated, and we cannot tell c2 our new updated generation
+
+ def before_get_docs(state):
+ if state != 'before get_docs':
+ return
+ self.db1.create_doc_from_json(tests.simple_doc)
+
+ self.assertEqual(0, self.sync(self.db1, self.db2,
+ trace_hook=before_get_docs))
+ self.assertLastExchangeLog(self.db2,
+ {'receive':
+ {'docs': [], 'last_known_gen': 0},
+ 'return':
+ {'docs': [(doc.doc_id, doc.rev)],
+ 'last_gen': 1}})
+ self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0])
+ # c2 should not have gotten a '_record_sync_info' call, because the
+ # local database had been updated more than just by the messages
+ # returned from c2.
+ self.assertEqual(
+ (0, ''), self.db2._get_replica_gen_and_trans_id('test1'))
+
+ def test_sync_doesnt_update_other_if_nothing_pulled(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ self.db1.create_doc_from_json(tests.simple_doc)
+
+ def no_record_sync_info(state):
+ if state != 'record_sync_info':
+ return
+ self.fail('SyncTarget.record_sync_info was called')
+ self.assertEqual(1, self.sync(self.db1, self.db2,
+ trace_hook_shallow=no_record_sync_info))
+ self.assertEqual(
+ 1,
+ self.db2._get_replica_gen_and_trans_id(self.db1._replica_uid)[0])
+
+ def test_sync_ignores_convergence(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'both')
+ doc = self.db1.create_doc_from_json(tests.simple_doc)
+ self.db3 = self.create_database('test3', 'target')
+ self.assertEqual(1, self.sync(self.db1, self.db3))
+ self.assertEqual(0, self.sync(self.db2, self.db3))
+ self.assertEqual(1, self.sync(self.db1, self.db2))
+ self.assertLastExchangeLog(self.db2,
+ {'receive':
+ {'docs': [(doc.doc_id, doc.rev)],
+ 'source_uid': 'test1',
+ 'source_gen': 1, 'last_known_gen': 0},
+ 'return': {'docs': [], 'last_gen': 1}})
+
+ def test_sync_ignores_superseded(self):
+ self.db1 = self.create_database('test1', 'both')
+ self.db2 = self.create_database('test2', 'both')
+ doc = self.db1.create_doc_from_json(tests.simple_doc)
+ doc_rev1 = doc.rev
+ self.db3 = self.create_database('test3', 'target')
+ self.sync(self.db1, self.db3)
+ self.sync(self.db2, self.db3)
+ new_content = '{"key": "altval"}'
+ doc.set_json(new_content)
+ self.db1.put_doc(doc)
+ doc_rev2 = doc.rev
+ self.sync(self.db2, self.db1)
+ self.assertLastExchangeLog(self.db1,
+ {'receive':
+ {'docs': [(doc.doc_id, doc_rev1)],
+ 'source_uid': 'test2',
+ 'source_gen': 1, 'last_known_gen': 0},
+ 'return':
+ {'docs': [(doc.doc_id, doc_rev2)],
+ 'last_gen': 2}})
+ self.assertGetDoc(self.db1, doc.doc_id, doc_rev2, new_content, False)
+
+ def test_sync_sees_remote_conflicted(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ doc1 = self.db1.create_doc_from_json(tests.simple_doc)
+ doc_id = doc1.doc_id
+ doc1_rev = doc1.rev
+ self.db1.create_index('test-idx', 'key')
+ new_doc = '{"key": "altval"}'
+ doc2 = self.db2.create_doc_from_json(new_doc, doc_id=doc_id)
+ doc2_rev = doc2.rev
+ self.assertTransactionLog([doc1.doc_id], self.db1)
+ self.sync(self.db1, self.db2)
+ self.assertLastExchangeLog(self.db2,
+ {'receive':
+ {'docs': [(doc_id, doc1_rev)],
+ 'source_uid': 'test1',
+ 'source_gen': 1, 'last_known_gen': 0},
+ 'return':
+ {'docs': [(doc_id, doc2_rev)],
+ 'last_gen': 1}})
+ self.assertTransactionLog([doc_id, doc_id], self.db1)
+ self.assertGetDoc(self.db1, doc_id, doc2_rev, new_doc, True)
+ self.assertGetDoc(self.db2, doc_id, doc2_rev, new_doc, False)
+ from_idx = self.db1.get_from_index('test-idx', 'altval')[0]
+ self.assertEqual(doc2.doc_id, from_idx.doc_id)
+ self.assertEqual(doc2.rev, from_idx.rev)
+ self.assertTrue(from_idx.has_conflicts)
+ self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
+
+ def test_sync_sees_remote_delete_conflicted(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ doc1 = self.db1.create_doc_from_json(tests.simple_doc)
+ doc_id = doc1.doc_id
+ self.db1.create_index('test-idx', 'key')
+ self.sync(self.db1, self.db2)
+ doc2 = self.make_document(doc1.doc_id, doc1.rev, doc1.get_json())
+ new_doc = '{"key": "altval"}'
+ doc1.set_json(new_doc)
+ self.db1.put_doc(doc1)
+ self.db2.delete_doc(doc2)
+ self.assertTransactionLog([doc_id, doc_id], self.db1)
+ self.sync(self.db1, self.db2)
+ self.assertLastExchangeLog(self.db2,
+ {'receive':
+ {'docs': [(doc_id, doc1.rev)],
+ 'source_uid': 'test1',
+ 'source_gen': 2, 'last_known_gen': 1},
+ 'return': {'docs': [(doc_id, doc2.rev)],
+ 'last_gen': 2}})
+ self.assertTransactionLog([doc_id, doc_id, doc_id], self.db1)
+ self.assertGetDocIncludeDeleted(self.db1, doc_id, doc2.rev, None, True)
+ self.assertGetDocIncludeDeleted(
+ self.db2, doc_id, doc2.rev, None, False)
+ self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
+
+ def test_sync_local_race_conflicted(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ doc = self.db1.create_doc_from_json(tests.simple_doc)
+ doc_id = doc.doc_id
+ doc1_rev = doc.rev
+ self.db1.create_index('test-idx', 'key')
+ self.sync(self.db1, self.db2)
+ content1 = '{"key": "localval"}'
+ content2 = '{"key": "altval"}'
+ doc.set_json(content2)
+ self.db2.put_doc(doc)
+ doc2_rev2 = doc.rev
+ triggered = []
+
+ def after_whatschanged(state):
+ if state != 'after whats_changed':
+ return
+ triggered.append(True)
+ doc = self.make_document(doc_id, doc1_rev, content1)
+ self.db1.put_doc(doc)
+
+ self.sync(self.db1, self.db2, trace_hook=after_whatschanged)
+ self.assertEqual([True], triggered)
+ self.assertGetDoc(self.db1, doc_id, doc2_rev2, content2, True)
+ from_idx = self.db1.get_from_index('test-idx', 'altval')[0]
+ self.assertEqual(doc.doc_id, from_idx.doc_id)
+ self.assertEqual(doc.rev, from_idx.rev)
+ self.assertTrue(from_idx.has_conflicts)
+ self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
+ self.assertEqual([], self.db1.get_from_index('test-idx', 'localval'))
+
+ def test_sync_propagates_deletes(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'both')
+ doc1 = self.db1.create_doc_from_json(tests.simple_doc)
+ doc_id = doc1.doc_id
+ self.db1.create_index('test-idx', 'key')
+ self.sync(self.db1, self.db2)
+ self.db2.create_index('test-idx', 'key')
+ self.db3 = self.create_database('test3', 'target')
+ self.sync(self.db1, self.db3)
+ self.db1.delete_doc(doc1)
+ deleted_rev = doc1.rev
+ self.sync(self.db1, self.db2)
+ self.assertLastExchangeLog(self.db2,
+ {'receive':
+ {'docs': [(doc_id, deleted_rev)],
+ 'source_uid': 'test1',
+ 'source_gen': 2, 'last_known_gen': 1},
+ 'return': {'docs': [], 'last_gen': 2}})
+ self.assertGetDocIncludeDeleted(
+ self.db1, doc_id, deleted_rev, None, False)
+ self.assertGetDocIncludeDeleted(
+ self.db2, doc_id, deleted_rev, None, False)
+ self.assertEqual([], self.db1.get_from_index('test-idx', 'value'))
+ self.assertEqual([], self.db2.get_from_index('test-idx', 'value'))
+ self.sync(self.db2, self.db3)
+ self.assertLastExchangeLog(self.db3,
+ {'receive':
+ {'docs': [(doc_id, deleted_rev)],
+ 'source_uid': 'test2',
+ 'source_gen': 2,
+ 'last_known_gen': 0},
+ 'return':
+ {'docs': [], 'last_gen': 2}})
+ self.assertGetDocIncludeDeleted(
+ self.db3, doc_id, deleted_rev, None, False)
+
def test_sync_propagates_resolution(self):
"""
Test if synchronization propagates resolution.
@@ -340,6 +574,132 @@ class SQLCipherDatabaseSyncTests(
'source_gen': 1, 'last_known_gen': 0},
'return': {'docs': [], 'last_gen': 1}})
+ def test_sync_pulls_changes(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ doc = self.db2.create_doc_from_json(tests.simple_doc)
+ self.db1.create_index('test-idx', 'key')
+ self.assertEqual(0, self.sync(self.db1, self.db2))
+ self.assertGetDoc(self.db1, doc.doc_id, doc.rev, tests.simple_doc, False)
+ self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0])
+ self.assertEqual(1, self.db2._get_replica_gen_and_trans_id('test1')[0])
+ self.assertLastExchangeLog(self.db2,
+ {'receive':
+ {'docs': [], 'last_known_gen': 0},
+ 'return':
+ {'docs': [(doc.doc_id, doc.rev)],
+ 'last_gen': 1}})
+ self.assertEqual([doc], self.db1.get_from_index('test-idx', 'value'))
+
+ def test_sync_supersedes_conflicts(self):
+ self.db1 = self.create_database('test1', 'both')
+ self.db2 = self.create_database('test2', 'target')
+ self.db3 = self.create_database('test3', 'both')
+ doc1 = self.db1.create_doc_from_json('{"a": 1}', doc_id='the-doc')
+ self.db2.create_doc_from_json('{"b": 1}', doc_id='the-doc')
+ self.db3.create_doc_from_json('{"c": 1}', doc_id='the-doc')
+ self.sync(self.db3, self.db1)
+ self.assertEqual(
+ self.db1._get_generation_info(),
+ self.db3._get_replica_gen_and_trans_id(self.db1._replica_uid))
+ self.assertEqual(
+ self.db3._get_generation_info(),
+ self.db1._get_replica_gen_and_trans_id(self.db3._replica_uid))
+ self.sync(self.db3, self.db2)
+ self.assertEqual(
+ self.db2._get_generation_info(),
+ self.db3._get_replica_gen_and_trans_id(self.db2._replica_uid))
+ self.assertEqual(
+ self.db3._get_generation_info(),
+ self.db2._get_replica_gen_and_trans_id(self.db3._replica_uid))
+ self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc')))
+ doc1.set_json('{"a": 2}')
+ self.db1.put_doc(doc1)
+ self.sync(self.db3, self.db1)
+ # original doc1 should have been removed from conflicts
+ self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc')))
+
+ def test_sync_stops_after_get_sync_info(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ self.db1.create_doc_from_json(tests.simple_doc)
+ self.sync(self.db1, self.db2)
+
+ def put_hook(state):
+ self.fail("Tracehook triggered for %s" % (state,))
+
+ self.sync(self.db1, self.db2, trace_hook_shallow=put_hook)
+
+ def test_sync_detects_rollback_in_source(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1')
+ self.sync(self.db1, self.db2)
+ self.db1_copy = self.copy_database(self.db1)
+ self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2')
+ self.sync(self.db1, self.db2)
+ self.assertRaises(
+ errors.InvalidGeneration, self.sync, self.db1_copy, self.db2)
+
+ def test_sync_detects_rollback_in_target(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")
+ self.sync(self.db1, self.db2)
+ self.db2_copy = self.copy_database(self.db2)
+ self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2')
+ self.sync(self.db1, self.db2)
+ self.assertRaises(
+ errors.InvalidGeneration, self.sync, self.db1, self.db2_copy)
+
+ def test_sync_detects_diverged_source(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ self.db3 = self.copy_database(self.db1)
+ self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")
+ self.db3.create_doc_from_json(tests.simple_doc, doc_id="divergent")
+ self.sync(self.db1, self.db2)
+ self.assertRaises(
+ errors.InvalidTransactionId, self.sync, self.db3, self.db2)
+
+ def test_sync_detects_diverged_target(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ self.db3 = self.copy_database(self.db2)
+ self.db3.create_doc_from_json(tests.nested_doc, doc_id="divergent")
+ self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")
+ self.sync(self.db1, self.db2)
+ self.assertRaises(
+ errors.InvalidTransactionId, self.sync, self.db1, self.db3)
+
+ def test_sync_detects_rollback_and_divergence_in_source(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1')
+ self.sync(self.db1, self.db2)
+ self.db1_copy = self.copy_database(self.db1)
+ self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2')
+ self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc3')
+ self.sync(self.db1, self.db2)
+ self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2')
+ self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3')
+ self.assertRaises(
+ errors.InvalidTransactionId, self.sync, self.db1_copy, self.db2)
+
+ def test_sync_detects_rollback_and_divergence_in_target(self):
+ self.db1 = self.create_database('test1', 'source')
+ self.db2 = self.create_database('test2', 'target')
+ self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent")
+ self.sync(self.db1, self.db2)
+ self.db2_copy = self.copy_database(self.db2)
+ self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2')
+ self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc3')
+ self.sync(self.db1, self.db2)
+ self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2')
+ self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3')
+ self.assertRaises(
+ errors.InvalidTransactionId, self.sync, self.db1, self.db2_copy)
+
def _make_local_db_and_token_http_target(test, path='test'):
test.startTwistedServer()
@@ -347,6 +707,7 @@ def _make_local_db_and_token_http_target(test, path='test'):
db = couch.CouchDatabase.open_database(
urljoin(test._couch_url, 'test'),
create=True,
+ replica_uid='test',
ensure_ddocs=True)
replica_uid = test._soledad._dbpool.replica_uid
@@ -373,6 +734,9 @@ class SQLCipherSyncTargetTests(
tests.TestCaseWithServer,
SoledadWithCouchServerMixin):
+ # TODO: implement _set_trace_hook(_shallow) in SoledadHTTPSyncTarget so
+ # skipped tests can be succesfully executed.
+
scenarios = (tests.multiply_scenarios(SQLCIPHER_SCENARIOS,
target_scenarios))
@@ -401,6 +765,41 @@ class SQLCipherSyncTargetTests(
self.request_state = couch.CouchServerState(self._couch_url)
return self.make_app_with_state(self.request_state)
+ def set_trace_hook(self, callback, shallow=False):
+ setter = (self.st._set_trace_hook if not shallow else
+ self.st._set_trace_hook_shallow)
+ try:
+ setter(callback)
+ except NotImplementedError:
+ self.skipTest("%s does not implement _set_trace_hook"
+ % (self.st.__class__.__name__,))
+
+ def test_get_sync_target(self):
+ self.assertIsNot(None, self.st)
+
+ @defer.inlineCallbacks
+ def test_get_sync_info(self):
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(
+ ('test', 0, '', 0, ''), sync_info)
+
+ @defer.inlineCallbacks
+ def test_create_doc_updates_sync_info(self):
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(
+ ('test', 0, '', 0, ''), sync_info)
+ self.db.create_doc_from_json(tests.simple_doc)
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(1, sync_info[1])
+
+ @defer.inlineCallbacks
+ def test_record_sync_info(self):
+ yield self.st.record_sync_info('replica', 10, 'T-transid')
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(
+ ('test', 0, '', 10, 'T-transid'), sync_info)
+
+
@defer.inlineCallbacks
def test_sync_exchange(self):
"""
@@ -476,3 +875,231 @@ class SQLCipherSyncTargetTests(
self.db._last_exchange_log['return'],
{'last_gen': 2, 'docs':
[(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_deleted(self):
+ doc = self.db.create_doc_from_json('{}')
+ edit_rev = 'replica:1|' + doc.rev
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')]
+ new_gen, trans_id = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertGetDocIncludeDeleted(
+ self.db, doc.doc_id, edit_rev, None, False)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 2, last_trans_id),
+ (self.other_changes, new_gen, trans_id))
+ sync_info = yield self.st.get_sync_info('replica')
+ self.assertEqual(10, sync_info[3])
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_refuses_conflicts(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'replica:1', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, tests.simple_doc, 1), self.other_changes[0][:-1])
+ self.assertEqual(1, new_gen)
+ if self.whitebox:
+ self.assertEqual(self.db._last_exchange_log['return'],
+ {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_ignores_convergence(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ gen, txid = self.db._get_generation_info()
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, doc.rev, tests.simple_doc), 10, 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=gen,
+ last_known_trans_id=txid, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertEqual(([], 1), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_returns_new_docs(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_gen, _ = yield self.st.sync_exchange(
+ [], 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, tests.simple_doc, 1), self.other_changes[0][:-1])
+ self.assertEqual(1, new_gen)
+ if self.whitebox:
+ self.assertEqual(self.db._last_exchange_log['return'],
+ {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_returns_deleted_docs(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.db.delete_doc(doc)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ new_gen, _ = yield self.st.sync_exchange(
+ [], 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1])
+ self.assertEqual(2, new_gen)
+ if self.whitebox:
+ self.assertEqual(self.db._last_exchange_log['return'],
+ {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_getting_newer_docs(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ self.assertEqual(([], 2), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_with_concurrent_updates_of_synced_doc(self):
+ expected = []
+
+ def before_whatschanged_cb(state):
+ if state != 'before whats_changed':
+ return
+ cont = '{"key": "cuncurrent"}'
+ conc_rev = self.db.put_doc(
+ self.make_document(doc.doc_id, 'test:1|z:2', cont))
+ expected.append((doc.doc_id, conc_rev, cont, 3))
+
+ self.set_trace_hook(before_whatschanged_cb)
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertEqual(expected, [c[:-1] for c in self.other_changes])
+ self.assertEqual(3, new_gen)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_with_concurrent_updates(self):
+
+ def after_whatschanged_cb(state):
+ if state != 'after whats_changed':
+ return
+ self.db.create_doc_from_json('{"new": "doc"}')
+
+ self.set_trace_hook(after_whatschanged_cb)
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertEqual(([], 2), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_converged_handling(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ docs_by_gen = [
+ (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'),
+ (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5,
+ 'T-bar')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertEqual(([], 2), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_detect_incomplete_exchange(self):
+ def before_get_docs_explode(state):
+ if state != 'before get_docs':
+ return
+ raise errors.U1DBError("fail")
+ self.set_trace_hook(before_get_docs_explode)
+ # suppress traceback printing in the wsgiref server
+ #self.patch(simple_server.ServerHandler,
+ # 'log_exception', lambda h, exc_info: None)
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ with self.assertRaises((errors.U1DBError, errors.BrokenSyncStream)):
+ yield self.st.sync_exchange(
+ [], 'other-replica',
+ last_known_generation=0, last_known_trans_id=None,
+ insert_doc_cb=self.receive_doc)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_doc_ids(self):
+ sync_exchange_doc_ids = getattr(self.st, 'sync_exchange_doc_ids', None)
+ if sync_exchange_doc_ids is None:
+ self.skipTest("sync_exchange_doc_ids not implemented")
+ db2 = self.create_database('test2')
+ doc = db2.create_doc_from_json(tests.simple_doc)
+ new_gen, trans_id = sync_exchange_doc_ids(
+ db2, [(doc.doc_id, 10, 'T-sid')], 0, None,
+ insert_doc_cb=self.receive_doc)
+ self.assertGetDoc(self.db, doc.doc_id, doc.rev, tests.simple_doc, False)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 1, last_trans_id),
+ (self.other_changes, new_gen, trans_id))
+ self.assertEqual(10, self.st.get_sync_info(db2._replica_uid)[3])
+
+ @defer.inlineCallbacks
+ def test__set_trace_hook(self):
+ called = []
+
+ def cb(state):
+ called.append(state)
+
+ self.set_trace_hook(cb)
+ yield self.st.sync_exchange([], 'replica', 0, None, self.receive_doc)
+ yield self.st.record_sync_info('replica', 0, 'T-sid')
+ self.assertEqual(['before whats_changed',
+ 'after whats_changed',
+ 'before get_docs',
+ 'record_sync_info',
+ ],
+ called)
+
+ @defer.inlineCallbacks
+ def test__set_trace_hook_shallow(self):
+ if (self.st._set_trace_hook_shallow == self.st._set_trace_hook or
+ self.st._set_trace_hook_shallow.im_func ==
+ SoledadHTTPSyncTarget._set_trace_hook_shallow.im_func):
+ # shallow same as full
+ expected = ['before whats_changed',
+ 'after whats_changed',
+ 'before get_docs',
+ 'record_sync_info',
+ ]
+ else:
+ expected = ['sync_exchange', 'record_sync_info']
+
+ called = []
+
+ def cb(state):
+ called.append(state)
+
+ self.set_trace_hook(cb, shallow=True)
+ self.st.sync_exchange([], 'replica', 0, None, self.receive_doc)
+ self.st.record_sync_info('replica', 0, 'T-sid')
+ self.assertEqual(expected, called)