summaryrefslogtreecommitdiff
path: root/testing/tests/sync
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2016-07-07 11:44:01 +0200
committerKali Kaneko <kali@leap.se>2016-07-12 03:09:27 +0200
commit26f87181f8a8fc7fef58ddd1e52cb5f0edd641bb (patch)
tree899c0a7ec979f60073f87af3732edc2eac811044 /testing/tests/sync
parentb3fb215860a8e50e4a6c551fef78628acdbf25c7 (diff)
[test] toxify tests
- move tests to root directory - split tests in different subdirectories - setup a small package with common test dependencies in /testing/test_soledad - add tox.ini that will: - install the test_soledad package and other test dependencies - install soledad common, client, server from the repository - run tests contianed in /testing/tests directory using pytest This commit also removes all oauth code from tests, as we have removed the u1db dependency (by importing it into the repo and naming it l2db) and don't neet oauth at all right now.
Diffstat (limited to 'testing/tests/sync')
-rw-r--r--testing/tests/sync/__init__.py0
-rw-r--r--testing/tests/sync/test_encdecpool.py315
-rw-r--r--testing/tests/sync/test_sync.py216
-rw-r--r--testing/tests/sync/test_sync_deferred.py196
-rw-r--r--testing/tests/sync/test_sync_mutex.py135
-rw-r--r--testing/tests/sync/test_sync_target.py968
6 files changed, 1830 insertions, 0 deletions
diff --git a/testing/tests/sync/__init__.py b/testing/tests/sync/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/testing/tests/sync/__init__.py
diff --git a/testing/tests/sync/test_encdecpool.py b/testing/tests/sync/test_encdecpool.py
new file mode 100644
index 00000000..82e99a47
--- /dev/null
+++ b/testing/tests/sync/test_encdecpool.py
@@ -0,0 +1,315 @@
+# -*- coding: utf-8 -*-
+# test_encdecpool.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for encryption and decryption pool.
+"""
+import json
+from random import shuffle
+
+from mock import MagicMock
+from twisted.internet.defer import inlineCallbacks
+
+from leap.soledad.client.encdecpool import SyncEncrypterPool
+from leap.soledad.client.encdecpool import SyncDecrypterPool
+
+from leap.soledad.common.document import SoledadDocument
+from test_soledad.util import BaseSoledadTest
+from twisted.internet import defer
+from twisted.test.proto_helpers import MemoryReactorClock
+
+DOC_ID = "mydoc"
+DOC_REV = "rev"
+DOC_CONTENT = {'simple': 'document'}
+
+
+class TestSyncEncrypterPool(BaseSoledadTest):
+
+ def setUp(self):
+ BaseSoledadTest.setUp(self)
+ crypto = self._soledad._crypto
+ sync_db = self._soledad._sync_db
+ self._pool = SyncEncrypterPool(crypto, sync_db)
+ self._pool.start()
+
+ def tearDown(self):
+ self._pool.stop()
+ BaseSoledadTest.tearDown(self)
+
+ @inlineCallbacks
+ def test_get_encrypted_doc_returns_none(self):
+ """
+ Test that trying to get an encrypted doc from the pool returns None if
+ the document was never added for encryption.
+ """
+ doc = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
+ self.assertIsNone(doc)
+
+ @inlineCallbacks
+ def test_encrypt_doc_and_get_it_back(self):
+ """
+ Test that the pool actually encrypts a document added to the queue.
+ """
+ doc = SoledadDocument(
+ doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT))
+ self._pool.encrypt_doc(doc)
+
+ # exhaustivelly attempt to get the encrypted document
+ encrypted = None
+ attempts = 0
+ while encrypted is None and attempts < 10:
+ encrypted = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
+ attempts += 1
+
+ self.assertIsNotNone(encrypted)
+ self.assertTrue(attempts < 10)
+
+
+class TestSyncDecrypterPool(BaseSoledadTest):
+
+ def _insert_doc_cb(self, doc, gen, trans_id):
+ """
+ Method used to mock the sync's return_doc_cb callback.
+ """
+ self._inserted_docs.append((doc, gen, trans_id))
+
+ def _setup_pool(self, sync_db=None):
+ sync_db = sync_db or self._soledad._sync_db
+ return SyncDecrypterPool(
+ self._soledad._crypto,
+ sync_db,
+ source_replica_uid=self._soledad._dbpool.replica_uid,
+ insert_doc_cb=self._insert_doc_cb)
+
+ def setUp(self):
+ BaseSoledadTest.setUp(self)
+ # setup the pool
+ self._pool = self._setup_pool()
+ # reset the inserted docs mock
+ self._inserted_docs = []
+
+ def tearDown(self):
+ if self._pool.running:
+ self._pool.stop()
+ BaseSoledadTest.tearDown(self)
+
+ def test_insert_received_doc(self):
+ """
+ Test that one document added to the pool is inserted using the
+ callback.
+ """
+ self._pool.start(1)
+ self._pool.insert_received_doc(
+ DOC_ID, DOC_REV, "{}", 1, "trans_id", 1)
+
+ def _assert_doc_was_inserted(_):
+ self.assertEqual(
+ self._inserted_docs,
+ [(SoledadDocument(DOC_ID, DOC_REV, "{}"), 1, u"trans_id")])
+
+ self._pool.deferred.addCallback(_assert_doc_was_inserted)
+ return self._pool.deferred
+
+ def test_looping_control(self):
+ """
+ Start and stop cleanly.
+ """
+ self._pool.start(10)
+ self.assertTrue(self._pool.running)
+ self._pool.stop()
+ self.assertFalse(self._pool.running)
+ self.assertTrue(self._pool.deferred.called)
+
+ def test_sync_id_col_is_created_if_non_existing_in_docs_recvd_table(self):
+ """
+ Test that docs_received table is migrated, and has the sync_id column
+ """
+ mock_run_query = MagicMock(return_value=defer.succeed(None))
+ mock_sync_db = MagicMock()
+ mock_sync_db.runQuery = mock_run_query
+ pool = self._setup_pool(mock_sync_db)
+ d = pool.start(10)
+ pool.stop()
+
+ def assert_trial_to_create_sync_id_column(_):
+ mock_run_query.assert_called_once_with(
+ "ALTER TABLE docs_received ADD COLUMN sync_id")
+
+ d.addCallback(assert_trial_to_create_sync_id_column)
+ return d
+
+ def test_insert_received_doc_many(self):
+ """
+ Test that many documents added to the pool are inserted using the
+ callback.
+ """
+ many = 100
+ self._pool.start(many)
+
+ # insert many docs in the pool
+ for i in xrange(many):
+ gen = idx = i + 1
+ doc_id = "doc_id: %d" % idx
+ rev = "rev: %d" % idx
+ content = {'idx': idx}
+ trans_id = "trans_id: %d" % idx
+ self._pool.insert_received_doc(
+ doc_id, rev, content, gen, trans_id, idx)
+
+ def _assert_doc_was_inserted(_):
+ self.assertEqual(many, len(self._inserted_docs))
+ idx = 1
+ for doc, gen, trans_id in self._inserted_docs:
+ expected_gen = idx
+ expected_doc_id = "doc_id: %d" % idx
+ expected_rev = "rev: %d" % idx
+ expected_content = json.dumps({'idx': idx})
+ expected_trans_id = "trans_id: %d" % idx
+
+ self.assertEqual(expected_doc_id, doc.doc_id)
+ self.assertEqual(expected_rev, doc.rev)
+ self.assertEqual(expected_content, json.dumps(doc.content))
+ self.assertEqual(expected_gen, gen)
+ self.assertEqual(expected_trans_id, trans_id)
+
+ idx += 1
+
+ self._pool.deferred.addCallback(_assert_doc_was_inserted)
+ return self._pool.deferred
+
+ def test_insert_encrypted_received_doc(self):
+ """
+ Test that one encrypted document added to the pool is decrypted and
+ inserted using the callback.
+ """
+ crypto = self._soledad._crypto
+ doc = SoledadDocument(
+ doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT))
+ encrypted_content = json.loads(crypto.encrypt_doc(doc))
+
+ # insert the encrypted document in the pool
+ self._pool.start(1)
+ self._pool.insert_encrypted_received_doc(
+ DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1)
+
+ def _assert_doc_was_decrypted_and_inserted(_):
+ self.assertEqual(1, len(self._inserted_docs))
+ self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")])
+
+ self._pool.deferred.addCallback(
+ _assert_doc_was_decrypted_and_inserted)
+ return self._pool.deferred
+
+ @inlineCallbacks
+ def test_processing_order(self):
+ """
+ This test ensures that processing of documents only occur if there is
+ a sequence in place.
+ """
+ reactor_clock = MemoryReactorClock()
+ self._pool._loop.clock = reactor_clock
+
+ crypto = self._soledad._crypto
+
+ docs = []
+ for i in xrange(1, 10):
+ i = str(i)
+ doc = SoledadDocument(
+ doc_id=DOC_ID + i, rev=DOC_REV + i,
+ json=json.dumps(DOC_CONTENT))
+ encrypted_content = json.loads(crypto.encrypt_doc(doc))
+ docs.append((doc, encrypted_content))
+
+ # insert the encrypted document in the pool
+ self._pool.start(10) # pool is expecting to process 10 docs
+ # first three arrives, forming a sequence
+ for i, (doc, encrypted_content) in enumerate(docs[:3]):
+ gen = idx = i + 1
+ yield self._pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx)
+ # last one arrives alone, so it can't be processed
+ doc, encrypted_content = docs[-1]
+ yield self._pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10)
+
+ reactor_clock.advance(self._pool.DECRYPT_LOOP_PERIOD)
+ yield self._pool._decrypt_and_recurse()
+
+ self.assertEqual(3, self._pool._processed_docs)
+
+ def test_insert_encrypted_received_doc_many(self, many=100):
+ """
+ Test that many encrypted documents added to the pool are decrypted and
+ inserted using the callback.
+ """
+ crypto = self._soledad._crypto
+ self._pool.start(many)
+ docs = []
+
+ # insert many encrypted docs in the pool
+ for i in xrange(many):
+ gen = idx = i + 1
+ doc_id = "doc_id: %d" % idx
+ rev = "rev: %d" % idx
+ content = {'idx': idx}
+ trans_id = "trans_id: %d" % idx
+
+ doc = SoledadDocument(
+ doc_id=doc_id, rev=rev, json=json.dumps(content))
+
+ encrypted_content = json.loads(crypto.encrypt_doc(doc))
+ docs.append((doc_id, rev, encrypted_content, gen,
+ trans_id, idx))
+ shuffle(docs)
+
+ for doc in docs:
+ self._pool.insert_encrypted_received_doc(*doc)
+
+ def _assert_docs_were_decrypted_and_inserted(_):
+ self.assertEqual(many, len(self._inserted_docs))
+ idx = 1
+ for doc, gen, trans_id in self._inserted_docs:
+ expected_gen = idx
+ expected_doc_id = "doc_id: %d" % idx
+ expected_rev = "rev: %d" % idx
+ expected_content = json.dumps({'idx': idx})
+ expected_trans_id = "trans_id: %d" % idx
+
+ self.assertEqual(expected_doc_id, doc.doc_id)
+ self.assertEqual(expected_rev, doc.rev)
+ self.assertEqual(expected_content, json.dumps(doc.content))
+ self.assertEqual(expected_gen, gen)
+ self.assertEqual(expected_trans_id, trans_id)
+
+ idx += 1
+
+ self._pool.deferred.addCallback(
+ _assert_docs_were_decrypted_and_inserted)
+ return self._pool.deferred
+
+ @inlineCallbacks
+ def test_pool_reuse(self):
+ """
+ The pool is reused between syncs, this test verifies that
+ reusing is fine.
+ """
+ for i in xrange(3):
+ yield self.test_insert_encrypted_received_doc_many(5)
+ self._inserted_docs = []
+ decrypted_docs = yield self._pool._get_docs(encrypted=False)
+ # check that decrypted docs staging is clean
+ self.assertEquals([], decrypted_docs)
+ self._pool.stop()
diff --git a/testing/tests/sync/test_sync.py b/testing/tests/sync/test_sync.py
new file mode 100644
index 00000000..095884ce
--- /dev/null
+++ b/testing/tests/sync/test_sync.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+# test_sync.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import tempfile
+import threading
+import time
+
+from urlparse import urljoin
+from twisted.internet import defer
+
+from testscenarios import TestWithScenarios
+
+from leap.soledad.common import couch
+from leap.soledad.client import sync
+
+from test_soledad import u1db_tests as tests
+from test_soledad.u1db_tests import TestCaseWithServer
+from test_soledad.u1db_tests import simple_doc
+from test_soledad.util import make_token_soledad_app
+from test_soledad.util import make_soledad_document_for_test
+from test_soledad.util import soledad_sync_target
+from test_soledad.util import BaseSoledadTest
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import CouchDBTestCase
+
+
+class InterruptableSyncTestCase(
+ BaseSoledadTest, CouchDBTestCase, TestCaseWithServer):
+
+ """
+ Tests for encrypted sync using Soledad server backed by a couch database.
+ """
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ make_document_for_test = make_soledad_document_for_test
+
+ sync_target = soledad_sync_target
+
+ def make_app(self):
+ self.request_state = couch.CouchServerState(self.couch_url)
+ return self.make_app_with_state(self.request_state)
+
+ def setUp(self):
+ TestCaseWithServer.setUp(self)
+ CouchDBTestCase.setUp(self)
+ self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+
+ def tearDown(self):
+ CouchDBTestCase.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+
+ def test_interruptable_sync(self):
+ """
+ Test if Soledad can sync many smallfiles.
+ """
+
+ self.skipTest("Sync is currently not interruptable.")
+
+ class _SyncInterruptor(threading.Thread):
+
+ """
+ A thread meant to interrupt the sync process.
+ """
+
+ def __init__(self, soledad, couchdb):
+ self._soledad = soledad
+ self._couchdb = couchdb
+ threading.Thread.__init__(self)
+
+ def run(self):
+ while db._get_generation() < 2:
+ # print "WAITING %d" % db._get_generation()
+ time.sleep(0.1)
+ self._soledad.stop_sync()
+ time.sleep(1)
+
+ number_of_docs = 10
+ self.startServer()
+
+ # instantiate soledad and create a document
+ sol = self._soledad_instance(
+ user='user-uuid', server_url=self.getURL())
+
+ # ensure remote db exists before syncing
+ db = couch.CouchDatabase.open_database(
+ urljoin(self.couch_url, 'user-user-uuid'),
+ create=True,
+ ensure_ddocs=True)
+
+ # create interruptor thread
+ t = _SyncInterruptor(sol, db)
+ t.start()
+
+ d = sol.get_all_docs()
+ d.addCallback(lambda results: self.assertEqual([], results[1]))
+
+ def _create_docs(results):
+ # create many small files
+ deferreds = []
+ for i in range(0, number_of_docs):
+ deferreds.append(sol.create_doc(json.loads(simple_doc)))
+ return defer.DeferredList(deferreds)
+
+ # sync with server
+ d.addCallback(_create_docs)
+ d.addCallback(lambda _: sol.get_all_docs())
+ d.addCallback(
+ lambda results: self.assertEqual(number_of_docs, len(results[1])))
+ d.addCallback(lambda _: sol.sync())
+ d.addCallback(lambda _: t.join())
+ d.addCallback(lambda _: db.get_all_docs())
+ d.addCallback(
+ lambda results: self.assertNotEqual(
+ number_of_docs, len(results[1])))
+ d.addCallback(lambda _: sol.sync())
+ d.addCallback(lambda _: db.get_all_docs())
+ d.addCallback(
+ lambda results: self.assertEqual(number_of_docs, len(results[1])))
+
+ def _tear_down(results):
+ db.delete_database()
+ db.close()
+ sol.close()
+
+ d.addCallback(_tear_down)
+ return d
+
+
+class TestSoledadDbSync(
+ TestWithScenarios,
+ SoledadWithCouchServerMixin,
+ tests.TestCaseWithServer):
+
+ """
+ Test db.sync remote sync shortcut
+ """
+
+ scenarios = [
+ ('py-token-http', {
+ 'make_app_with_state': make_token_soledad_app,
+ 'make_database_for_test': tests.make_memory_database_for_test,
+ 'token': True
+ }),
+ ]
+
+ oauth = False
+ token = False
+
+ def setUp(self):
+ """
+ Need to explicitely invoke inicialization on all bases.
+ """
+ SoledadWithCouchServerMixin.setUp(self)
+ self.startTwistedServer()
+ self.db = self.make_database_for_test(self, 'test1')
+ self.db2 = self.request_state._create_database(replica_uid='test')
+
+ def tearDown(self):
+ """
+ Need to explicitely invoke destruction on all bases.
+ """
+ SoledadWithCouchServerMixin.tearDown(self)
+ # tests.TestCaseWithServer.tearDown(self)
+
+ def do_sync(self):
+ """
+ Perform sync using SoledadSynchronizer, SoledadSyncTarget
+ and Token auth.
+ """
+ target = soledad_sync_target(
+ self, self.db2._dbname,
+ source_replica_uid=self._soledad._dbpool.replica_uid)
+ self.addCleanup(target.close)
+ return sync.SoledadSynchronizer(
+ self.db,
+ target).sync(defer_decryption=False)
+
+ @defer.inlineCallbacks
+ def test_db_sync(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+
+ doc1 = self.db.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db2.create_doc_from_json(tests.nested_doc)
+
+ local_gen_before_sync = yield self.do_sync()
+ gen, _, changes = self.db.whats_changed(local_gen_before_sync)
+ self.assertEqual(1, len(changes))
+ self.assertEqual(doc2.doc_id, changes[0][0])
+ self.assertEqual(1, gen - local_gen_before_sync)
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False)
+
+ # TODO: add u1db.tests.test_sync.TestRemoteSyncIntegration
diff --git a/testing/tests/sync/test_sync_deferred.py b/testing/tests/sync/test_sync_deferred.py
new file mode 100644
index 00000000..4948aaf8
--- /dev/null
+++ b/testing/tests/sync/test_sync_deferred.py
@@ -0,0 +1,196 @@
+# test_sync_deferred.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: sync with deferred encryption/decryption.
+"""
+import time
+import os
+import random
+import string
+import shutil
+
+from urlparse import urljoin
+
+from twisted.internet import defer
+
+from leap.soledad.common import couch
+
+from leap.soledad.client import sync
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.client.sqlcipher import SQLCipherDatabase
+
+from testscenarios import TestWithScenarios
+
+from test_soledad import u1db_tests as tests
+from test_soledad.util import ADDRESS
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import make_soledad_app
+from test_soledad.util import soledad_sync_target
+
+
+# Just to make clear how this test is different... :)
+DEFER_DECRYPTION = True
+
+WAIT_STEP = 1
+MAX_WAIT = 10
+DBPASS = "pass"
+
+
+class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):
+
+ """
+ Another base class for testing the deferred encryption/decryption during
+ the syncs, using the intermediate database.
+ """
+ defer_sync_encryption = True
+
+ def setUp(self):
+ SoledadWithCouchServerMixin.setUp(self)
+ self.startTwistedServer()
+ # config info
+ self.db1_file = os.path.join(self.tempdir, "db1.u1db")
+ os.unlink(self.db1_file)
+ self.db_pass = DBPASS
+ self.email = ADDRESS
+
+ # get a random prefix for each test, so we do not mess with
+ # concurrency during initialization and shutting down of
+ # each local db.
+ self.rand_prefix = ''.join(
+ map(lambda x: random.choice(string.ascii_letters), range(6)))
+
+ # open test dbs: db1 will be the local sqlcipher db (which
+ # instantiates a syncdb). We use the self._soledad instance that was
+ # already created on some setUp method.
+ import binascii
+ tohex = binascii.b2a_hex
+ key = tohex(self._soledad.secrets.get_local_storage_key())
+ sync_db_key = tohex(self._soledad.secrets.get_sync_db_key())
+ dbpath = self._soledad._local_db_path
+
+ self.opts = SQLCipherOptions(
+ dbpath, key, is_raw_key=True, create=False,
+ defer_encryption=True, sync_db_key=sync_db_key)
+ self.db1 = SQLCipherDatabase(self.opts)
+
+ self.db2 = self.request_state._create_database('test')
+
+ def tearDown(self):
+ # XXX should not access "private" attrs
+ shutil.rmtree(os.path.dirname(self._soledad._local_db_path))
+ SoledadWithCouchServerMixin.tearDown(self)
+
+
+class SyncTimeoutError(Exception):
+
+ """
+ Dummy exception to notify timeout during sync.
+ """
+ pass
+
+
+class TestSoledadDbSyncDeferredEncDecr(
+ TestWithScenarios,
+ BaseSoledadDeferredEncTest,
+ tests.TestCaseWithServer):
+
+ """
+ Test db.sync remote sync shortcut.
+ Case with deferred encryption and decryption: using the intermediate
+ syncdb.
+ """
+
+ scenarios = [
+ ('http', {
+ 'make_app_with_state': make_soledad_app,
+ 'make_database_for_test': tests.make_memory_database_for_test,
+ }),
+ ]
+
+ oauth = False
+ token = True
+
+ def setUp(self):
+ """
+ Need to explicitely invoke inicialization on all bases.
+ """
+ BaseSoledadDeferredEncTest.setUp(self)
+ self.server = self.server_thread = None
+ self.syncer = None
+
+ def tearDown(self):
+ """
+ Need to explicitely invoke destruction on all bases.
+ """
+ dbsyncer = getattr(self, 'dbsyncer', None)
+ if dbsyncer:
+ dbsyncer.close()
+ BaseSoledadDeferredEncTest.tearDown(self)
+
+ def do_sync(self):
+ """
+ Perform sync using SoledadSynchronizer, SoledadSyncTarget
+ and Token auth.
+ """
+ replica_uid = self._soledad._dbpool.replica_uid
+ sync_db = self._soledad._sync_db
+ sync_enc_pool = self._soledad._sync_enc_pool
+ dbsyncer = self._soledad._dbsyncer # Soledad.sync uses the dbsyncer
+
+ target = soledad_sync_target(
+ self, self.db2._dbname,
+ source_replica_uid=replica_uid,
+ sync_db=sync_db,
+ sync_enc_pool=sync_enc_pool)
+ self.addCleanup(target.close)
+ return sync.SoledadSynchronizer(
+ dbsyncer,
+ target).sync(defer_decryption=True)
+
+ def wait_for_sync(self):
+ """
+ Wait for sync to finish.
+ """
+ wait = 0
+ syncer = self.syncer
+ if syncer is not None:
+ while syncer.syncing:
+ time.sleep(WAIT_STEP)
+ wait += WAIT_STEP
+ if wait >= MAX_WAIT:
+ raise SyncTimeoutError
+
+ @defer.inlineCallbacks
+ def test_db_sync(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+ doc1 = self.db1.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db2.create_doc_from_json(tests.nested_doc)
+ local_gen_before_sync = yield self.do_sync()
+
+ gen, _, changes = self.db1.whats_changed(local_gen_before_sync)
+ self.assertEqual(1, len(changes))
+
+ self.assertEqual(doc2.doc_id, changes[0][0])
+ self.assertEqual(1, gen - local_gen_before_sync)
+
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False)
diff --git a/testing/tests/sync/test_sync_mutex.py b/testing/tests/sync/test_sync_mutex.py
new file mode 100644
index 00000000..787cfee8
--- /dev/null
+++ b/testing/tests/sync/test_sync_mutex.py
@@ -0,0 +1,135 @@
+# -*- coding: utf-8 -*-
+# test_sync_mutex.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Test that synchronization is a critical section and, as such, there might not
+be two concurrent synchronization processes at the same time.
+"""
+
+
+import time
+import uuid
+import tempfile
+import shutil
+
+from urlparse import urljoin
+
+from twisted.internet import defer
+
+from leap.soledad.client.sync import SoledadSynchronizer
+
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
+from test_soledad.u1db_tests import TestCaseWithServer
+
+from test_soledad.util import CouchDBTestCase
+from test_soledad.util import BaseSoledadTest
+from test_soledad.util import make_token_soledad_app
+from test_soledad.util import make_soledad_document_for_test
+from test_soledad.util import soledad_sync_target
+
+
+# monkey-patch the soledad synchronizer so it stores start and finish times
+
+_old_sync = SoledadSynchronizer.sync
+
+
+def _timed_sync(self, defer_decryption=True):
+ t = time.time()
+
+ sync_id = uuid.uuid4()
+
+ if not getattr(self.source, 'sync_times', False):
+ self.source.sync_times = {}
+
+ self.source.sync_times[sync_id] = {'start': t}
+
+ def _store_finish_time(passthrough):
+ t = time.time()
+ self.source.sync_times[sync_id]['end'] = t
+ return passthrough
+
+ d = _old_sync(self, defer_decryption=defer_decryption)
+ d.addBoth(_store_finish_time)
+ return d
+
+SoledadSynchronizer.sync = _timed_sync
+
+# -- end of monkey-patching
+
+
+class TestSyncMutex(
+ BaseSoledadTest, CouchDBTestCase, TestCaseWithServer):
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ make_document_for_test = make_soledad_document_for_test
+
+ sync_target = soledad_sync_target
+
+ def make_app(self):
+ self.request_state = CouchServerState(self.couch_url)
+ return self.make_app_with_state(self.request_state)
+
+ def setUp(self):
+ TestCaseWithServer.setUp(self)
+ CouchDBTestCase.setUp(self)
+ self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+ self.user = ('user-%s' % uuid.uuid4().hex)
+
+ def tearDown(self):
+ CouchDBTestCase.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+ shutil.rmtree(self.tempdir)
+
+ def test_two_concurrent_syncs_do_not_overlap_no_docs(self):
+ self.startServer()
+
+ # ensure remote db exists before syncing
+ db = CouchDatabase.open_database(
+ urljoin(self.couch_url, 'user-' + self.user),
+ create=True,
+ ensure_ddocs=True)
+
+ sol = self._soledad_instance(
+ user=self.user, server_url=self.getURL())
+
+ d1 = sol.sync()
+ d2 = sol.sync()
+
+ def _assert_syncs_do_not_overlap(thearg):
+ # recover sync times
+ sync_times = []
+ for key in sol._dbsyncer.sync_times:
+ sync_times.append(sol._dbsyncer.sync_times[key])
+ sync_times.sort(key=lambda s: s['start'])
+
+ self.assertTrue(
+ (sync_times[0]['start'] < sync_times[0]['end'] and
+ sync_times[0]['end'] < sync_times[1]['start'] and
+ sync_times[1]['start'] < sync_times[1]['end']))
+
+ db.delete_database()
+ db.close()
+ sol.close()
+
+ d = defer.gatherResults([d1, d2])
+ d.addBoth(_assert_syncs_do_not_overlap)
+ return d
diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py
new file mode 100644
index 00000000..964468ce
--- /dev/null
+++ b/testing/tests/sync/test_sync_target.py
@@ -0,0 +1,968 @@
+# -*- coding: utf-8 -*-
+# test_sync_target.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: sync target
+"""
+import cStringIO
+import os
+import time
+import json
+import random
+import string
+import shutil
+from uuid import uuid4
+
+from testscenarios import TestWithScenarios
+from twisted.internet import defer
+
+from leap.soledad.client import http_target as target
+from leap.soledad.client import crypto
+from leap.soledad.client.sqlcipher import SQLCipherU1DBSync
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.client.sqlcipher import SQLCipherDatabase
+
+from leap.soledad.common import l2db
+
+from leap.soledad.common.document import SoledadDocument
+from test_soledad import u1db_tests as tests
+from test_soledad.util import make_sqlcipher_database_for_test
+from test_soledad.util import make_soledad_app
+from test_soledad.util import make_token_soledad_app
+from test_soledad.util import make_soledad_document_for_test
+from test_soledad.util import soledad_sync_target
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import ADDRESS
+from test_soledad.util import SQLCIPHER_SCENARIOS
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_remote_sync_target`.
+# -----------------------------------------------------------------------------
+
+class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin):
+
+ """
+ Some tests had to be copied to this class so we can instantiate our own
+ target.
+ """
+
+ def setUp(self):
+ SoledadWithCouchServerMixin.setUp(self)
+ creds = {'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }}
+ self.target = target.SoledadHTTPSyncTarget(
+ self.couch_url,
+ uuid4().hex,
+ creds,
+ self._soledad._crypto,
+ None)
+
+ def tearDown(self):
+ self.target.close()
+ SoledadWithCouchServerMixin.tearDown(self)
+
+ def test_extra_comma(self):
+ """
+ Test adapted to use encrypted content.
+ """
+ doc = SoledadDocument('i', rev='r')
+ doc.content = {}
+ _crypto = self._soledad._crypto
+ key = _crypto.doc_passphrase(doc.doc_id)
+ secret = _crypto.secret
+
+ enc_json = crypto.encrypt_docstr(
+ doc.get_json(), doc.doc_id, doc.rev,
+ key, secret)
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("[\r\n{},\r\n]")
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response(
+ ('[\r\n{},\r\n{"id": "i", "rev": "r", ' +
+ '"content": %s, "gen": 3, "trans_id": "T-sid"}' +
+ ',\r\n]') % json.dumps(enc_json))
+
+ def test_wrong_start(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("{}\r\n]")
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("\r\n{}\r\n]")
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("")
+
+ def test_wrong_end(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("[\r\n{}")
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("[\r\n")
+
+ def test_missing_comma(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response(
+ '[\r\n{}\r\n{"id": "i", "rev": "r", '
+ '"content": "c", "gen": 3}\r\n]')
+
+ def test_no_entries(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("[\r\n]")
+
+ def test_error_in_stream(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response(
+ '[\r\n{"new_generation": 0},'
+ '\r\n{"error": "unavailable"}\r\n')
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response(
+ '[\r\n{"error": "unavailable"}\r\n')
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response('[\r\n{"error": "?"}\r\n')
+
+#
+# functions for TestRemoteSyncTargets
+#
+
+
+def make_local_db_and_soledad_target(
+ test, path='test',
+ source_replica_uid=uuid4().hex):
+ test.startTwistedServer()
+ replica_uid = os.path.basename(path)
+ db = test.request_state._create_database(replica_uid)
+ sync_db = test._soledad._sync_db
+ sync_enc_pool = test._soledad._sync_enc_pool
+ st = soledad_sync_target(
+ test, db._dbname,
+ source_replica_uid=source_replica_uid,
+ sync_db=sync_db,
+ sync_enc_pool=sync_enc_pool)
+ return db, st
+
+
+def make_local_db_and_token_soledad_target(
+ test,
+ source_replica_uid=uuid4().hex):
+ db, st = make_local_db_and_soledad_target(
+ test, path='test',
+ source_replica_uid=source_replica_uid)
+ st.set_token_credentials('user-uuid', 'auth-token')
+ return db, st
+
+
+class TestSoledadSyncTarget(
+ TestWithScenarios,
+ SoledadWithCouchServerMixin,
+ tests.TestCaseWithServer):
+
+ scenarios = [
+ ('token_soledad',
+ {'make_app_with_state': make_token_soledad_app,
+ 'make_document_for_test': make_soledad_document_for_test,
+ 'create_db_and_target': make_local_db_and_token_soledad_target,
+ 'make_database_for_test': make_sqlcipher_database_for_test,
+ 'sync_target': soledad_sync_target}),
+ ]
+
+ def getSyncTarget(self, path=None, source_replica_uid=uuid4().hex):
+ if self.port is None:
+ self.startTwistedServer()
+ sync_db = self._soledad._sync_db
+ sync_enc_pool = self._soledad._sync_enc_pool
+ if path is None:
+ path = self.db2._dbname
+ target = self.sync_target(
+ self, path,
+ source_replica_uid=source_replica_uid,
+ sync_db=sync_db,
+ sync_enc_pool=sync_enc_pool)
+ self.addCleanup(target.close)
+ return target
+
+ def setUp(self):
+ TestWithScenarios.setUp(self)
+ SoledadWithCouchServerMixin.setUp(self)
+ self.startTwistedServer()
+ self.db1 = make_sqlcipher_database_for_test(self, 'test1')
+ self.db2 = self.request_state._create_database('test')
+
+ def tearDown(self):
+ # db2, _ = self.request_state.ensure_database('test2')
+ self.delete_db(self.db2._dbname)
+ self.db1.close()
+ SoledadWithCouchServerMixin.tearDown(self)
+ TestWithScenarios.tearDown(self)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_send(self):
+ """
+ Test for sync exchanging send of document.
+
+ This test was adapted to decrypt remote content before assert.
+ """
+ db = self.db2
+ remote_target = self.getSyncTarget()
+ other_docs = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
+
+ doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ new_gen, trans_id = yield remote_target.sync_exchange(
+ [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=receive_doc,
+ defer_decryption=False)
+ self.assertEqual(1, new_gen)
+ self.assertGetEncryptedDoc(
+ db, 'doc-here', 'replica:1', '{"value": "here"}', False)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_send_failure_and_retry_scenario(self):
+ """
+ Test for sync exchange failure and retry.
+
+ This test was adapted to decrypt remote content before assert.
+ """
+
+ def blackhole_getstderr(inst):
+ return cStringIO.StringIO()
+
+ db = self.db2
+ _put_doc_if_newer = db._put_doc_if_newer
+ trigger_ids = ['doc-here2']
+
+ def bomb_put_doc_if_newer(self, doc, save_conflict,
+ replica_uid=None, replica_gen=None,
+ replica_trans_id=None, number_of_docs=None,
+ doc_idx=None, sync_id=None):
+ if doc.doc_id in trigger_ids:
+ raise l2db.errors.U1DBError
+ return _put_doc_if_newer(doc, save_conflict=save_conflict,
+ replica_uid=replica_uid,
+ replica_gen=replica_gen,
+ replica_trans_id=replica_trans_id,
+ number_of_docs=number_of_docs,
+ doc_idx=doc_idx, sync_id=sync_id)
+ from leap.soledad.common.backend import SoledadBackend
+ self.patch(
+ SoledadBackend, '_put_doc_if_newer', bomb_put_doc_if_newer)
+ remote_target = self.getSyncTarget(
+ source_replica_uid='replica')
+ other_changes = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_changes.append(
+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
+
+ doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ doc2 = self.make_document('doc-here2', 'replica:1',
+ '{"value": "here2"}')
+
+ with self.assertRaises(l2db.errors.U1DBError):
+ yield remote_target.sync_exchange(
+ [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')],
+ 'replica',
+ last_known_generation=0,
+ last_known_trans_id=None,
+ insert_doc_cb=receive_doc,
+ defer_decryption=False)
+
+ self.assertGetEncryptedDoc(
+ db, 'doc-here', 'replica:1', '{"value": "here"}',
+ False)
+ self.assertEqual(
+ (10, 'T-sid'), db._get_replica_gen_and_trans_id('replica'))
+ self.assertEqual([], other_changes)
+ # retry
+ trigger_ids = []
+ new_gen, trans_id = yield remote_target.sync_exchange(
+ [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=receive_doc,
+ defer_decryption=False)
+ self.assertGetEncryptedDoc(
+ db, 'doc-here2', 'replica:1', '{"value": "here2"}',
+ False)
+ self.assertEqual(
+ (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica'))
+ self.assertEqual(2, new_gen)
+ self.assertEqual(
+ ('doc-here', 'replica:1', '{"value": "here"}', 1),
+ other_changes[0][:-1])
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_send_ensure_callback(self):
+ """
+ Test for sync exchange failure and retry.
+
+ This test was adapted to decrypt remote content before assert.
+ """
+ remote_target = self.getSyncTarget()
+ other_docs = []
+ replica_uid_box = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
+
+ def ensure_cb(replica_uid):
+ replica_uid_box.append(replica_uid)
+
+ doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ new_gen, trans_id = yield remote_target.sync_exchange(
+ [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=receive_doc,
+ ensure_callback=ensure_cb, defer_decryption=False)
+ self.assertEqual(1, new_gen)
+ db = self.db2
+ self.assertEqual(1, len(replica_uid_box))
+ self.assertEqual(db._replica_uid, replica_uid_box[0])
+ self.assertGetEncryptedDoc(
+ db, 'doc-here', 'replica:1', '{"value": "here"}', False)
+
+ def test_sync_exchange_in_stream_error(self):
+ self.skipTest("bypass this test because our sync_exchange process "
+ "does not return u1db error 503 \"unavailable\" for "
+ "now")
+
+ @defer.inlineCallbacks
+ def test_get_sync_info(self):
+ db = self.db2
+ db._set_replica_gen_and_trans_id('other-id', 1, 'T-transid')
+ remote_target = self.getSyncTarget(
+ source_replica_uid='other-id')
+ sync_info = yield remote_target.get_sync_info('other-id')
+ self.assertEqual(
+ ('test', 0, '', 1, 'T-transid'),
+ sync_info)
+
+ @defer.inlineCallbacks
+ def test_record_sync_info(self):
+ remote_target = self.getSyncTarget(
+ source_replica_uid='other-id')
+ yield remote_target.record_sync_info('other-id', 2, 'T-transid')
+ self.assertEqual((2, 'T-transid'),
+ self.db2._get_replica_gen_and_trans_id('other-id'))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_receive(self):
+ db = self.db2
+ doc = db.create_doc_from_json('{"value": "there"}')
+ remote_target = self.getSyncTarget()
+ other_changes = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_changes.append(
+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
+
+ new_gen, trans_id = yield remote_target.sync_exchange(
+ [], 'replica', last_known_generation=0, last_known_trans_id=None,
+ insert_doc_cb=receive_doc)
+ self.assertEqual(1, new_gen)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, '{"value": "there"}', 1),
+ other_changes[0][:-1])
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_sync`.
+# -----------------------------------------------------------------------------
+
+target_scenarios = [
+ ('mem,token_soledad',
+ {'create_db_and_target': make_local_db_and_token_soledad_target,
+ 'make_app_with_state': make_soledad_app,
+ 'make_database_for_test': tests.make_memory_database_for_test,
+ 'copy_database_for_test': tests.copy_memory_database_for_test,
+ 'make_document_for_test': tests.make_document_for_test})
+]
+
+
+class SoledadDatabaseSyncTargetTests(
+ TestWithScenarios,
+ SoledadWithCouchServerMixin,
+ tests.DatabaseBaseTests,
+ tests.TestCaseWithServer):
+ """
+ Adaptation of u1db.tests.test_sync.DatabaseSyncTargetTests.
+ """
+
+ # TODO: implement _set_trace_hook(_shallow) in SoledadHTTPSyncTarget so
+ # skipped tests can be succesfully executed.
+
+ scenarios = target_scenarios
+
+ whitebox = False
+
+ def setUp(self):
+ tests.TestCaseWithServer.setUp(self)
+ self.other_changes = []
+ SoledadWithCouchServerMixin.setUp(self)
+ self.db, self.st = make_local_db_and_soledad_target(self)
+
+ def tearDown(self):
+ self.db.close()
+ self.st.close()
+ tests.TestCaseWithServer.tearDown(self)
+ SoledadWithCouchServerMixin.tearDown(self)
+
+ def set_trace_hook(self, callback, shallow=False):
+ setter = (self.st._set_trace_hook if not shallow else
+ self.st._set_trace_hook_shallow)
+ try:
+ setter(callback)
+ except NotImplementedError:
+ self.skipTest("%s does not implement _set_trace_hook"
+ % (self.st.__class__.__name__,))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange(self):
+ """
+ Test sync exchange.
+
+ This test was adapted to decrypt remote content before assert.
+ """
+ docs_by_gen = [
+ (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10,
+ 'T-sid')]
+ new_gen, trans_id = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc,
+ defer_decryption=False)
+ self.assertGetEncryptedDoc(
+ self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
+ self.assertTransactionLog(['doc-id'], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 1, last_trans_id),
+ (self.other_changes, new_gen, last_trans_id))
+ sync_info = yield self.st.get_sync_info('replica')
+ self.assertEqual(10, sync_info[3])
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_push_many(self):
+ """
+ Test sync exchange.
+
+ This test was adapted to decrypt remote content before assert.
+ """
+ docs_by_gen = [
+ (self.make_document(
+ 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
+ (self.make_document(
+ 'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')]
+ new_gen, trans_id = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc,
+ defer_decryption=False)
+ self.assertGetEncryptedDoc(
+ self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db, 'doc-id2', 'replica:1', tests.nested_doc, False)
+ self.assertTransactionLog(['doc-id', 'doc-id2'], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 2, last_trans_id),
+ (self.other_changes, new_gen, trans_id))
+ sync_info = yield self.st.get_sync_info('replica')
+ self.assertEqual(11, sync_info[3])
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_returns_many_new_docs(self):
+ """
+ Test sync exchange.
+
+ This test was adapted to avoid JSON serialization comparison as local
+ and remote representations might differ. It looks directly at the
+ doc's contents instead.
+ """
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db.create_doc_from_json(tests.nested_doc)
+ self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
+ new_gen, _ = yield self.st.sync_exchange(
+ [], 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc,
+ defer_decryption=False)
+ self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
+ self.assertEqual(2, new_gen)
+ self.assertEqual(
+ [(doc.doc_id, doc.rev, 1),
+ (doc2.doc_id, doc2.rev, 2)],
+ [c[:-3] + c[-2:-1] for c in self.other_changes])
+ self.assertEqual(
+ json.loads(tests.simple_doc),
+ json.loads(self.other_changes[0][2]))
+ self.assertEqual(
+ json.loads(tests.nested_doc),
+ json.loads(self.other_changes[1][2]))
+ if self.whitebox:
+ self.assertEqual(
+ self.db._last_exchange_log['return'],
+ {'last_gen': 2, 'docs':
+ [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]})
+
+ def receive_doc(self, doc, gen, trans_id):
+ self.other_changes.append(
+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
+
+ def test_get_sync_target(self):
+ self.assertIsNot(None, self.st)
+
+ @defer.inlineCallbacks
+ def test_get_sync_info(self):
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(
+ ('test', 0, '', 0, ''), sync_info)
+
+ @defer.inlineCallbacks
+ def test_create_doc_updates_sync_info(self):
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(
+ ('test', 0, '', 0, ''), sync_info)
+ self.db.create_doc_from_json(tests.simple_doc)
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(1, sync_info[1])
+
+ @defer.inlineCallbacks
+ def test_record_sync_info(self):
+ yield self.st.record_sync_info('replica', 10, 'T-transid')
+ sync_info = yield self.st.get_sync_info('replica')
+ self.assertEqual(
+ ('test', 0, '', 10, 'T-transid'), sync_info)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_deleted(self):
+ doc = self.db.create_doc_from_json('{}')
+ edit_rev = 'replica:1|' + doc.rev
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')]
+ new_gen, trans_id = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertGetDocIncludeDeleted(
+ self.db, doc.doc_id, edit_rev, None, False)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 2, last_trans_id),
+ (self.other_changes, new_gen, trans_id))
+ sync_info = yield self.st.get_sync_info('replica')
+ self.assertEqual(10, sync_info[3])
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_refuses_conflicts(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'replica:1', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, tests.simple_doc, 1),
+ self.other_changes[0][:-1])
+ self.assertEqual(1, new_gen)
+ if self.whitebox:
+ self.assertEqual(self.db._last_exchange_log['return'],
+ {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_ignores_convergence(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ gen, txid = self.db._get_generation_info()
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, doc.rev, tests.simple_doc),
+ 10, 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=gen,
+ last_known_trans_id=txid, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertEqual(([], 1), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_returns_new_docs(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_gen, _ = yield self.st.sync_exchange(
+ [], 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, tests.simple_doc, 1),
+ self.other_changes[0][:-1])
+ self.assertEqual(1, new_gen)
+ if self.whitebox:
+ self.assertEqual(self.db._last_exchange_log['return'],
+ {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_returns_deleted_docs(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.db.delete_doc(doc)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ new_gen, _ = yield self.st.sync_exchange(
+ [], 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1])
+ self.assertEqual(2, new_gen)
+ if self.whitebox:
+ self.assertEqual(self.db._last_exchange_log['return'],
+ {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_getting_newer_docs(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ self.assertEqual(([], 2), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_with_concurrent_updates_of_synced_doc(self):
+ expected = []
+
+ def before_whatschanged_cb(state):
+ if state != 'before whats_changed':
+ return
+ cont = '{"key": "cuncurrent"}'
+ conc_rev = self.db.put_doc(
+ self.make_document(doc.doc_id, 'test:1|z:2', cont))
+ expected.append((doc.doc_id, conc_rev, cont, 3))
+
+ self.set_trace_hook(before_whatschanged_cb)
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertEqual(expected, [c[:-1] for c in self.other_changes])
+ self.assertEqual(3, new_gen)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_with_concurrent_updates(self):
+
+ def after_whatschanged_cb(state):
+ if state != 'after whats_changed':
+ return
+ self.db.create_doc_from_json('{"new": "doc"}')
+
+ self.set_trace_hook(after_whatschanged_cb)
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertEqual(([], 2), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_converged_handling(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ docs_by_gen = [
+ (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'),
+ (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5,
+ 'T-bar')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertEqual(([], 2), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_detect_incomplete_exchange(self):
+ def before_get_docs_explode(state):
+ if state != 'before get_docs':
+ return
+ raise l2db.errors.U1DBError("fail")
+ self.set_trace_hook(before_get_docs_explode)
+ # suppress traceback printing in the wsgiref server
+ # self.patch(simple_server.ServerHandler,
+ # 'log_exception', lambda h, exc_info: None)
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertRaises(
+ (l2db.errors.U1DBError, l2db.errors.BrokenSyncStream),
+ self.st.sync_exchange, [], 'other-replica',
+ last_known_generation=0, last_known_trans_id=None,
+ insert_doc_cb=self.receive_doc)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_doc_ids(self):
+ sync_exchange_doc_ids = getattr(self.st, 'sync_exchange_doc_ids', None)
+ if sync_exchange_doc_ids is None:
+ self.skipTest("sync_exchange_doc_ids not implemented")
+ db2 = self.create_database('test2')
+ doc = db2.create_doc_from_json(tests.simple_doc)
+ new_gen, trans_id = yield sync_exchange_doc_ids(
+ db2, [(doc.doc_id, 10, 'T-sid')], 0, None,
+ insert_doc_cb=self.receive_doc)
+ self.assertGetDoc(self.db, doc.doc_id, doc.rev,
+ tests.simple_doc, False)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 1, last_trans_id),
+ (self.other_changes, new_gen, trans_id))
+ self.assertEqual(10, self.st.get_sync_info(db2._replica_uid)[3])
+
+ @defer.inlineCallbacks
+ def test__set_trace_hook(self):
+ called = []
+
+ def cb(state):
+ called.append(state)
+
+ self.set_trace_hook(cb)
+ yield self.st.sync_exchange([], 'replica', 0, None, self.receive_doc)
+ yield self.st.record_sync_info('replica', 0, 'T-sid')
+ self.assertEqual(['before whats_changed',
+ 'after whats_changed',
+ 'before get_docs',
+ 'record_sync_info',
+ ],
+ called)
+
+ @defer.inlineCallbacks
+ def test__set_trace_hook_shallow(self):
+ if (self.st._set_trace_hook_shallow == self.st._set_trace_hook or
+ self.st._set_trace_hook_shallow.im_func ==
+ target.SoledadHTTPSyncTarget._set_trace_hook_shallow.im_func):
+ # shallow same as full
+ expected = ['before whats_changed',
+ 'after whats_changed',
+ 'before get_docs',
+ 'record_sync_info',
+ ]
+ else:
+ expected = ['sync_exchange', 'record_sync_info']
+
+ called = []
+
+ def cb(state):
+ called.append(state)
+
+ self.set_trace_hook(cb, shallow=True)
+ yield self.st.sync_exchange([], 'replica', 0, None, self.receive_doc)
+ yield self.st.record_sync_info('replica', 0, 'T-sid')
+ self.assertEqual(expected, called)
+
+
+# Just to make clear how this test is different... :)
+DEFER_DECRYPTION = False
+
+WAIT_STEP = 1
+MAX_WAIT = 10
+DBPASS = "pass"
+
+
+class SyncTimeoutError(Exception):
+
+ """
+ Dummy exception to notify timeout during sync.
+ """
+ pass
+
+
+class TestSoledadDbSync(
+ TestWithScenarios,
+ SoledadWithCouchServerMixin,
+ tests.TestCaseWithServer):
+
+ """Test db.sync remote sync shortcut"""
+
+ scenarios = [
+ ('py-token-http', {
+ 'create_db_and_target': make_local_db_and_token_soledad_target,
+ 'make_app_with_state': make_token_soledad_app,
+ 'make_database_for_test': make_sqlcipher_database_for_test,
+ 'token': True
+ }),
+ ]
+
+ oauth = False
+ token = False
+
+ def setUp(self):
+ """
+ Need to explicitely invoke inicialization on all bases.
+ """
+ SoledadWithCouchServerMixin.setUp(self)
+ self.server = self.server_thread = None
+ self.startTwistedServer()
+ self.syncer = None
+
+ # config info
+ self.db1_file = os.path.join(self.tempdir, "db1.u1db")
+ os.unlink(self.db1_file)
+ self.db_pass = DBPASS
+ self.email = ADDRESS
+
+ # get a random prefix for each test, so we do not mess with
+ # concurrency during initialization and shutting down of
+ # each local db.
+ self.rand_prefix = ''.join(
+ map(lambda x: random.choice(string.ascii_letters), range(6)))
+
+ # open test dbs: db1 will be the local sqlcipher db (which
+ # instantiates a syncdb). We use the self._soledad instance that was
+ # already created on some setUp method.
+ import binascii
+ tohex = binascii.b2a_hex
+ key = tohex(self._soledad.secrets.get_local_storage_key())
+ sync_db_key = tohex(self._soledad.secrets.get_sync_db_key())
+ dbpath = self._soledad._local_db_path
+
+ self.opts = SQLCipherOptions(
+ dbpath, key, is_raw_key=True, create=False,
+ defer_encryption=True, sync_db_key=sync_db_key)
+ self.db1 = SQLCipherDatabase(self.opts)
+
+ self.db2 = self.request_state._create_database(replica_uid='test')
+
+ def tearDown(self):
+ """
+ Need to explicitely invoke destruction on all bases.
+ """
+ dbsyncer = getattr(self, 'dbsyncer', None)
+ if dbsyncer:
+ dbsyncer.close()
+ self.db1.close()
+ self.db2.close()
+ self._soledad.close()
+
+ # XXX should not access "private" attrs
+ shutil.rmtree(os.path.dirname(self._soledad._local_db_path))
+ SoledadWithCouchServerMixin.tearDown(self)
+
+ def do_sync(self, target_name):
+ """
+ Perform sync using SoledadSynchronizer, SoledadSyncTarget
+ and Token auth.
+ """
+ if self.token:
+ creds = {'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }}
+ target_url = self.getURL(self.db2._dbname)
+
+ # get a u1db syncer
+ crypto = self._soledad._crypto
+ replica_uid = self.db1._replica_uid
+ dbsyncer = SQLCipherU1DBSync(
+ self.opts,
+ crypto,
+ replica_uid,
+ None,
+ defer_encryption=True)
+ self.dbsyncer = dbsyncer
+ return dbsyncer.sync(target_url,
+ creds=creds,
+ defer_decryption=DEFER_DECRYPTION)
+ else:
+ return self._do_sync(self, target_name)
+
+ def _do_sync(self, target_name):
+ if self.oauth:
+ path = '~/' + target_name
+ extra = dict(creds={'oauth': {
+ 'consumer_key': tests.consumer1.key,
+ 'consumer_secret': tests.consumer1.secret,
+ 'token_key': tests.token1.key,
+ 'token_secret': tests.token1.secret,
+ }})
+ else:
+ path = target_name
+ extra = {}
+ target_url = self.getURL(path)
+ return self.db.sync(target_url, **extra)
+
+ def wait_for_sync(self):
+ """
+ Wait for sync to finish.
+ """
+ wait = 0
+ syncer = self.syncer
+ if syncer is not None:
+ while syncer.syncing:
+ time.sleep(WAIT_STEP)
+ wait += WAIT_STEP
+ if wait >= MAX_WAIT:
+ raise SyncTimeoutError
+
+ def test_db_sync(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+ doc1 = self.db1.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db2.create_doc_from_json(tests.nested_doc)
+ d = self.do_sync('test')
+
+ def _assert_successful_sync(results):
+ import time
+ # need to give time to the encryption to proceed
+ # TODO should implement a defer list to subscribe to the
+ # all-decrypted event
+ time.sleep(2)
+ local_gen_before_sync = results
+ self.wait_for_sync()
+
+ gen, _, changes = self.db1.whats_changed(local_gen_before_sync)
+ self.assertEqual(1, len(changes))
+
+ self.assertEqual(doc2.doc_id, changes[0][0])
+ self.assertEqual(1, gen - local_gen_before_sync)
+
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False)
+
+ d.addCallback(_assert_successful_sync)
+ return d
+
+
+class SQLCipherSyncTargetTests(SoledadDatabaseSyncTargetTests):
+
+ # TODO: implement _set_trace_hook(_shallow) in SoledadHTTPSyncTarget so
+ # skipped tests can be succesfully executed.
+
+ scenarios = (tests.multiply_scenarios(SQLCIPHER_SCENARIOS,
+ target_scenarios))
+
+ whitebox = False