diff options
Diffstat (limited to 'testing/tests/couch')
-rw-r--r-- | testing/tests/couch/__init__.py | 0 | ||||
-rw-r--r-- | testing/tests/couch/couchdb.ini.template | 22 | ||||
-rw-r--r-- | testing/tests/couch/test_couch.py | 1442 | ||||
-rw-r--r-- | testing/tests/couch/test_couch_operations_atomicity.py | 371 |
4 files changed, 1835 insertions, 0 deletions
diff --git a/testing/tests/couch/__init__.py b/testing/tests/couch/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/testing/tests/couch/__init__.py diff --git a/testing/tests/couch/couchdb.ini.template b/testing/tests/couch/couchdb.ini.template new file mode 100644 index 00000000..174d9d86 --- /dev/null +++ b/testing/tests/couch/couchdb.ini.template @@ -0,0 +1,22 @@ +; etc/couchdb/default.ini.tpl. Generated from default.ini.tpl.in by configure. + +; Upgrading CouchDB will overwrite this file. + +[couchdb] +database_dir = %(tempdir)s/lib +view_index_dir = %(tempdir)s/lib +max_document_size = 4294967296 ; 4 GB +os_process_timeout = 120000 ; 120 seconds. for view and external servers. +max_dbs_open = 100 +delayed_commits = true ; set this to false to ensure an fsync before 201 Created is returned +uri_file = %(tempdir)s/lib/couch.uri +file_compression = snappy + +[log] +file = %(tempdir)s/log/couch.log +level = info +include_sasl = true + +[httpd] +port = 0 +bind_address = 127.0.0.1 diff --git a/testing/tests/couch/test_couch.py b/testing/tests/couch/test_couch.py new file mode 100644 index 00000000..94c6ca92 --- /dev/null +++ b/testing/tests/couch/test_couch.py @@ -0,0 +1,1442 @@ +# -*- coding: utf-8 -*- +# test_couch.py +# Copyright (C) 2013-2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test ObjectStore and Couch backend bits. +""" +import json + +from uuid import uuid4 +from urlparse import urljoin + +from couchdb.client import Server + +from testscenarios import TestWithScenarios +from twisted.trial import unittest +from mock import Mock + +from leap.soledad.common.l2db import errors as u1db_errors +from leap.soledad.common.l2db import SyncTarget +from leap.soledad.common.l2db import vectorclock + +from leap.soledad.common import couch +from leap.soledad.common.document import ServerDocument +from leap.soledad.common.couch import errors + +from test_soledad import u1db_tests as tests +from test_soledad.util import CouchDBTestCase +from test_soledad.util import make_local_db_and_target +from test_soledad.util import sync_via_synchronizer + +from test_soledad.u1db_tests import test_backends +from test_soledad.u1db_tests import DatabaseBaseTests + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_common_backend`. +# ----------------------------------------------------------------------------- + +class TestCouchBackendImpl(CouchDBTestCase): + + def test__allocate_doc_id(self): + db = couch.CouchDatabase.open_database( + urljoin( + 'http://localhost:' + str(self.couch_port), + ('test-%s' % uuid4().hex) + ), + create=True, + ensure_ddocs=True) + doc_id1 = db._allocate_doc_id() + self.assertTrue(doc_id1.startswith('D-')) + self.assertEqual(34, len(doc_id1)) + int(doc_id1[len('D-'):], 16) + self.assertNotEqual(doc_id1, db._allocate_doc_id()) + self.delete_db(db._dbname) + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_backends`. +# ----------------------------------------------------------------------------- + +def make_couch_database_for_test(test, replica_uid): + port = str(test.couch_port) + dbname = ('test-%s' % uuid4().hex) + db = couch.CouchDatabase.open_database( + urljoin('http://localhost:' + port, dbname), + create=True, + replica_uid=replica_uid or 'test', + ensure_ddocs=True) + test.addCleanup(test.delete_db, dbname) + return db + + +def copy_couch_database_for_test(test, db): + port = str(test.couch_port) + couch_url = 'http://localhost:' + port + new_dbname = db._dbname + '_copy' + new_db = couch.CouchDatabase.open_database( + urljoin(couch_url, new_dbname), + create=True, + replica_uid=db._replica_uid or 'test') + # copy all docs + session = couch.Session() + old_couch_db = Server(couch_url, session=session)[db._dbname] + new_couch_db = Server(couch_url, session=session)[new_dbname] + for doc_id in old_couch_db: + doc = old_couch_db.get(doc_id) + # bypass u1db_config document + if doc_id == 'u1db_config': + pass + # copy design docs + elif doc_id.startswith('_design'): + del doc['_rev'] + new_couch_db.save(doc) + # copy u1db docs + elif 'u1db_rev' in doc: + new_doc = { + '_id': doc['_id'], + 'u1db_transactions': doc['u1db_transactions'], + 'u1db_rev': doc['u1db_rev'] + } + attachments = [] + if ('u1db_conflicts' in doc): + new_doc['u1db_conflicts'] = doc['u1db_conflicts'] + for c_rev in doc['u1db_conflicts']: + attachments.append('u1db_conflict_%s' % c_rev) + new_couch_db.save(new_doc) + # save conflict data + attachments.append('u1db_content') + for att_name in attachments: + att = old_couch_db.get_attachment(doc_id, att_name) + if (att is not None): + new_couch_db.put_attachment(new_doc, att, + filename=att_name) + # cleanup connections to prevent file descriptor leaking + return new_db + + +def make_document_for_test(test, doc_id, rev, content, has_conflicts=False): + return ServerDocument( + doc_id, rev, content, has_conflicts=has_conflicts) + + +COUCH_SCENARIOS = [ + ('couch', {'make_database_for_test': make_couch_database_for_test, + 'copy_database_for_test': copy_couch_database_for_test, + 'make_document_for_test': make_document_for_test, }), +] + + +class CouchTests( + TestWithScenarios, test_backends.AllDatabaseTests, CouchDBTestCase): + + scenarios = COUCH_SCENARIOS + + +class SoledadBackendTests( + TestWithScenarios, + test_backends.LocalDatabaseTests, + CouchDBTestCase): + + scenarios = COUCH_SCENARIOS + + +class CouchValidateGenNTransIdTests( + TestWithScenarios, + test_backends.LocalDatabaseValidateGenNTransIdTests, + CouchDBTestCase): + + scenarios = COUCH_SCENARIOS + + +class CouchValidateSourceGenTests( + TestWithScenarios, + test_backends.LocalDatabaseValidateSourceGenTests, + CouchDBTestCase): + + scenarios = COUCH_SCENARIOS + + +class CouchWithConflictsTests( + TestWithScenarios, + test_backends.LocalDatabaseWithConflictsTests, + CouchDBTestCase): + + scenarios = COUCH_SCENARIOS + + +# Notice: the CouchDB backend does not have indexing capabilities, so we do +# not test indexing now. + +# class CouchIndexTests(test_backends.DatabaseIndexTests, CouchDBTestCase): +# +# scenarios = COUCH_SCENARIOS +# +# def tearDown(self): +# self.db.delete_database() +# test_backends.DatabaseIndexTests.tearDown(self) + + +# ----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_sync`. +# ----------------------------------------------------------------------------- + +target_scenarios = [ + ('local', {'create_db_and_target': make_local_db_and_target}), ] + + +simple_doc = tests.simple_doc +nested_doc = tests.nested_doc + + +class SoledadBackendSyncTargetTests( + TestWithScenarios, + DatabaseBaseTests, + CouchDBTestCase): + + # TODO: implement _set_trace_hook(_shallow) in CouchSyncTarget so + # skipped tests can be succesfully executed. + + # whitebox true means self.db is the actual local db object + # against which the sync is performed + whitebox = True + + scenarios = (tests.multiply_scenarios(COUCH_SCENARIOS, target_scenarios)) + + def set_trace_hook(self, callback, shallow=False): + setter = (self.st._set_trace_hook if not shallow else + self.st._set_trace_hook_shallow) + try: + setter(callback) + except NotImplementedError: + self.skipTest("%s does not implement _set_trace_hook" + % (self.st.__class__.__name__,)) + + def setUp(self): + CouchDBTestCase.setUp(self) + # other stuff + self.db, self.st = self.create_db_and_target(self) + self.other_changes = [] + + def tearDown(self): + self.db.close() + CouchDBTestCase.tearDown(self) + + def receive_doc(self, doc, gen, trans_id): + self.other_changes.append( + (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) + + def test_sync_exchange_returns_many_new_docs(self): + # This test was replicated to allow dictionaries to be compared after + # JSON expansion (because one dictionary may have many different + # serialized representations). + doc = self.db.create_doc_from_json(simple_doc) + doc2 = self.db.create_doc_from_json(nested_doc) + self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) + new_gen, _ = self.st.sync_exchange( + [], 'other-replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) + self.assertEqual(2, new_gen) + self.assertEqual( + [(doc.doc_id, doc.rev, json.loads(simple_doc), 1), + (doc2.doc_id, doc2.rev, json.loads(nested_doc), 2)], + [c[:-3] + (json.loads(c[-3]), c[-2]) for c in self.other_changes]) + if self.whitebox: + self.assertEqual( + self.db._last_exchange_log['return'], + {'last_gen': 2, 'docs': + [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) + + def test_get_sync_target(self): + self.assertIsNot(None, self.st) + + def test_get_sync_info(self): + self.assertEqual( + ('test', 0, '', 0, ''), self.st.get_sync_info('other')) + + def test_create_doc_updates_sync_info(self): + self.assertEqual( + ('test', 0, '', 0, ''), self.st.get_sync_info('other')) + self.db.create_doc_from_json(simple_doc) + self.assertEqual(1, self.st.get_sync_info('other')[1]) + + def test_record_sync_info(self): + self.st.record_sync_info('replica', 10, 'T-transid') + self.assertEqual( + ('test', 0, '', 10, 'T-transid'), self.st.get_sync_info('replica')) + + def test_sync_exchange(self): + docs_by_gen = [ + (self.make_document('doc-id', 'replica:1', simple_doc), 10, + 'T-sid')] + new_gen, trans_id = self.st.sync_exchange( + docs_by_gen, 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertGetDoc(self.db, 'doc-id', 'replica:1', simple_doc, False) + self.assertTransactionLog(['doc-id'], self.db) + last_trans_id = self.getLastTransId(self.db) + self.assertEqual(([], 1, last_trans_id), + (self.other_changes, new_gen, last_trans_id)) + self.assertEqual(10, self.st.get_sync_info('replica')[3]) + + def test_sync_exchange_deleted(self): + doc = self.db.create_doc_from_json('{}') + edit_rev = 'replica:1|' + doc.rev + docs_by_gen = [ + (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')] + new_gen, trans_id = self.st.sync_exchange( + docs_by_gen, 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertGetDocIncludeDeleted( + self.db, doc.doc_id, edit_rev, None, False) + self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) + last_trans_id = self.getLastTransId(self.db) + self.assertEqual(([], 2, last_trans_id), + (self.other_changes, new_gen, trans_id)) + self.assertEqual(10, self.st.get_sync_info('replica')[3]) + + def test_sync_exchange_push_many(self): + docs_by_gen = [ + (self.make_document('doc-id', 'replica:1', simple_doc), 10, 'T-1'), + (self.make_document('doc-id2', 'replica:1', nested_doc), 11, + 'T-2')] + new_gen, trans_id = self.st.sync_exchange( + docs_by_gen, 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertGetDoc(self.db, 'doc-id', 'replica:1', simple_doc, False) + self.assertGetDoc(self.db, 'doc-id2', 'replica:1', nested_doc, False) + self.assertTransactionLog(['doc-id', 'doc-id2'], self.db) + last_trans_id = self.getLastTransId(self.db) + self.assertEqual(([], 2, last_trans_id), + (self.other_changes, new_gen, trans_id)) + self.assertEqual(11, self.st.get_sync_info('replica')[3]) + + def test_sync_exchange_refuses_conflicts(self): + doc = self.db.create_doc_from_json(simple_doc) + self.assertTransactionLog([doc.doc_id], self.db) + new_doc = '{"key": "altval"}' + docs_by_gen = [ + (self.make_document(doc.doc_id, 'replica:1', new_doc), 10, + 'T-sid')] + new_gen, _ = self.st.sync_exchange( + docs_by_gen, 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertTransactionLog([doc.doc_id], self.db) + self.assertEqual( + (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1]) + self.assertEqual(1, new_gen) + if self.whitebox: + self.assertEqual(self.db._last_exchange_log['return'], + {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) + + def test_sync_exchange_ignores_convergence(self): + doc = self.db.create_doc_from_json(simple_doc) + self.assertTransactionLog([doc.doc_id], self.db) + gen, txid = self.db._get_generation_info() + docs_by_gen = [ + (self.make_document(doc.doc_id, doc.rev, simple_doc), 10, 'T-sid')] + new_gen, _ = self.st.sync_exchange( + docs_by_gen, 'replica', last_known_generation=gen, + last_known_trans_id=txid, return_doc_cb=self.receive_doc) + self.assertTransactionLog([doc.doc_id], self.db) + self.assertEqual(([], 1), (self.other_changes, new_gen)) + + def test_sync_exchange_returns_new_docs(self): + doc = self.db.create_doc_from_json(simple_doc) + self.assertTransactionLog([doc.doc_id], self.db) + new_gen, _ = self.st.sync_exchange( + [], 'other-replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertTransactionLog([doc.doc_id], self.db) + self.assertEqual( + (doc.doc_id, doc.rev, simple_doc, 1), self.other_changes[0][:-1]) + self.assertEqual(1, new_gen) + if self.whitebox: + self.assertEqual(self.db._last_exchange_log['return'], + {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]}) + + def test_sync_exchange_returns_deleted_docs(self): + doc = self.db.create_doc_from_json(simple_doc) + self.db.delete_doc(doc) + self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) + new_gen, _ = self.st.sync_exchange( + [], 'other-replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) + self.assertEqual( + (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1]) + self.assertEqual(2, new_gen) + if self.whitebox: + self.assertEqual(self.db._last_exchange_log['return'], + {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]}) + + def test_sync_exchange_getting_newer_docs(self): + doc = self.db.create_doc_from_json(simple_doc) + self.assertTransactionLog([doc.doc_id], self.db) + new_doc = '{"key": "altval"}' + docs_by_gen = [ + (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, + 'T-sid')] + new_gen, _ = self.st.sync_exchange( + docs_by_gen, 'other-replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) + self.assertEqual(([], 2), (self.other_changes, new_gen)) + + def test_sync_exchange_with_concurrent_updates_of_synced_doc(self): + expected = [] + + def before_whatschanged_cb(state): + if state != 'before whats_changed': + return + cont = '{"key": "cuncurrent"}' + conc_rev = self.db.put_doc( + self.make_document(doc.doc_id, 'test:1|z:2', cont)) + expected.append((doc.doc_id, conc_rev, cont, 3)) + + self.set_trace_hook(before_whatschanged_cb) + doc = self.db.create_doc_from_json(simple_doc) + self.assertTransactionLog([doc.doc_id], self.db) + new_doc = '{"key": "altval"}' + docs_by_gen = [ + (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, + 'T-sid')] + new_gen, _ = self.st.sync_exchange( + docs_by_gen, 'other-replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertEqual(expected, [c[:-1] for c in self.other_changes]) + self.assertEqual(3, new_gen) + + def test_sync_exchange_with_concurrent_updates(self): + + def after_whatschanged_cb(state): + if state != 'after whats_changed': + return + self.db.create_doc_from_json('{"new": "doc"}') + + self.set_trace_hook(after_whatschanged_cb) + doc = self.db.create_doc_from_json(simple_doc) + self.assertTransactionLog([doc.doc_id], self.db) + new_doc = '{"key": "altval"}' + docs_by_gen = [ + (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, + 'T-sid')] + new_gen, _ = self.st.sync_exchange( + docs_by_gen, 'other-replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertEqual(([], 2), (self.other_changes, new_gen)) + + def test_sync_exchange_converged_handling(self): + doc = self.db.create_doc_from_json(simple_doc) + docs_by_gen = [ + (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'), + (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5, + 'T-bar')] + new_gen, _ = self.st.sync_exchange( + docs_by_gen, 'other-replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc) + self.assertEqual(([], 2), (self.other_changes, new_gen)) + + def test_sync_exchange_detect_incomplete_exchange(self): + def before_get_docs_explode(state): + if state != 'before get_docs': + return + raise u1db_errors.U1DBError("fail") + self.set_trace_hook(before_get_docs_explode) + # suppress traceback printing in the wsgiref server + # self.patch(simple_server.ServerHandler, + # 'log_exception', lambda h, exc_info: None) + doc = self.db.create_doc_from_json(simple_doc) + self.assertTransactionLog([doc.doc_id], self.db) + self.assertRaises( + (u1db_errors.U1DBError, u1db_errors.BrokenSyncStream), + self.st.sync_exchange, [], 'other-replica', + last_known_generation=0, last_known_trans_id=None, + return_doc_cb=self.receive_doc) + + def test_sync_exchange_doc_ids(self): + sync_exchange_doc_ids = getattr(self.st, 'sync_exchange_doc_ids', None) + if sync_exchange_doc_ids is None: + self.skipTest("sync_exchange_doc_ids not implemented") + db2 = self.create_database('test2') + doc = db2.create_doc_from_json(simple_doc) + new_gen, trans_id = sync_exchange_doc_ids( + db2, [(doc.doc_id, 10, 'T-sid')], 0, None, + return_doc_cb=self.receive_doc) + self.assertGetDoc(self.db, doc.doc_id, doc.rev, simple_doc, False) + self.assertTransactionLog([doc.doc_id], self.db) + last_trans_id = self.getLastTransId(self.db) + self.assertEqual(([], 1, last_trans_id), + (self.other_changes, new_gen, trans_id)) + self.assertEqual(10, self.st.get_sync_info(db2._replica_uid)[3]) + + def test__set_trace_hook(self): + called = [] + + def cb(state): + called.append(state) + + self.set_trace_hook(cb) + self.st.sync_exchange([], 'replica', 0, None, self.receive_doc) + self.st.record_sync_info('replica', 0, 'T-sid') + self.assertEqual(['before whats_changed', + 'after whats_changed', + 'before get_docs', + 'record_sync_info', + ], + called) + + def test__set_trace_hook_shallow(self): + st_trace_shallow = self.st._set_trace_hook_shallow + target_st_trace_shallow = SyncTarget._set_trace_hook_shallow + same_meth = st_trace_shallow == self.st._set_trace_hook + same_fun = st_trace_shallow.im_func == target_st_trace_shallow.im_func + if (same_meth or same_fun): + # shallow same as full + expected = ['before whats_changed', + 'after whats_changed', + 'before get_docs', + 'record_sync_info', + ] + else: + expected = ['sync_exchange', 'record_sync_info'] + + called = [] + + def cb(state): + called.append(state) + + self.set_trace_hook(cb, shallow=True) + self.st.sync_exchange([], 'replica', 0, None, self.receive_doc) + self.st.record_sync_info('replica', 0, 'T-sid') + self.assertEqual(expected, called) + +sync_scenarios = [] +for name, scenario in COUCH_SCENARIOS: + scenario = dict(scenario) + scenario['do_sync'] = sync_via_synchronizer + sync_scenarios.append((name, scenario)) + scenario = dict(scenario) + + +class SoledadBackendSyncTests( + TestWithScenarios, + DatabaseBaseTests, + CouchDBTestCase): + + scenarios = sync_scenarios + + def create_database(self, replica_uid, sync_role=None): + if replica_uid == 'test' and sync_role is None: + # created up the chain by base class but unused + return None + db = self.create_database_for_role(replica_uid, sync_role) + if sync_role: + self._use_tracking[db] = (replica_uid, sync_role) + return db + + def create_database_for_role(self, replica_uid, sync_role): + # hook point for reuse + return DatabaseBaseTests.create_database(self, replica_uid) + + def copy_database(self, db, sync_role=None): + # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES + # IS THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST + # THAT WE CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS + # RATHER THAN CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND + # NINJA TO YOUR HOUSE. + db_copy = self.copy_database_for_test(self, db) + name, orig_sync_role = self._use_tracking[db] + self._use_tracking[db_copy] = ( + name + '(copy)', sync_role or orig_sync_role) + return db_copy + + def sync(self, db_from, db_to, trace_hook=None, + trace_hook_shallow=None): + from_name, from_sync_role = self._use_tracking[db_from] + to_name, to_sync_role = self._use_tracking[db_to] + if from_sync_role not in ('source', 'both'): + raise Exception("%s marked for %s use but used as source" % + (from_name, from_sync_role)) + if to_sync_role not in ('target', 'both'): + raise Exception("%s marked for %s use but used as target" % + (to_name, to_sync_role)) + return self.do_sync(self, db_from, db_to, trace_hook, + trace_hook_shallow) + + def setUp(self): + self.db = None + self.db1 = None + self.db2 = None + self.db3 = None + self.db1_copy = None + self.db2_copy = None + self._use_tracking = {} + DatabaseBaseTests.setUp(self) + + def tearDown(self): + for db in [ + self.db, self.db1, self.db2, + self.db3, self.db1_copy, self.db2_copy + ]: + if db is not None: + self.delete_db(db._dbname) + db.close() + DatabaseBaseTests.tearDown(self) + + def assertLastExchangeLog(self, db, expected): + log = getattr(db, '_last_exchange_log', None) + if log is None: + return + self.assertEqual(expected, log) + + def test_sync_tracks_db_generation_of_other(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.assertEqual(0, self.sync(self.db1, self.db2)) + self.assertEqual( + (0, ''), self.db1._get_replica_gen_and_trans_id('test2')) + self.assertEqual( + (0, ''), self.db2._get_replica_gen_and_trans_id('test1')) + self.assertLastExchangeLog( + self.db2, + {'receive': {'docs': [], 'last_known_gen': 0}, + 'return': {'docs': [], 'last_gen': 0}}) + + def test_sync_autoresolves(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + doc1 = self.db1.create_doc_from_json(simple_doc, doc_id='doc') + rev1 = doc1.rev + doc2 = self.db2.create_doc_from_json(simple_doc, doc_id='doc') + rev2 = doc2.rev + self.sync(self.db1, self.db2) + doc = self.db1.get_doc('doc') + self.assertFalse(doc.has_conflicts) + self.assertEqual(doc.rev, self.db2.get_doc('doc').rev) + v = vectorclock.VectorClockRev(doc.rev) + self.assertTrue(v.is_newer(vectorclock.VectorClockRev(rev1))) + self.assertTrue(v.is_newer(vectorclock.VectorClockRev(rev2))) + + def test_sync_autoresolves_moar(self): + # here we test that when a database that has a conflicted document is + # the source of a sync, and the target database has a revision of the + # conflicted document that is newer than the source database's, and + # that target's database's document's content is the same as the + # source's document's conflict's, the source's document's conflict gets + # autoresolved, and the source's document's revision bumped. + # + # idea is as follows: + # A B + # a1 - + # `-------> + # a1 a1 + # v v + # a2 a1b1 + # `-------> + # a1b1+a2 a1b1 + # v + # a1b1+a2 a1b2 (a1b2 has same content as a2) + # `-------> + # a3b2 a1b2 (autoresolved) + # `-------> + # a3b2 a3b2 + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db1.create_doc_from_json(simple_doc, doc_id='doc') + self.sync(self.db1, self.db2) + for db, content in [(self.db1, '{}'), (self.db2, '{"hi": 42}')]: + doc = db.get_doc('doc') + doc.set_json(content) + db.put_doc(doc) + self.sync(self.db1, self.db2) + # db1 and db2 now both have a doc of {hi:42}, but db1 has a conflict + doc = self.db1.get_doc('doc') + rev1 = doc.rev + self.assertTrue(doc.has_conflicts) + # set db2 to have a doc of {} (same as db1 before the conflict) + doc = self.db2.get_doc('doc') + doc.set_json('{}') + self.db2.put_doc(doc) + rev2 = doc.rev + # sync it across + self.sync(self.db1, self.db2) + # tadaa! + doc = self.db1.get_doc('doc') + self.assertFalse(doc.has_conflicts) + vec1 = vectorclock.VectorClockRev(rev1) + vec2 = vectorclock.VectorClockRev(rev2) + vec3 = vectorclock.VectorClockRev(doc.rev) + self.assertTrue(vec3.is_newer(vec1)) + self.assertTrue(vec3.is_newer(vec2)) + # because the conflict is on the source, sync it another time + self.sync(self.db1, self.db2) + # make sure db2 now has the exact same thing + self.assertEqual(self.db1.get_doc('doc'), self.db2.get_doc('doc')) + + def test_sync_autoresolves_moar_backwards(self): + # here we test that when a database that has a conflicted document is + # the target of a sync, and the source database has a revision of the + # conflicted document that is newer than the target database's, and + # that source's database's document's content is the same as the + # target's document's conflict's, the target's document's conflict gets + # autoresolved, and the document's revision bumped. + # + # idea is as follows: + # A B + # a1 - + # `-------> + # a1 a1 + # v v + # a2 a1b1 + # `-------> + # a1b1+a2 a1b1 + # v + # a1b1+a2 a1b2 (a1b2 has same content as a2) + # <-------' + # a3b2 a3b2 (autoresolved and propagated) + self.db1 = self.create_database('test1', 'both') + self.db2 = self.create_database('test2', 'both') + self.db1.create_doc_from_json(simple_doc, doc_id='doc') + self.sync(self.db1, self.db2) + for db, content in [(self.db1, '{}'), (self.db2, '{"hi": 42}')]: + doc = db.get_doc('doc') + doc.set_json(content) + db.put_doc(doc) + self.sync(self.db1, self.db2) + # db1 and db2 now both have a doc of {hi:42}, but db1 has a conflict + doc = self.db1.get_doc('doc') + rev1 = doc.rev + self.assertTrue(doc.has_conflicts) + revc = self.db1.get_doc_conflicts('doc')[-1].rev + # set db2 to have a doc of {} (same as db1 before the conflict) + doc = self.db2.get_doc('doc') + doc.set_json('{}') + self.db2.put_doc(doc) + rev2 = doc.rev + # sync it across + self.sync(self.db2, self.db1) + # tadaa! + doc = self.db1.get_doc('doc') + self.assertFalse(doc.has_conflicts) + vec1 = vectorclock.VectorClockRev(rev1) + vec2 = vectorclock.VectorClockRev(rev2) + vec3 = vectorclock.VectorClockRev(doc.rev) + vecc = vectorclock.VectorClockRev(revc) + self.assertTrue(vec3.is_newer(vec1)) + self.assertTrue(vec3.is_newer(vec2)) + self.assertTrue(vec3.is_newer(vecc)) + # make sure db2 now has the exact same thing + self.assertEqual(self.db1.get_doc('doc'), self.db2.get_doc('doc')) + + def test_sync_autoresolves_moar_backwards_three(self): + # same as autoresolves_moar_backwards, but with three databases (note + # all the syncs go in the same direction -- this is a more natural + # scenario): + # + # A B C + # a1 - - + # `-------> + # a1 a1 - + # `-------> + # a1 a1 a1 + # v v + # a2 a1b1 a1 + # `-------------------> + # a2 a1b1 a2 + # `-------> + # a2+a1b1 a2 + # v + # a2 a2+a1b1 a2c1 (same as a1b1) + # `-------------------> + # a2c1 a2+a1b1 a2c1 + # `-------> + # a2b2c1 a2b2c1 a2c1 + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'both') + self.db3 = self.create_database('test3', 'target') + self.db1.create_doc_from_json(simple_doc, doc_id='doc') + self.sync(self.db1, self.db2) + self.sync(self.db2, self.db3) + for db, content in [(self.db2, '{"hi": 42}'), + (self.db1, '{}'), + ]: + doc = db.get_doc('doc') + doc.set_json(content) + db.put_doc(doc) + self.sync(self.db1, self.db3) + self.sync(self.db2, self.db3) + # db2 and db3 now both have a doc of {}, but db2 has a + # conflict + doc = self.db2.get_doc('doc') + self.assertTrue(doc.has_conflicts) + revc = self.db2.get_doc_conflicts('doc')[-1].rev + self.assertEqual('{}', doc.get_json()) + self.assertEqual(self.db3.get_doc('doc').get_json(), doc.get_json()) + self.assertEqual(self.db3.get_doc('doc').rev, doc.rev) + # set db3 to have a doc of {hi:42} (same as db2 before the conflict) + doc = self.db3.get_doc('doc') + doc.set_json('{"hi": 42}') + self.db3.put_doc(doc) + rev3 = doc.rev + # sync it across to db1 + self.sync(self.db1, self.db3) + # db1 now has hi:42, with a rev that is newer than db2's doc + doc = self.db1.get_doc('doc') + rev1 = doc.rev + self.assertFalse(doc.has_conflicts) + self.assertEqual('{"hi": 42}', doc.get_json()) + VCR = vectorclock.VectorClockRev + self.assertTrue(VCR(rev1).is_newer(VCR(self.db2.get_doc('doc').rev))) + # so sync it to db2 + self.sync(self.db1, self.db2) + # tadaa! + doc = self.db2.get_doc('doc') + self.assertFalse(doc.has_conflicts) + # db2's revision of the document is strictly newer than db1's before + # the sync, and db3's before that sync way back when + self.assertTrue(VCR(doc.rev).is_newer(VCR(rev1))) + self.assertTrue(VCR(doc.rev).is_newer(VCR(rev3))) + self.assertTrue(VCR(doc.rev).is_newer(VCR(revc))) + # make sure both dbs now have the exact same thing + self.assertEqual(self.db1.get_doc('doc'), self.db2.get_doc('doc')) + + def test_sync_puts_changes(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + doc = self.db1.create_doc_from_json(simple_doc) + self.assertEqual(1, self.sync(self.db1, self.db2)) + self.assertGetDoc(self.db2, doc.doc_id, doc.rev, simple_doc, False) + self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0]) + self.assertEqual(1, self.db2._get_replica_gen_and_trans_id('test1')[0]) + self.assertLastExchangeLog( + self.db2, + {'receive': {'docs': [(doc.doc_id, doc.rev)], + 'source_uid': 'test1', + 'source_gen': 1, 'last_known_gen': 0}, + 'return': {'docs': [], 'last_gen': 1}}) + + def test_sync_pulls_changes(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + doc = self.db2.create_doc_from_json(simple_doc) + self.assertEqual(0, self.sync(self.db1, self.db2)) + self.assertGetDoc(self.db1, doc.doc_id, doc.rev, simple_doc, False) + self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0]) + self.assertEqual(1, self.db2._get_replica_gen_and_trans_id('test1')[0]) + self.assertLastExchangeLog( + self.db2, + {'receive': {'docs': [], 'last_known_gen': 0}, + 'return': {'docs': [(doc.doc_id, doc.rev)], + 'last_gen': 1}}) + self.assertGetDoc(self.db2, doc.doc_id, doc.rev, simple_doc, False) + + def test_sync_pulling_doesnt_update_other_if_changed(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + doc = self.db2.create_doc_from_json(simple_doc) + # After the local side has sent its list of docs, before we start + # receiving the "targets" response, we update the local database with a + # new record. + # When we finish synchronizing, we can notice that something locally + # was updated, and we cannot tell c2 our new updated generation + + def before_get_docs(state): + if state != 'before get_docs': + return + self.db1.create_doc_from_json(simple_doc) + + self.assertEqual(0, self.sync(self.db1, self.db2, + trace_hook=before_get_docs)) + self.assertLastExchangeLog( + self.db2, + {'receive': {'docs': [], 'last_known_gen': 0}, + 'return': {'docs': [(doc.doc_id, doc.rev)], + 'last_gen': 1}}) + self.assertEqual(1, self.db1._get_replica_gen_and_trans_id('test2')[0]) + # c2 should not have gotten a '_record_sync_info' call, because the + # local database had been updated more than just by the messages + # returned from c2. + self.assertEqual( + (0, ''), self.db2._get_replica_gen_and_trans_id('test1')) + + def test_sync_doesnt_update_other_if_nothing_pulled(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db1.create_doc_from_json(simple_doc) + + def no_record_sync_info(state): + if state != 'record_sync_info': + return + self.fail('SyncTarget.record_sync_info was called') + self.assertEqual(1, self.sync(self.db1, self.db2, + trace_hook_shallow=no_record_sync_info)) + self.assertEqual( + 1, + self.db2._get_replica_gen_and_trans_id(self.db1._replica_uid)[0]) + + def test_sync_ignores_convergence(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'both') + doc = self.db1.create_doc_from_json(simple_doc) + self.db3 = self.create_database('test3', 'target') + self.assertEqual(1, self.sync(self.db1, self.db3)) + self.assertEqual(0, self.sync(self.db2, self.db3)) + self.assertEqual(1, self.sync(self.db1, self.db2)) + self.assertLastExchangeLog( + self.db2, + {'receive': {'docs': [(doc.doc_id, doc.rev)], + 'source_uid': 'test1', + 'source_gen': 1, 'last_known_gen': 0}, + 'return': {'docs': [], 'last_gen': 1}}) + + def test_sync_ignores_superseded(self): + self.db1 = self.create_database('test1', 'both') + self.db2 = self.create_database('test2', 'both') + doc = self.db1.create_doc_from_json(simple_doc) + doc_rev1 = doc.rev + self.db3 = self.create_database('test3', 'target') + self.sync(self.db1, self.db3) + self.sync(self.db2, self.db3) + new_content = '{"key": "altval"}' + doc.set_json(new_content) + self.db1.put_doc(doc) + doc_rev2 = doc.rev + self.sync(self.db2, self.db1) + self.assertLastExchangeLog( + self.db1, + {'receive': {'docs': [(doc.doc_id, doc_rev1)], + 'source_uid': 'test2', + 'source_gen': 1, 'last_known_gen': 0}, + 'return': {'docs': [(doc.doc_id, doc_rev2)], + 'last_gen': 2}}) + self.assertGetDoc(self.db1, doc.doc_id, doc_rev2, new_content, False) + + def test_sync_sees_remote_conflicted(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + doc1 = self.db1.create_doc_from_json(simple_doc) + doc_id = doc1.doc_id + doc1_rev = doc1.rev + new_doc = '{"key": "altval"}' + doc2 = self.db2.create_doc_from_json(new_doc, doc_id=doc_id) + doc2_rev = doc2.rev + self.assertTransactionLog([doc1.doc_id], self.db1) + self.sync(self.db1, self.db2) + self.assertLastExchangeLog( + self.db2, + {'receive': {'docs': [(doc_id, doc1_rev)], + 'source_uid': 'test1', + 'source_gen': 1, 'last_known_gen': 0}, + 'return': {'docs': [(doc_id, doc2_rev)], + 'last_gen': 1}}) + self.assertTransactionLog([doc_id, doc_id], self.db1) + self.assertGetDoc(self.db1, doc_id, doc2_rev, new_doc, True) + self.assertGetDoc(self.db2, doc_id, doc2_rev, new_doc, False) + + def test_sync_sees_remote_delete_conflicted(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + doc1 = self.db1.create_doc_from_json(simple_doc) + doc_id = doc1.doc_id + self.sync(self.db1, self.db2) + doc2 = self.make_document(doc1.doc_id, doc1.rev, doc1.get_json()) + new_doc = '{"key": "altval"}' + doc1.set_json(new_doc) + self.db1.put_doc(doc1) + self.db2.delete_doc(doc2) + self.assertTransactionLog([doc_id, doc_id], self.db1) + self.sync(self.db1, self.db2) + self.assertLastExchangeLog( + self.db2, + {'receive': {'docs': [(doc_id, doc1.rev)], + 'source_uid': 'test1', + 'source_gen': 2, 'last_known_gen': 1}, + 'return': {'docs': [(doc_id, doc2.rev)], + 'last_gen': 2}}) + self.assertTransactionLog([doc_id, doc_id, doc_id], self.db1) + self.assertGetDocIncludeDeleted(self.db1, doc_id, doc2.rev, None, True) + self.assertGetDocIncludeDeleted( + self.db2, doc_id, doc2.rev, None, False) + + def test_sync_local_race_conflicted(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + doc = self.db1.create_doc_from_json(simple_doc) + doc_id = doc.doc_id + doc1_rev = doc.rev + self.sync(self.db1, self.db2) + content1 = '{"key": "localval"}' + content2 = '{"key": "altval"}' + doc.set_json(content2) + self.db2.put_doc(doc) + doc2_rev2 = doc.rev + triggered = [] + + def after_whatschanged(state): + if state != 'after whats_changed': + return + triggered.append(True) + doc = self.make_document(doc_id, doc1_rev, content1) + self.db1.put_doc(doc) + + self.sync(self.db1, self.db2, trace_hook=after_whatschanged) + self.assertEqual([True], triggered) + self.assertGetDoc(self.db1, doc_id, doc2_rev2, content2, True) + + def test_sync_propagates_deletes(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'both') + doc1 = self.db1.create_doc_from_json(simple_doc) + doc_id = doc1.doc_id + self.sync(self.db1, self.db2) + self.db3 = self.create_database('test3', 'target') + self.sync(self.db1, self.db3) + self.db1.delete_doc(doc1) + deleted_rev = doc1.rev + self.sync(self.db1, self.db2) + self.assertLastExchangeLog( + self.db2, + {'receive': {'docs': [(doc_id, deleted_rev)], + 'source_uid': 'test1', + 'source_gen': 2, 'last_known_gen': 1}, + 'return': {'docs': [], 'last_gen': 2}}) + self.assertGetDocIncludeDeleted( + self.db1, doc_id, deleted_rev, None, False) + self.assertGetDocIncludeDeleted( + self.db2, doc_id, deleted_rev, None, False) + self.sync(self.db2, self.db3) + self.assertLastExchangeLog( + self.db3, + {'receive': {'docs': [(doc_id, deleted_rev)], + 'source_uid': 'test2', + 'source_gen': 2, 'last_known_gen': 0}, + 'return': {'docs': [], 'last_gen': 2}}) + self.assertGetDocIncludeDeleted( + self.db3, doc_id, deleted_rev, None, False) + + def test_sync_propagates_deletes_2(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db1.create_doc_from_json('{"a": "1"}', doc_id='the-doc') + self.sync(self.db1, self.db2) + doc1_2 = self.db2.get_doc('the-doc') + self.db2.delete_doc(doc1_2) + self.sync(self.db1, self.db2) + self.assertGetDocIncludeDeleted( + self.db1, 'the-doc', doc1_2.rev, None, False) + + def test_sync_propagates_resolution(self): + self.db1 = self.create_database('test1', 'both') + self.db2 = self.create_database('test2', 'both') + doc1 = self.db1.create_doc_from_json('{"a": 1}', doc_id='the-doc') + self.db3 = self.create_database('test3', 'both') + self.sync(self.db2, self.db1) + self.assertEqual( + self.db1._get_generation_info(), + self.db2._get_replica_gen_and_trans_id(self.db1._replica_uid)) + self.assertEqual( + self.db2._get_generation_info(), + self.db1._get_replica_gen_and_trans_id(self.db2._replica_uid)) + self.sync(self.db3, self.db1) + # update on 2 + doc2 = self.make_document('the-doc', doc1.rev, '{"a": 2}') + self.db2.put_doc(doc2) + self.sync(self.db2, self.db3) + self.assertEqual(self.db3.get_doc('the-doc').rev, doc2.rev) + # update on 1 + doc1.set_json('{"a": 3}') + self.db1.put_doc(doc1) + # conflicts + self.sync(self.db2, self.db1) + self.sync(self.db3, self.db1) + self.assertTrue(self.db2.get_doc('the-doc').has_conflicts) + self.assertTrue(self.db3.get_doc('the-doc').has_conflicts) + # resolve + conflicts = self.db2.get_doc_conflicts('the-doc') + doc4 = self.make_document('the-doc', None, '{"a": 4}') + revs = [doc.rev for doc in conflicts] + self.db2.resolve_doc(doc4, revs) + doc2 = self.db2.get_doc('the-doc') + self.assertEqual(doc4.get_json(), doc2.get_json()) + self.assertFalse(doc2.has_conflicts) + self.sync(self.db2, self.db3) + doc3 = self.db3.get_doc('the-doc') + self.assertEqual(doc4.get_json(), doc3.get_json()) + self.assertFalse(doc3.has_conflicts) + + def test_sync_supersedes_conflicts(self): + self.db1 = self.create_database('test1', 'both') + self.db2 = self.create_database('test2', 'target') + self.db3 = self.create_database('test3', 'both') + doc1 = self.db1.create_doc_from_json('{"a": 1}', doc_id='the-doc') + self.db2.create_doc_from_json('{"b": 1}', doc_id='the-doc') + self.db3.create_doc_from_json('{"c": 1}', doc_id='the-doc') + self.sync(self.db3, self.db1) + self.assertEqual( + self.db1._get_generation_info(), + self.db3._get_replica_gen_and_trans_id(self.db1._replica_uid)) + self.assertEqual( + self.db3._get_generation_info(), + self.db1._get_replica_gen_and_trans_id(self.db3._replica_uid)) + self.sync(self.db3, self.db2) + self.assertEqual( + self.db2._get_generation_info(), + self.db3._get_replica_gen_and_trans_id(self.db2._replica_uid)) + self.assertEqual( + self.db3._get_generation_info(), + self.db2._get_replica_gen_and_trans_id(self.db3._replica_uid)) + self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc'))) + doc1.set_json('{"a": 2}') + self.db1.put_doc(doc1) + self.sync(self.db3, self.db1) + # original doc1 should have been removed from conflicts + self.assertEqual(3, len(self.db3.get_doc_conflicts('the-doc'))) + + def test_sync_stops_after_get_sync_info(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db1.create_doc_from_json(tests.simple_doc) + self.sync(self.db1, self.db2) + + def put_hook(state): + self.fail("Tracehook triggered for %s" % (state,)) + + self.sync(self.db1, self.db2, trace_hook_shallow=put_hook) + + def test_sync_detects_identical_replica_uid(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test1', 'target') + self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1') + self.assertRaises( + u1db_errors.InvalidReplicaUID, self.sync, self.db1, self.db2) + + def test_sync_detects_rollback_in_source(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1') + self.sync(self.db1, self.db2) + self.db1_copy = self.copy_database(self.db1) + self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2') + self.sync(self.db1, self.db2) + self.assertRaises( + u1db_errors.InvalidGeneration, self.sync, self.db1_copy, self.db2) + + def test_sync_detects_rollback_in_target(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent") + self.sync(self.db1, self.db2) + self.db2_copy = self.copy_database(self.db2) + self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2') + self.sync(self.db1, self.db2) + self.assertRaises( + u1db_errors.InvalidGeneration, self.sync, self.db1, self.db2_copy) + + def test_sync_detects_diverged_source(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db3 = self.copy_database(self.db1) + self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent") + self.db3.create_doc_from_json(tests.simple_doc, doc_id="divergent") + self.sync(self.db1, self.db2) + self.assertRaises( + u1db_errors.InvalidTransactionId, self.sync, self.db3, self.db2) + + def test_sync_detects_diverged_target(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db3 = self.copy_database(self.db2) + self.db3.create_doc_from_json(tests.nested_doc, doc_id="divergent") + self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent") + self.sync(self.db1, self.db2) + self.assertRaises( + u1db_errors.InvalidTransactionId, self.sync, self.db1, self.db3) + + def test_sync_detects_rollback_and_divergence_in_source(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc1') + self.sync(self.db1, self.db2) + self.db1_copy = self.copy_database(self.db1) + self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc2') + self.db1.create_doc_from_json(tests.simple_doc, doc_id='doc3') + self.sync(self.db1, self.db2) + self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') + self.db1_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3') + self.assertRaises( + u1db_errors.InvalidTransactionId, self.sync, + self.db1_copy, self.db2) + + def test_sync_detects_rollback_and_divergence_in_target(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + self.db1.create_doc_from_json(tests.simple_doc, doc_id="divergent") + self.sync(self.db1, self.db2) + self.db2_copy = self.copy_database(self.db2) + self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc2') + self.db2.create_doc_from_json(tests.simple_doc, doc_id='doc3') + self.sync(self.db1, self.db2) + self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc2') + self.db2_copy.create_doc_from_json(tests.simple_doc, doc_id='doc3') + self.assertRaises( + u1db_errors.InvalidTransactionId, self.sync, + self.db1, self.db2_copy) + + def test_optional_sync_preserve_json(self): + self.db1 = self.create_database('test1', 'source') + self.db2 = self.create_database('test2', 'target') + cont1 = '{"a": 2}' + cont2 = '{"b": 3}' + self.db1.create_doc_from_json(cont1, doc_id="1") + self.db2.create_doc_from_json(cont2, doc_id="2") + self.sync(self.db1, self.db2) + self.assertEqual(cont1, self.db2.get_doc("1").get_json()) + self.assertEqual(cont2, self.db1.get_doc("2").get_json()) + + +class SoledadBackendExceptionsTests(CouchDBTestCase): + + def setUp(self): + CouchDBTestCase.setUp(self) + + def create_db(self, ensure=True, dbname=None): + if not dbname: + dbname = ('test-%s' % uuid4().hex) + if dbname not in self.couch_server: + self.couch_server.create(dbname) + self.db = couch.CouchDatabase( + ('http://127.0.0.1:%d' % self.couch_port), + dbname, + ensure_ddocs=ensure) + + def tearDown(self): + self.db.delete_database() + self.db.close() + CouchDBTestCase.tearDown(self) + + def test_missing_design_doc_raises(self): + """ + Test that all methods that access design documents will raise if the + design docs are not present. + """ + self.create_db(ensure=False) + # get_generation_info() + self.assertRaises( + errors.MissingDesignDocError, + self.db.get_generation_info) + # get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocError, + self.db.get_trans_id_for_gen, 1) + # get_transaction_log() + self.assertRaises( + errors.MissingDesignDocError, + self.db.get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocError, + self.db.whats_changed) + + def test_missing_design_doc_functions_raises(self): + """ + Test that all methods that access design documents list functions + will raise if the functions are not present. + """ + self.create_db(ensure=True) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + transactions['lists'] = {} + self.db._database.save(transactions) + # get_generation_info() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.get_generation_info) + # get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.get_trans_id_for_gen, 1) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.whats_changed) + + def test_absent_design_doc_functions_raises(self): + """ + Test that all methods that access design documents list functions + will raise if the functions are not present. + """ + self.create_db(ensure=True) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + del transactions['lists'] + self.db._database.save(transactions) + # get_generation_info() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.get_trans_id_for_gen, 1) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocListFunctionError, + self.db.whats_changed) + + def test_missing_design_doc_named_views_raises(self): + """ + Test that all methods that access design documents' named views will + raise if the views are not present. + """ + self.create_db(ensure=True) + # erase views from _design/docs + docs = self.db._database['_design/docs'] + del docs['views'] + self.db._database.save(docs) + # erase views from _design/syncs + syncs = self.db._database['_design/syncs'] + del syncs['views'] + self.db._database.save(syncs) + # erase views from _design/transactions + transactions = self.db._database['_design/transactions'] + del transactions['views'] + self.db._database.save(transactions) + # get_generation_info() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db.get_generation_info) + # _get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db.get_trans_id_for_gen, 1) + # _get_transaction_log() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db.get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocNamedViewError, + self.db.whats_changed) + + def test_deleted_design_doc_raises(self): + """ + Test that all methods that access design documents will raise if the + design docs are not present. + """ + self.create_db(ensure=True) + # delete _design/docs + del self.db._database['_design/docs'] + # delete _design/syncs + del self.db._database['_design/syncs'] + # delete _design/transactions + del self.db._database['_design/transactions'] + # get_generation_info() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.get_generation_info) + # get_trans_id_for_gen() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.get_trans_id_for_gen, 1) + # get_transaction_log() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.get_transaction_log) + # whats_changed() + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.whats_changed) + + def test_ensure_ddoc_independently(self): + """ + Test that a missing ddocs other than _design/docs will be ensured + even if _design/docs is there. + """ + self.create_db(ensure=True) + del self.db._database['_design/transactions'] + self.assertRaises( + errors.MissingDesignDocDeletedError, + self.db.get_transaction_log) + self.create_db(ensure=True, dbname=self.db._dbname) + self.db.get_transaction_log() + + def test_ensure_security_doc(self): + """ + Ensure_security creates a _security ddoc to ensure that only soledad + will have the lowest privileged access to an user db. + """ + self.create_db(ensure=False) + self.assertFalse(self.db._database.resource.get_json('_security')[2]) + self.db.ensure_security_ddoc() + security_ddoc = self.db._database.resource.get_json('_security')[2] + self.assertIn('admins', security_ddoc) + self.assertFalse(security_ddoc['admins']['names']) + self.assertIn('members', security_ddoc) + self.assertIn('soledad', security_ddoc['members']['names']) + + def test_ensure_security_from_configuration(self): + """ + Given a configuration, follow it to create the security document + """ + self.create_db(ensure=False) + configuration = {'members': ['user1', 'user2'], + 'members_roles': ['role1', 'role2'], + 'admins': ['admin'], + 'admins_roles': ['administrators'] + } + self.db.ensure_security_ddoc(configuration) + + security_ddoc = self.db._database.resource.get_json('_security')[2] + self.assertEquals(configuration['admins'], + security_ddoc['admins']['names']) + self.assertEquals(configuration['admins_roles'], + security_ddoc['admins']['roles']) + self.assertEquals(configuration['members'], + security_ddoc['members']['names']) + self.assertEquals(configuration['members_roles'], + security_ddoc['members']['roles']) + + +class DatabaseNameValidationTest(unittest.TestCase): + + def test_database_name_validation(self): + inject = couch.state.is_db_name_valid("user-deadbeef | cat /secret") + self.assertFalse(inject) + self.assertTrue(couch.state.is_db_name_valid("user-cafe1337")) + + +class CommandBasedDBCreationTest(unittest.TestCase): + + def test_ensure_db_using_custom_command(self): + state = couch.state.CouchServerState("url", create_cmd="echo") + mock_db = Mock() + mock_db.replica_uid = 'replica_uid' + state.open_database = Mock(return_value=mock_db) + db, replica_uid = state.ensure_database("user-1337") # works + self.assertEquals(mock_db, db) + self.assertEquals(mock_db.replica_uid, replica_uid) + + def test_raises_unauthorized_on_failure(self): + state = couch.state.CouchServerState("url", create_cmd="inexistent") + self.assertRaises(u1db_errors.Unauthorized, + state.ensure_database, "user-1337") + + def test_raises_unauthorized_by_default(self): + state = couch.state.CouchServerState("url") + self.assertRaises(u1db_errors.Unauthorized, + state.ensure_database, "user-1337") diff --git a/testing/tests/couch/test_couch_operations_atomicity.py b/testing/tests/couch/test_couch_operations_atomicity.py new file mode 100644 index 00000000..aec9c6cf --- /dev/null +++ b/testing/tests/couch/test_couch_operations_atomicity.py @@ -0,0 +1,371 @@ +# -*- coding: utf-8 -*- +# test_couch_operations_atomicity.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test atomicity of couch operations. +""" +import os +import tempfile +import threading + +from urlparse import urljoin +from twisted.internet import defer +from uuid import uuid4 + +from leap.soledad.client import Soledad +from leap.soledad.common.couch.state import CouchServerState +from leap.soledad.common.couch import CouchDatabase + +from test_soledad.util import ( + make_token_soledad_app, + make_soledad_document_for_test, + soledad_sync_target, +) +from test_soledad.util import CouchDBTestCase +from test_soledad.u1db_tests import TestCaseWithServer + + +REPEAT_TIMES = 20 + + +class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): + + @staticmethod + def make_app_after_state(state): + return make_token_soledad_app(state) + + make_document_for_test = make_soledad_document_for_test + + sync_target = soledad_sync_target + + def _soledad_instance(self, user=None, passphrase=u'123', + prefix='', + secrets_path='secrets.json', + local_db_path='soledad.u1db', server_url='', + cert_file=None, auth_token=None): + """ + Instantiate Soledad. + """ + user = user or self.user + + # this callback ensures we save a document which is sent to the shared + # db. + def _put_doc_side_effect(doc): + self._doc_put = doc + + soledad = Soledad( + user, + passphrase, + secrets_path=os.path.join(self.tempdir, prefix, secrets_path), + local_db_path=os.path.join( + self.tempdir, prefix, local_db_path), + server_url=server_url, + cert_file=cert_file, + auth_token=auth_token, + shared_db=self.get_default_shared_mock(_put_doc_side_effect)) + self.addCleanup(soledad.close) + return soledad + + def make_app(self): + self.request_state = CouchServerState(self.couch_url) + return self.make_app_after_state(self.request_state) + + def setUp(self): + TestCaseWithServer.setUp(self) + CouchDBTestCase.setUp(self) + self.user = ('user-%s' % uuid4().hex) + self.db = CouchDatabase.open_database( + urljoin(self.couch_url, 'user-' + self.user), + create=True, + replica_uid='replica', + ensure_ddocs=True) + self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") + self.startTwistedServer() + + def tearDown(self): + self.db.delete_database() + self.db.close() + CouchDBTestCase.tearDown(self) + TestCaseWithServer.tearDown(self) + + # + # Sequential tests + # + + def test_correct_transaction_log_after_sequential_puts(self): + """ + Assert that the transaction_log increases accordingly with sequential + puts. + """ + doc = self.db.create_doc({'ops': 0}) + docs = [doc.doc_id] + for i in range(0, REPEAT_TIMES): + self.assertEqual( + i + 1, len(self.db._get_transaction_log())) + doc.content['ops'] += 1 + self.db.put_doc(doc) + docs.append(doc.doc_id) + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + REPEAT_TIMES + 1, len(transaction_log)) + + # assert that all entries in the log belong to the same doc + self.assertEqual(REPEAT_TIMES + 1, len(docs)) + for doc_id in docs: + self.assertEqual( + REPEAT_TIMES + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_transaction_log_after_sequential_deletes(self): + """ + Assert that the transaction_log increases accordingly with sequential + puts and deletes. + """ + docs = [] + for i in range(0, REPEAT_TIMES): + doc = self.db.create_doc({'ops': 0}) + self.assertEqual( + 2 * i + 1, len(self.db._get_transaction_log())) + docs.append(doc.doc_id) + self.db.delete_doc(doc) + self.assertEqual( + 2 * i + 2, len(self.db._get_transaction_log())) + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + 2 * REPEAT_TIMES, len(transaction_log)) + + # assert that each doc appears twice in the transaction_log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 2, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + @defer.inlineCallbacks + def test_correct_sync_log_after_sequential_syncs(self): + """ + Assert that the sync_log increases accordingly with sequential syncs. + """ + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + source_replica_uid = sol._dbpool.replica_uid + + def _create_docs(): + deferreds = [] + for i in xrange(0, REPEAT_TIMES): + deferreds.append(sol.create_doc({})) + return defer.gatherResults(deferreds) + + def _assert_transaction_and_sync_logs(results, sync_idx): + # assert sizes of transaction and sync logs + self.assertEqual( + sync_idx * REPEAT_TIMES, + len(self.db._get_transaction_log())) + gen, _ = self.db._get_replica_gen_and_trans_id(source_replica_uid) + self.assertEqual(sync_idx * REPEAT_TIMES, gen) + + def _assert_sync(results, sync_idx): + gen, docs = results + self.assertEqual((sync_idx + 1) * REPEAT_TIMES, gen) + self.assertEqual((sync_idx + 1) * REPEAT_TIMES, len(docs)) + # assert sizes of transaction and sync logs + self.assertEqual((sync_idx + 1) * REPEAT_TIMES, + len(self.db._get_transaction_log())) + target_known_gen, target_known_trans_id = \ + self.db._get_replica_gen_and_trans_id(source_replica_uid) + # assert it has the correct gen and trans_id + conn_key = sol._dbpool._u1dbconnections.keys().pop() + conn = sol._dbpool._u1dbconnections[conn_key] + sol_gen, sol_trans_id = conn._get_generation_info() + self.assertEqual(sol_gen, target_known_gen) + self.assertEqual(sol_trans_id, target_known_trans_id) + + # sync first time and assert success + results = yield _create_docs() + _assert_transaction_and_sync_logs(results, 0) + yield sol.sync() + results = yield sol.get_all_docs() + _assert_sync(results, 0) + + # create more docs, sync second time and assert success + results = yield _create_docs() + _assert_transaction_and_sync_logs(results, 1) + yield sol.sync() + results = yield sol.get_all_docs() + _assert_sync(results, 1) + + # + # Concurrency tests + # + + class _WorkerThread(threading.Thread): + + def __init__(self, params, run_method): + threading.Thread.__init__(self) + self._params = params + self._run_method = run_method + + def run(self): + self._run_method(self) + + def test_correct_transaction_log_after_concurrent_puts(self): + """ + Assert that the transaction_log increases accordingly with concurrent + puts. + """ + pool = threading.BoundedSemaphore(value=1) + threads = [] + docs = [] + + def _run_method(self): + doc = self._params['db'].create_doc({}) + pool.acquire() + self._params['docs'].append(doc.doc_id) + pool.release() + + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread( + {'docs': docs, 'db': self.db}, + _run_method) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + # assert length of transaction_log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + REPEAT_TIMES, len(transaction_log)) + + # assert all documents are in the log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_transaction_log_after_concurrent_deletes(self): + """ + Assert that the transaction_log increases accordingly with concurrent + puts and deletes. + """ + threads = [] + docs = [] + pool = threading.BoundedSemaphore(value=1) + + # create/delete method that will be run concurrently + def _run_method(self): + doc = self._params['db'].create_doc({}) + pool.acquire() + docs.append(doc.doc_id) + pool.release() + self._params['db'].delete_doc(doc) + + # launch concurrent threads + for i in range(0, REPEAT_TIMES): + thread = self._WorkerThread({'db': self.db}, _run_method) + thread.start() + threads.append(thread) + + # wait for threads to finish + for thread in threads: + thread.join() + + # assert transaction log + transaction_log = self.db._get_transaction_log() + self.assertEqual( + 2 * REPEAT_TIMES, len(transaction_log)) + # assert that each doc appears twice in the transaction_log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 2, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + def test_correct_sync_log_after_concurrent_puts_and_sync(self): + """ + Assert that the sync_log is correct after concurrent syncs. + """ + docs = [] + + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + def _save_doc_ids(results): + for doc in results: + docs.append(doc.doc_id) + + # create documents in parallel + deferreds = [] + for i in range(0, REPEAT_TIMES): + d = sol.create_doc({}) + deferreds.append(d) + + # wait for documents creation and sync + d = defer.gatherResults(deferreds) + d.addCallback(_save_doc_ids) + d.addCallback(lambda _: sol.sync()) + + def _assert_logs(results): + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) + + d.addCallback(_assert_logs) + d.addCallback(lambda _: sol.close()) + + return d + + @defer.inlineCallbacks + def test_concurrent_syncs_do_not_fail(self): + """ + Assert that concurrent attempts to sync end up being executed + sequentially and do not fail. + """ + docs = [] + + sol = self._soledad_instance( + auth_token='auth-token', + server_url=self.getURL()) + + deferreds = [] + for i in xrange(0, REPEAT_TIMES): + d = sol.create_doc({}) + d.addCallback(lambda doc: docs.append(doc.doc_id)) + d.addCallback(lambda _: sol.sync()) + deferreds.append(d) + yield defer.gatherResults(deferreds, consumeErrors=True) + + transaction_log = self.db._get_transaction_log() + self.assertEqual(REPEAT_TIMES, len(transaction_log)) + # assert all documents are in the remote log + self.assertEqual(REPEAT_TIMES, len(docs)) + for doc_id in docs: + self.assertEqual( + 1, + len(filter(lambda t: t[0] == doc_id, transaction_log))) |