Diffstat (limited to 'testing/tests/sync')
-rw-r--r--  testing/tests/sync/__init__.py              0
-rw-r--r--  testing/tests/sync/test_encdecpool.py     315
-rw-r--r--  testing/tests/sync/test_sync.py           216
-rw-r--r--  testing/tests/sync/test_sync_deferred.py  196
-rw-r--r--  testing/tests/sync/test_sync_mutex.py     135
-rw-r--r--  testing/tests/sync/test_sync_target.py    968
6 files changed, 1830 insertions, 0 deletions
diff --git a/testing/tests/sync/__init__.py b/testing/tests/sync/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/testing/tests/sync/__init__.py
diff --git a/testing/tests/sync/test_encdecpool.py b/testing/tests/sync/test_encdecpool.py
new file mode 100644
index 00000000..82e99a47
--- /dev/null
+++ b/testing/tests/sync/test_encdecpool.py
@@ -0,0 +1,315 @@
+# -*- coding: utf-8 -*-
+# test_encdecpool.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for encryption and decryption pool.
+"""
+import json
+from random import shuffle
+
+from mock import MagicMock
+from twisted.internet.defer import inlineCallbacks
+
+from leap.soledad.client.encdecpool import SyncEncrypterPool
+from leap.soledad.client.encdecpool import SyncDecrypterPool
+
+from leap.soledad.common.document import SoledadDocument
+from test_soledad.util import BaseSoledadTest
+from twisted.internet import defer
+from twisted.test.proto_helpers import MemoryReactorClock
+
+DOC_ID = "mydoc"
+DOC_REV = "rev"
+DOC_CONTENT = {'simple': 'document'}
+
+
+class TestSyncEncrypterPool(BaseSoledadTest):
+
+ def setUp(self):
+ BaseSoledadTest.setUp(self)
+ crypto = self._soledad._crypto
+ sync_db = self._soledad._sync_db
+ self._pool = SyncEncrypterPool(crypto, sync_db)
+ self._pool.start()
+
+ def tearDown(self):
+ self._pool.stop()
+ BaseSoledadTest.tearDown(self)
+
+ @inlineCallbacks
+ def test_get_encrypted_doc_returns_none(self):
+ """
+ Test that trying to get an encrypted doc from the pool returns None if
+ the document was never added for encryption.
+ """
+ doc = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
+ self.assertIsNone(doc)
+
+ @inlineCallbacks
+ def test_encrypt_doc_and_get_it_back(self):
+ """
+ Test that the pool actually encrypts a document added to the queue.
+ """
+ doc = SoledadDocument(
+ doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT))
+ self._pool.encrypt_doc(doc)
+
+        # repeatedly attempt to get the encrypted document
+ encrypted = None
+ attempts = 0
+ while encrypted is None and attempts < 10:
+ encrypted = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
+ attempts += 1
+
+ self.assertIsNotNone(encrypted)
+
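+
+# A deferred-based sketch of the polling done in the test above. This
+# helper is illustrative only (it is not used by the suite) and assumes
+# nothing beyond the pool's get_encrypted_doc(doc_id, rev) call already
+# exercised here.
+def poll_encrypted_doc(pool, doc_id, rev, retries=10, delay=0.1):
+    from twisted.internet import reactor, task
+
+    def _check(encrypted):
+        if encrypted is not None or retries <= 0:
+            return encrypted
+        # not encrypted yet: wait a bit, then ask again with one
+        # less retry
+        return task.deferLater(
+            reactor, delay, poll_encrypted_doc,
+            pool, doc_id, rev, retries - 1, delay)
+
+    d = pool.get_encrypted_doc(doc_id, rev)
+    d.addCallback(_check)
+    return d
+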
+
+class TestSyncDecrypterPool(BaseSoledadTest):
+
+ def _insert_doc_cb(self, doc, gen, trans_id):
+ """
+ Method used to mock the sync's return_doc_cb callback.
+ """
+ self._inserted_docs.append((doc, gen, trans_id))
+
+ def _setup_pool(self, sync_db=None):
+ sync_db = sync_db or self._soledad._sync_db
+ return SyncDecrypterPool(
+ self._soledad._crypto,
+ sync_db,
+ source_replica_uid=self._soledad._dbpool.replica_uid,
+ insert_doc_cb=self._insert_doc_cb)
+
+ def setUp(self):
+ BaseSoledadTest.setUp(self)
+ # setup the pool
+ self._pool = self._setup_pool()
+ # reset the inserted docs mock
+ self._inserted_docs = []
+
+ def tearDown(self):
+ if self._pool.running:
+ self._pool.stop()
+ BaseSoledadTest.tearDown(self)
+
+ def test_insert_received_doc(self):
+ """
+ Test that one document added to the pool is inserted using the
+ callback.
+ """
+ self._pool.start(1)
+ self._pool.insert_received_doc(
+ DOC_ID, DOC_REV, "{}", 1, "trans_id", 1)
+
+ def _assert_doc_was_inserted(_):
+ self.assertEqual(
+ self._inserted_docs,
+ [(SoledadDocument(DOC_ID, DOC_REV, "{}"), 1, u"trans_id")])
+
+ self._pool.deferred.addCallback(_assert_doc_was_inserted)
+ return self._pool.deferred
+
+ def test_looping_control(self):
+ """
+ Start and stop cleanly.
+ """
+ self._pool.start(10)
+ self.assertTrue(self._pool.running)
+ self._pool.stop()
+ self.assertFalse(self._pool.running)
+ self.assertTrue(self._pool.deferred.called)
+
+ def test_sync_id_col_is_created_if_non_existing_in_docs_recvd_table(self):
+ """
+        Test that the docs_received table is migrated to include the
+        sync_id column.
+ """
+ mock_run_query = MagicMock(return_value=defer.succeed(None))
+ mock_sync_db = MagicMock()
+ mock_sync_db.runQuery = mock_run_query
+ pool = self._setup_pool(mock_sync_db)
+ d = pool.start(10)
+ pool.stop()
+
+        def assert_attempted_to_create_sync_id_column(_):
+            mock_run_query.assert_called_once_with(
+                "ALTER TABLE docs_received ADD COLUMN sync_id")
+
+        d.addCallback(assert_attempted_to_create_sync_id_column)
+ return d
+
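+    # A hedged sketch (not used by these tests) of a tolerant version of
+    # the migration the pool issues on start(): the ALTER TABLE fails once
+    # the column already exists, so a real migration would swallow exactly
+    # that error. The "duplicate column" message check is an assumption
+    # about sqlite's error text, not the pool's actual implementation.
+    @staticmethod
+    @defer.inlineCallbacks
+    def _ensure_sync_id_column_sketch(sync_db):
+        try:
+            yield sync_db.runQuery(
+                "ALTER TABLE docs_received ADD COLUMN sync_id")
+        except Exception as e:
+            # only ignore the already-migrated case
+            if "duplicate column" not in str(e).lower():
+                raise
+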
+ def test_insert_received_doc_many(self):
+ """
+ Test that many documents added to the pool are inserted using the
+ callback.
+ """
+ many = 100
+ self._pool.start(many)
+
+ # insert many docs in the pool
+ for i in xrange(many):
+ gen = idx = i + 1
+ doc_id = "doc_id: %d" % idx
+ rev = "rev: %d" % idx
+ content = {'idx': idx}
+ trans_id = "trans_id: %d" % idx
+ self._pool.insert_received_doc(
+ doc_id, rev, content, gen, trans_id, idx)
+
+ def _assert_doc_was_inserted(_):
+ self.assertEqual(many, len(self._inserted_docs))
+ idx = 1
+ for doc, gen, trans_id in self._inserted_docs:
+ expected_gen = idx
+ expected_doc_id = "doc_id: %d" % idx
+ expected_rev = "rev: %d" % idx
+ expected_content = json.dumps({'idx': idx})
+ expected_trans_id = "trans_id: %d" % idx
+
+ self.assertEqual(expected_doc_id, doc.doc_id)
+ self.assertEqual(expected_rev, doc.rev)
+ self.assertEqual(expected_content, json.dumps(doc.content))
+ self.assertEqual(expected_gen, gen)
+ self.assertEqual(expected_trans_id, trans_id)
+
+ idx += 1
+
+ self._pool.deferred.addCallback(_assert_doc_was_inserted)
+ return self._pool.deferred
+
+ def test_insert_encrypted_received_doc(self):
+ """
+ Test that one encrypted document added to the pool is decrypted and
+ inserted using the callback.
+ """
+ crypto = self._soledad._crypto
+ doc = SoledadDocument(
+ doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT))
+ encrypted_content = json.loads(crypto.encrypt_doc(doc))
+
+ # insert the encrypted document in the pool
+ self._pool.start(1)
+ self._pool.insert_encrypted_received_doc(
+ DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1)
+
+ def _assert_doc_was_decrypted_and_inserted(_):
+ self.assertEqual(1, len(self._inserted_docs))
+ self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")])
+
+ self._pool.deferred.addCallback(
+ _assert_doc_was_decrypted_and_inserted)
+ return self._pool.deferred
+
+ @inlineCallbacks
+ def test_processing_order(self):
+ """
+        This test ensures that documents are only processed once they
+        form a contiguous sequence.
+ """
+ reactor_clock = MemoryReactorClock()
+ self._pool._loop.clock = reactor_clock
+
+ crypto = self._soledad._crypto
+
+ docs = []
+ for i in xrange(1, 10):
+ i = str(i)
+ doc = SoledadDocument(
+ doc_id=DOC_ID + i, rev=DOC_REV + i,
+ json=json.dumps(DOC_CONTENT))
+ encrypted_content = json.loads(crypto.encrypt_doc(doc))
+ docs.append((doc, encrypted_content))
+
+ # insert the encrypted document in the pool
+ self._pool.start(10) # pool is expecting to process 10 docs
+ # first three arrives, forming a sequence
+ for i, (doc, encrypted_content) in enumerate(docs[:3]):
+ gen = idx = i + 1
+ yield self._pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx)
+ # last one arrives alone, so it can't be processed
+ doc, encrypted_content = docs[-1]
+ yield self._pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10)
+
+ reactor_clock.advance(self._pool.DECRYPT_LOOP_PERIOD)
+ yield self._pool._decrypt_and_recurse()
+
+ self.assertEqual(3, self._pool._processed_docs)
+
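+    # The gate exercised above can be pictured with a small standalone
+    # sketch: staged documents are released only while their indexes form
+    # an unbroken run after the last processed one. This helper is
+    # illustrative and independent of the pool's actual internals.
+    @staticmethod
+    def _contiguous_prefix_sketch(staged_indexes, last_processed=0):
+        # returns how many staged docs could be processed right now,
+        # e.g. [1, 2, 3, 10] -> 3, matching the assertion above
+        runnable = 0
+        expected = last_processed + 1
+        for idx in sorted(staged_indexes):
+            if idx != expected:
+                break
+            runnable += 1
+            expected += 1
+        return runnable
+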
+ def test_insert_encrypted_received_doc_many(self, many=100):
+ """
+ Test that many encrypted documents added to the pool are decrypted and
+ inserted using the callback.
+ """
+ crypto = self._soledad._crypto
+ self._pool.start(many)
+ docs = []
+
+ # insert many encrypted docs in the pool
+ for i in xrange(many):
+ gen = idx = i + 1
+ doc_id = "doc_id: %d" % idx
+ rev = "rev: %d" % idx
+ content = {'idx': idx}
+ trans_id = "trans_id: %d" % idx
+
+ doc = SoledadDocument(
+ doc_id=doc_id, rev=rev, json=json.dumps(content))
+
+ encrypted_content = json.loads(crypto.encrypt_doc(doc))
+ docs.append((doc_id, rev, encrypted_content, gen,
+ trans_id, idx))
+ shuffle(docs)
+
+ for doc in docs:
+ self._pool.insert_encrypted_received_doc(*doc)
+
+ def _assert_docs_were_decrypted_and_inserted(_):
+ self.assertEqual(many, len(self._inserted_docs))
+ idx = 1
+ for doc, gen, trans_id in self._inserted_docs:
+ expected_gen = idx
+ expected_doc_id = "doc_id: %d" % idx
+ expected_rev = "rev: %d" % idx
+ expected_content = json.dumps({'idx': idx})
+ expected_trans_id = "trans_id: %d" % idx
+
+ self.assertEqual(expected_doc_id, doc.doc_id)
+ self.assertEqual(expected_rev, doc.rev)
+ self.assertEqual(expected_content, json.dumps(doc.content))
+ self.assertEqual(expected_gen, gen)
+ self.assertEqual(expected_trans_id, trans_id)
+
+ idx += 1
+
+ self._pool.deferred.addCallback(
+ _assert_docs_were_decrypted_and_inserted)
+ return self._pool.deferred
+
+ @inlineCallbacks
+ def test_pool_reuse(self):
+ """
+        The pool is reused between syncs; this test verifies that reuse
+        works correctly.
+ """
+ for i in xrange(3):
+ yield self.test_insert_encrypted_received_doc_many(5)
+ self._inserted_docs = []
+ decrypted_docs = yield self._pool._get_docs(encrypted=False)
+ # check that decrypted docs staging is clean
+            self.assertEqual([], decrypted_docs)
+ self._pool.stop()
diff --git a/testing/tests/sync/test_sync.py b/testing/tests/sync/test_sync.py
new file mode 100644
index 00000000..095884ce
--- /dev/null
+++ b/testing/tests/sync/test_sync.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+# test_sync.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import tempfile
+import threading
+import time
+
+from urlparse import urljoin
+from twisted.internet import defer
+
+from testscenarios import TestWithScenarios
+
+from leap.soledad.common import couch
+from leap.soledad.client import sync
+
+from test_soledad import u1db_tests as tests
+from test_soledad.u1db_tests import TestCaseWithServer
+from test_soledad.u1db_tests import simple_doc
+from test_soledad.util import make_token_soledad_app
+from test_soledad.util import make_soledad_document_for_test
+from test_soledad.util import soledad_sync_target
+from test_soledad.util import BaseSoledadTest
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import CouchDBTestCase
+
+
+class InterruptableSyncTestCase(
+ BaseSoledadTest, CouchDBTestCase, TestCaseWithServer):
+
+ """
+ Tests for encrypted sync using Soledad server backed by a couch database.
+ """
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ make_document_for_test = make_soledad_document_for_test
+
+ sync_target = soledad_sync_target
+
+ def make_app(self):
+ self.request_state = couch.CouchServerState(self.couch_url)
+ return self.make_app_with_state(self.request_state)
+
+ def setUp(self):
+ TestCaseWithServer.setUp(self)
+ CouchDBTestCase.setUp(self)
+ self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+
+ def tearDown(self):
+ CouchDBTestCase.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+
+ def test_interruptable_sync(self):
+ """
+        Test that a sync of many small documents can be interrupted and
+        later completed.
+ """
+
+ self.skipTest("Sync is currently not interruptable.")
+
+ class _SyncInterruptor(threading.Thread):
+
+ """
+ A thread meant to interrupt the sync process.
+ """
+
+ def __init__(self, soledad, couchdb):
+ self._soledad = soledad
+ self._couchdb = couchdb
+ threading.Thread.__init__(self)
+
+ def run(self):
+                while self._couchdb._get_generation() < 2:
+ time.sleep(0.1)
+ self._soledad.stop_sync()
+ time.sleep(1)
+
+ number_of_docs = 10
+ self.startServer()
+
+ # instantiate soledad and create a document
+ sol = self._soledad_instance(
+ user='user-uuid', server_url=self.getURL())
+
+ # ensure remote db exists before syncing
+ db = couch.CouchDatabase.open_database(
+ urljoin(self.couch_url, 'user-user-uuid'),
+ create=True,
+ ensure_ddocs=True)
+
+ # create interruptor thread
+ t = _SyncInterruptor(sol, db)
+ t.start()
+
+ d = sol.get_all_docs()
+ d.addCallback(lambda results: self.assertEqual([], results[1]))
+
+ def _create_docs(results):
+ # create many small files
+ deferreds = []
+ for i in range(0, number_of_docs):
+ deferreds.append(sol.create_doc(json.loads(simple_doc)))
+ return defer.DeferredList(deferreds)
+
+ # sync with server
+ d.addCallback(_create_docs)
+ d.addCallback(lambda _: sol.get_all_docs())
+ d.addCallback(
+ lambda results: self.assertEqual(number_of_docs, len(results[1])))
+ d.addCallback(lambda _: sol.sync())
+ d.addCallback(lambda _: t.join())
+ d.addCallback(lambda _: db.get_all_docs())
+ d.addCallback(
+ lambda results: self.assertNotEqual(
+ number_of_docs, len(results[1])))
+ d.addCallback(lambda _: sol.sync())
+ d.addCallback(lambda _: db.get_all_docs())
+ d.addCallback(
+ lambda results: self.assertEqual(number_of_docs, len(results[1])))
+
+ def _tear_down(results):
+ db.delete_database()
+ db.close()
+ sol.close()
+
+ d.addCallback(_tear_down)
+ return d
+
+
+class TestSoledadDbSync(
+ TestWithScenarios,
+ SoledadWithCouchServerMixin,
+ tests.TestCaseWithServer):
+
+ """
+ Test db.sync remote sync shortcut
+ """
+
+ scenarios = [
+ ('py-token-http', {
+ 'make_app_with_state': make_token_soledad_app,
+ 'make_database_for_test': tests.make_memory_database_for_test,
+ 'token': True
+ }),
+ ]
+
+ oauth = False
+ token = False
+
+ def setUp(self):
+ """
+        Need to explicitly invoke initialization on all bases.
+ """
+ SoledadWithCouchServerMixin.setUp(self)
+ self.startTwistedServer()
+ self.db = self.make_database_for_test(self, 'test1')
+ self.db2 = self.request_state._create_database(replica_uid='test')
+
+ def tearDown(self):
+ """
+        Need to explicitly invoke destruction on all bases.
+ """
+ SoledadWithCouchServerMixin.tearDown(self)
+ # tests.TestCaseWithServer.tearDown(self)
+
+ def do_sync(self):
+ """
+ Perform sync using SoledadSynchronizer, SoledadSyncTarget
+ and Token auth.
+ """
+ target = soledad_sync_target(
+ self, self.db2._dbname,
+ source_replica_uid=self._soledad._dbpool.replica_uid)
+ self.addCleanup(target.close)
+ return sync.SoledadSynchronizer(
+ self.db,
+ target).sync(defer_decryption=False)
+
+ @defer.inlineCallbacks
+ def test_db_sync(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+
+ doc1 = self.db.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db2.create_doc_from_json(tests.nested_doc)
+
+ local_gen_before_sync = yield self.do_sync()
+ gen, _, changes = self.db.whats_changed(local_gen_before_sync)
+ self.assertEqual(1, len(changes))
+ self.assertEqual(doc2.doc_id, changes[0][0])
+ self.assertEqual(1, gen - local_gen_before_sync)
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False)
+
+ # TODO: add u1db.tests.test_sync.TestRemoteSyncIntegration
diff --git a/testing/tests/sync/test_sync_deferred.py b/testing/tests/sync/test_sync_deferred.py
new file mode 100644
index 00000000..4948aaf8
--- /dev/null
+++ b/testing/tests/sync/test_sync_deferred.py
@@ -0,0 +1,196 @@
+# test_sync_deferred.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: sync with deferred encryption/decryption.
+"""
+import time
+import os
+import random
+import string
+import shutil
+
+from urlparse import urljoin
+
+from twisted.internet import defer
+
+from leap.soledad.common import couch
+
+from leap.soledad.client import sync
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.client.sqlcipher import SQLCipherDatabase
+
+from testscenarios import TestWithScenarios
+
+from test_soledad import u1db_tests as tests
+from test_soledad.util import ADDRESS
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import make_soledad_app
+from test_soledad.util import soledad_sync_target
+
+
+# Just to make clear how this test is different... :)
+DEFER_DECRYPTION = True
+
+WAIT_STEP = 1
+MAX_WAIT = 10
+DBPASS = "pass"
+
+
+class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):
+
+ """
+    Another base class for testing deferred encryption/decryption during
+    sync, using the intermediate database.
+ """
+ defer_sync_encryption = True
+
+ def setUp(self):
+ SoledadWithCouchServerMixin.setUp(self)
+ self.startTwistedServer()
+ # config info
+ self.db1_file = os.path.join(self.tempdir, "db1.u1db")
+ os.unlink(self.db1_file)
+ self.db_pass = DBPASS
+ self.email = ADDRESS
+
+        # get a random prefix for each test, so we do not mess with
+        # concurrency during initialization and shutdown of each
+        # local db.
+ self.rand_prefix = ''.join(
+ map(lambda x: random.choice(string.ascii_letters), range(6)))
+
+ # open test dbs: db1 will be the local sqlcipher db (which
+ # instantiates a syncdb). We use the self._soledad instance that was
+ # already created on some setUp method.
+ import binascii
+ tohex = binascii.b2a_hex
+ key = tohex(self._soledad.secrets.get_local_storage_key())
+ sync_db_key = tohex(self._soledad.secrets.get_sync_db_key())
+ dbpath = self._soledad._local_db_path
+
+ self.opts = SQLCipherOptions(
+ dbpath, key, is_raw_key=True, create=False,
+ defer_encryption=True, sync_db_key=sync_db_key)
+ self.db1 = SQLCipherDatabase(self.opts)
+
+ self.db2 = self.request_state._create_database('test')
+
+ def tearDown(self):
+ # XXX should not access "private" attrs
+ shutil.rmtree(os.path.dirname(self._soledad._local_db_path))
+ SoledadWithCouchServerMixin.tearDown(self)
+
+
+class SyncTimeoutError(Exception):
+
+ """
+    Dummy exception to signal a timeout during sync.
+ """
+ pass
+
+
+class TestSoledadDbSyncDeferredEncDecr(
+ TestWithScenarios,
+ BaseSoledadDeferredEncTest,
+ tests.TestCaseWithServer):
+
+ """
+ Test db.sync remote sync shortcut.
+ Case with deferred encryption and decryption: using the intermediate
+ syncdb.
+ """
+
+ scenarios = [
+ ('http', {
+ 'make_app_with_state': make_soledad_app,
+ 'make_database_for_test': tests.make_memory_database_for_test,
+ }),
+ ]
+
+ oauth = False
+ token = True
+
+ def setUp(self):
+ """
+        Need to explicitly invoke initialization on all bases.
+ """
+ BaseSoledadDeferredEncTest.setUp(self)
+ self.server = self.server_thread = None
+ self.syncer = None
+
+ def tearDown(self):
+ """
+        Need to explicitly invoke destruction on all bases.
+ """
+ dbsyncer = getattr(self, 'dbsyncer', None)
+ if dbsyncer:
+ dbsyncer.close()
+ BaseSoledadDeferredEncTest.tearDown(self)
+
+ def do_sync(self):
+ """
+ Perform sync using SoledadSynchronizer, SoledadSyncTarget
+ and Token auth.
+ """
+ replica_uid = self._soledad._dbpool.replica_uid
+ sync_db = self._soledad._sync_db
+ sync_enc_pool = self._soledad._sync_enc_pool
+ dbsyncer = self._soledad._dbsyncer # Soledad.sync uses the dbsyncer
+
+ target = soledad_sync_target(
+ self, self.db2._dbname,
+ source_replica_uid=replica_uid,
+ sync_db=sync_db,
+ sync_enc_pool=sync_enc_pool)
+ self.addCleanup(target.close)
+ return sync.SoledadSynchronizer(
+ dbsyncer,
+ target).sync(defer_decryption=True)
+
+ def wait_for_sync(self):
+ """
+ Wait for sync to finish.
+ """
+ wait = 0
+ syncer = self.syncer
+ if syncer is not None:
+ while syncer.syncing:
+ time.sleep(WAIT_STEP)
+ wait += WAIT_STEP
+ if wait >= MAX_WAIT:
+ raise SyncTimeoutError
+
+ @defer.inlineCallbacks
+ def test_db_sync(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+ doc1 = self.db1.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db2.create_doc_from_json(tests.nested_doc)
+ local_gen_before_sync = yield self.do_sync()
+
+ gen, _, changes = self.db1.whats_changed(local_gen_before_sync)
+ self.assertEqual(1, len(changes))
+
+ self.assertEqual(doc2.doc_id, changes[0][0])
+ self.assertEqual(1, gen - local_gen_before_sync)
+
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False)
diff --git a/testing/tests/sync/test_sync_mutex.py b/testing/tests/sync/test_sync_mutex.py
new file mode 100644
index 00000000..787cfee8
--- /dev/null
+++ b/testing/tests/sync/test_sync_mutex.py
@@ -0,0 +1,135 @@
+# -*- coding: utf-8 -*-
+# test_sync_mutex.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Test that synchronization is a critical section and, as such, two
+synchronization processes cannot run concurrently.
+"""
+
+
+import time
+import uuid
+import tempfile
+import shutil
+
+from urlparse import urljoin
+
+from twisted.internet import defer
+
+from leap.soledad.client.sync import SoledadSynchronizer
+
+from leap.soledad.common.couch.state import CouchServerState
+from leap.soledad.common.couch import CouchDatabase
+from test_soledad.u1db_tests import TestCaseWithServer
+
+from test_soledad.util import CouchDBTestCase
+from test_soledad.util import BaseSoledadTest
+from test_soledad.util import make_token_soledad_app
+from test_soledad.util import make_soledad_document_for_test
+from test_soledad.util import soledad_sync_target
+
+
+# monkey-patch the soledad synchronizer so it stores start and finish times
+
+_old_sync = SoledadSynchronizer.sync
+
+
+def _timed_sync(self, defer_decryption=True):
+ t = time.time()
+
+ sync_id = uuid.uuid4()
+
+ if not getattr(self.source, 'sync_times', False):
+ self.source.sync_times = {}
+
+ self.source.sync_times[sync_id] = {'start': t}
+
+ def _store_finish_time(passthrough):
+ t = time.time()
+ self.source.sync_times[sync_id]['end'] = t
+ return passthrough
+
+ d = _old_sync(self, defer_decryption=defer_decryption)
+ d.addBoth(_store_finish_time)
+ return d
+
+SoledadSynchronizer.sync = _timed_sync
+
+# -- end of monkey-patching
+
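+
+# What TestSyncMutex verifies below is that Soledad serializes syncs. As
+# an illustrative sketch only (not Soledad's actual implementation), such
+# a mutex can be built on twisted's DeferredLock, whose run() method
+# queues each call until the lock is free:
+
+_sync_lock = defer.DeferredLock()
+
+
+def _serialized_sync_sketch(synchronizer):
+    # two concurrent callers can never overlap: run() invokes sync only
+    # after the previous call's deferred has fired
+    return _sync_lock.run(synchronizer.sync)
+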
+
+class TestSyncMutex(
+ BaseSoledadTest, CouchDBTestCase, TestCaseWithServer):
+
+ @staticmethod
+ def make_app_with_state(state):
+ return make_token_soledad_app(state)
+
+ make_document_for_test = make_soledad_document_for_test
+
+ sync_target = soledad_sync_target
+
+ def make_app(self):
+ self.request_state = CouchServerState(self.couch_url)
+ return self.make_app_with_state(self.request_state)
+
+ def setUp(self):
+ TestCaseWithServer.setUp(self)
+ CouchDBTestCase.setUp(self)
+ self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+ self.user = ('user-%s' % uuid.uuid4().hex)
+
+ def tearDown(self):
+ CouchDBTestCase.tearDown(self)
+ TestCaseWithServer.tearDown(self)
+ shutil.rmtree(self.tempdir)
+
+ def test_two_concurrent_syncs_do_not_overlap_no_docs(self):
+ self.startServer()
+
+ # ensure remote db exists before syncing
+ db = CouchDatabase.open_database(
+ urljoin(self.couch_url, 'user-' + self.user),
+ create=True,
+ ensure_ddocs=True)
+
+ sol = self._soledad_instance(
+ user=self.user, server_url=self.getURL())
+
+ d1 = sol.sync()
+ d2 = sol.sync()
+
+ def _assert_syncs_do_not_overlap(thearg):
+ # recover sync times
+ sync_times = []
+ for key in sol._dbsyncer.sync_times:
+ sync_times.append(sol._dbsyncer.sync_times[key])
+ sync_times.sort(key=lambda s: s['start'])
+
+ self.assertTrue(
+ (sync_times[0]['start'] < sync_times[0]['end'] and
+ sync_times[0]['end'] < sync_times[1]['start'] and
+ sync_times[1]['start'] < sync_times[1]['end']))
+
+ db.delete_database()
+ db.close()
+ sol.close()
+
+ d = defer.gatherResults([d1, d2])
+ d.addBoth(_assert_syncs_do_not_overlap)
+ return d
diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py
new file mode 100644
index 00000000..964468ce
--- /dev/null
+++ b/testing/tests/sync/test_sync_target.py
@@ -0,0 +1,968 @@
+# -*- coding: utf-8 -*-
+# test_sync_target.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test Leap backend bits: sync target
+"""
+import cStringIO
+import os
+import time
+import json
+import random
+import string
+import shutil
+from uuid import uuid4
+
+from testscenarios import TestWithScenarios
+from twisted.internet import defer
+
+from leap.soledad.client import http_target as target
+from leap.soledad.client import crypto
+from leap.soledad.client.sqlcipher import SQLCipherU1DBSync
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.client.sqlcipher import SQLCipherDatabase
+
+from leap.soledad.common import l2db
+
+from leap.soledad.common.document import SoledadDocument
+from test_soledad import u1db_tests as tests
+from test_soledad.util import make_sqlcipher_database_for_test
+from test_soledad.util import make_soledad_app
+from test_soledad.util import make_token_soledad_app
+from test_soledad.util import make_soledad_document_for_test
+from test_soledad.util import soledad_sync_target
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import ADDRESS
+from test_soledad.util import SQLCIPHER_SCENARIOS
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_remote_sync_target`.
+# -----------------------------------------------------------------------------
+
+class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin):
+
+ """
+ Some tests had to be copied to this class so we can instantiate our own
+ target.
+ """
+
+ def setUp(self):
+ SoledadWithCouchServerMixin.setUp(self)
+ creds = {'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }}
+ self.target = target.SoledadHTTPSyncTarget(
+ self.couch_url,
+ uuid4().hex,
+ creds,
+ self._soledad._crypto,
+ None)
+
+ def tearDown(self):
+ self.target.close()
+ SoledadWithCouchServerMixin.tearDown(self)
+
+ def test_extra_comma(self):
+ """
+ Test adapted to use encrypted content.
+ """
+ doc = SoledadDocument('i', rev='r')
+ doc.content = {}
+ _crypto = self._soledad._crypto
+ key = _crypto.doc_passphrase(doc.doc_id)
+ secret = _crypto.secret
+
+ enc_json = crypto.encrypt_docstr(
+ doc.get_json(), doc.doc_id, doc.rev,
+ key, secret)
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("[\r\n{},\r\n]")
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response(
+ ('[\r\n{},\r\n{"id": "i", "rev": "r", ' +
+ '"content": %s, "gen": 3, "trans_id": "T-sid"}' +
+ ',\r\n]') % json.dumps(enc_json))
+
+ def test_wrong_start(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("{}\r\n]")
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("\r\n{}\r\n]")
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("")
+
+ def test_wrong_end(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("[\r\n{}")
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("[\r\n")
+
+ def test_missing_comma(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response(
+ '[\r\n{}\r\n{"id": "i", "rev": "r", '
+ '"content": "c", "gen": 3}\r\n]')
+
+ def test_no_entries(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response("[\r\n]")
+
+ def test_error_in_stream(self):
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response(
+ '[\r\n{"new_generation": 0},'
+ '\r\n{"error": "unavailable"}\r\n')
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response(
+ '[\r\n{"error": "unavailable"}\r\n')
+
+ with self.assertRaises(l2db.errors.BrokenSyncStream):
+ self.target._parse_received_doc_response('[\r\n{"error": "?"}\r\n')
+
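+
+# For contrast with the broken streams rejected above: a well-formed
+# stream starts with a header entry carrying the new generation info,
+# followed by comma-separated doc entries. The concrete shape below is
+# inferred from the tests in this class and is shown for illustration
+# only; it is not asserted against a real server.
+WELL_FORMED_STREAM_EXAMPLE = (
+    '[\r\n'
+    '{"new_generation": 3, "new_transaction_id": "T-sid"},\r\n'
+    '{"id": "i", "rev": "r", "content": "{}", "gen": 3, '
+    '"trans_id": "T-sid"}\r\n'
+    ']')
+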
+#
+# functions for TestRemoteSyncTargets
+#
+
+
+def make_local_db_and_soledad_target(
+        test, path='test',
+        source_replica_uid=None):
+    # generate the default inside the function: a uuid4().hex default in
+    # the signature would be evaluated only once, at import time, and
+    # then shared by every caller
+    source_replica_uid = source_replica_uid or uuid4().hex
+    test.startTwistedServer()
+ replica_uid = os.path.basename(path)
+ db = test.request_state._create_database(replica_uid)
+ sync_db = test._soledad._sync_db
+ sync_enc_pool = test._soledad._sync_enc_pool
+ st = soledad_sync_target(
+ test, db._dbname,
+ source_replica_uid=source_replica_uid,
+ sync_db=sync_db,
+ sync_enc_pool=sync_enc_pool)
+ return db, st
+
+
+def make_local_db_and_token_soledad_target(
+        test,
+        source_replica_uid=None):
+ db, st = make_local_db_and_soledad_target(
+ test, path='test',
+ source_replica_uid=source_replica_uid)
+ st.set_token_credentials('user-uuid', 'auth-token')
+ return db, st
+
+
+class TestSoledadSyncTarget(
+ TestWithScenarios,
+ SoledadWithCouchServerMixin,
+ tests.TestCaseWithServer):
+
+ scenarios = [
+ ('token_soledad',
+ {'make_app_with_state': make_token_soledad_app,
+ 'make_document_for_test': make_soledad_document_for_test,
+ 'create_db_and_target': make_local_db_and_token_soledad_target,
+ 'make_database_for_test': make_sqlcipher_database_for_test,
+ 'sync_target': soledad_sync_target}),
+ ]
+
+    def getSyncTarget(self, path=None, source_replica_uid=None):
+        # see make_local_db_and_soledad_target: avoid evaluating
+        # uuid4().hex once at class-definition time
+        source_replica_uid = source_replica_uid or uuid4().hex
+        if self.port is None:
+ self.startTwistedServer()
+ sync_db = self._soledad._sync_db
+ sync_enc_pool = self._soledad._sync_enc_pool
+ if path is None:
+ path = self.db2._dbname
+ target = self.sync_target(
+ self, path,
+ source_replica_uid=source_replica_uid,
+ sync_db=sync_db,
+ sync_enc_pool=sync_enc_pool)
+ self.addCleanup(target.close)
+ return target
+
+ def setUp(self):
+ TestWithScenarios.setUp(self)
+ SoledadWithCouchServerMixin.setUp(self)
+ self.startTwistedServer()
+ self.db1 = make_sqlcipher_database_for_test(self, 'test1')
+ self.db2 = self.request_state._create_database('test')
+
+ def tearDown(self):
+ # db2, _ = self.request_state.ensure_database('test2')
+ self.delete_db(self.db2._dbname)
+ self.db1.close()
+ SoledadWithCouchServerMixin.tearDown(self)
+ TestWithScenarios.tearDown(self)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_send(self):
+ """
+        Test sync exchange sending a document.
+
+        This test was adapted to decrypt remote content before asserting.
+ """
+ db = self.db2
+ remote_target = self.getSyncTarget()
+ other_docs = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
+
+ doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ new_gen, trans_id = yield remote_target.sync_exchange(
+ [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=receive_doc,
+ defer_decryption=False)
+ self.assertEqual(1, new_gen)
+ self.assertGetEncryptedDoc(
+ db, 'doc-here', 'replica:1', '{"value": "here"}', False)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_send_failure_and_retry_scenario(self):
+ """
+ Test for sync exchange failure and retry.
+
+        This test was adapted to decrypt remote content before asserting.
+ """
+
+ def blackhole_getstderr(inst):
+ return cStringIO.StringIO()
+
+ db = self.db2
+ _put_doc_if_newer = db._put_doc_if_newer
+ trigger_ids = ['doc-here2']
+
+ def bomb_put_doc_if_newer(self, doc, save_conflict,
+ replica_uid=None, replica_gen=None,
+ replica_trans_id=None, number_of_docs=None,
+ doc_idx=None, sync_id=None):
+ if doc.doc_id in trigger_ids:
+ raise l2db.errors.U1DBError
+ return _put_doc_if_newer(doc, save_conflict=save_conflict,
+ replica_uid=replica_uid,
+ replica_gen=replica_gen,
+ replica_trans_id=replica_trans_id,
+ number_of_docs=number_of_docs,
+ doc_idx=doc_idx, sync_id=sync_id)
+ from leap.soledad.common.backend import SoledadBackend
+ self.patch(
+ SoledadBackend, '_put_doc_if_newer', bomb_put_doc_if_newer)
+ remote_target = self.getSyncTarget(
+ source_replica_uid='replica')
+ other_changes = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_changes.append(
+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
+
+ doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ doc2 = self.make_document('doc-here2', 'replica:1',
+ '{"value": "here2"}')
+
+ with self.assertRaises(l2db.errors.U1DBError):
+ yield remote_target.sync_exchange(
+ [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')],
+ 'replica',
+ last_known_generation=0,
+ last_known_trans_id=None,
+ insert_doc_cb=receive_doc,
+ defer_decryption=False)
+
+ self.assertGetEncryptedDoc(
+ db, 'doc-here', 'replica:1', '{"value": "here"}',
+ False)
+ self.assertEqual(
+ (10, 'T-sid'), db._get_replica_gen_and_trans_id('replica'))
+ self.assertEqual([], other_changes)
+ # retry
+ trigger_ids = []
+ new_gen, trans_id = yield remote_target.sync_exchange(
+ [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=receive_doc,
+ defer_decryption=False)
+ self.assertGetEncryptedDoc(
+ db, 'doc-here2', 'replica:1', '{"value": "here2"}',
+ False)
+ self.assertEqual(
+ (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica'))
+ self.assertEqual(2, new_gen)
+ self.assertEqual(
+ ('doc-here', 'replica:1', '{"value": "here"}', 1),
+ other_changes[0][:-1])
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_send_ensure_callback(self):
+ """
+        Test that the ensure callback is invoked with the replica uid
+        during sync exchange.
+
+        This test was adapted to decrypt remote content before asserting.
+ """
+ remote_target = self.getSyncTarget()
+ other_docs = []
+ replica_uid_box = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_docs.append((doc.doc_id, doc.rev, doc.get_json()))
+
+ def ensure_cb(replica_uid):
+ replica_uid_box.append(replica_uid)
+
+ doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
+ new_gen, trans_id = yield remote_target.sync_exchange(
+ [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=receive_doc,
+ ensure_callback=ensure_cb, defer_decryption=False)
+ self.assertEqual(1, new_gen)
+ db = self.db2
+ self.assertEqual(1, len(replica_uid_box))
+ self.assertEqual(db._replica_uid, replica_uid_box[0])
+ self.assertGetEncryptedDoc(
+ db, 'doc-here', 'replica:1', '{"value": "here"}', False)
+
+ def test_sync_exchange_in_stream_error(self):
+ self.skipTest("bypass this test because our sync_exchange process "
+ "does not return u1db error 503 \"unavailable\" for "
+ "now")
+
+ @defer.inlineCallbacks
+ def test_get_sync_info(self):
+ db = self.db2
+ db._set_replica_gen_and_trans_id('other-id', 1, 'T-transid')
+ remote_target = self.getSyncTarget(
+ source_replica_uid='other-id')
+ sync_info = yield remote_target.get_sync_info('other-id')
+ self.assertEqual(
+ ('test', 0, '', 1, 'T-transid'),
+ sync_info)
+
+ @defer.inlineCallbacks
+ def test_record_sync_info(self):
+ remote_target = self.getSyncTarget(
+ source_replica_uid='other-id')
+ yield remote_target.record_sync_info('other-id', 2, 'T-transid')
+ self.assertEqual((2, 'T-transid'),
+ self.db2._get_replica_gen_and_trans_id('other-id'))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_receive(self):
+ db = self.db2
+ doc = db.create_doc_from_json('{"value": "there"}')
+ remote_target = self.getSyncTarget()
+ other_changes = []
+
+ def receive_doc(doc, gen, trans_id):
+ other_changes.append(
+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
+
+ new_gen, trans_id = yield remote_target.sync_exchange(
+ [], 'replica', last_known_generation=0, last_known_trans_id=None,
+ insert_doc_cb=receive_doc)
+ self.assertEqual(1, new_gen)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, '{"value": "there"}', 1),
+ other_changes[0][:-1])
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_sync`.
+# -----------------------------------------------------------------------------
+
+target_scenarios = [
+ ('mem,token_soledad',
+ {'create_db_and_target': make_local_db_and_token_soledad_target,
+ 'make_app_with_state': make_soledad_app,
+ 'make_database_for_test': tests.make_memory_database_for_test,
+ 'copy_database_for_test': tests.copy_memory_database_for_test,
+ 'make_document_for_test': tests.make_document_for_test})
+]
+
+
+class SoledadDatabaseSyncTargetTests(
+ TestWithScenarios,
+ SoledadWithCouchServerMixin,
+ tests.DatabaseBaseTests,
+ tests.TestCaseWithServer):
+ """
+ Adaptation of u1db.tests.test_sync.DatabaseSyncTargetTests.
+ """
+
+    # TODO: implement _set_trace_hook(_shallow) in SoledadHTTPSyncTarget so
+    # skipped tests can be successfully executed.
+
+ scenarios = target_scenarios
+
+ whitebox = False
+
+ def setUp(self):
+ tests.TestCaseWithServer.setUp(self)
+ self.other_changes = []
+ SoledadWithCouchServerMixin.setUp(self)
+ self.db, self.st = make_local_db_and_soledad_target(self)
+
+ def tearDown(self):
+ self.db.close()
+ self.st.close()
+ tests.TestCaseWithServer.tearDown(self)
+ SoledadWithCouchServerMixin.tearDown(self)
+
+ def set_trace_hook(self, callback, shallow=False):
+ setter = (self.st._set_trace_hook if not shallow else
+ self.st._set_trace_hook_shallow)
+ try:
+ setter(callback)
+ except NotImplementedError:
+ self.skipTest("%s does not implement _set_trace_hook"
+ % (self.st.__class__.__name__,))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange(self):
+ """
+ Test sync exchange.
+
+        This test was adapted to decrypt remote content before asserting.
+ """
+ docs_by_gen = [
+ (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10,
+ 'T-sid')]
+ new_gen, trans_id = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc,
+ defer_decryption=False)
+ self.assertGetEncryptedDoc(
+ self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
+ self.assertTransactionLog(['doc-id'], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 1, last_trans_id),
+ (self.other_changes, new_gen, last_trans_id))
+ sync_info = yield self.st.get_sync_info('replica')
+ self.assertEqual(10, sync_info[3])
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_push_many(self):
+ """
+        Test sync exchange pushing many documents.
+
+        This test was adapted to decrypt remote content before asserting.
+ """
+ docs_by_gen = [
+ (self.make_document(
+ 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'),
+ (self.make_document(
+ 'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')]
+ new_gen, trans_id = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc,
+ defer_decryption=False)
+ self.assertGetEncryptedDoc(
+ self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db, 'doc-id2', 'replica:1', tests.nested_doc, False)
+ self.assertTransactionLog(['doc-id', 'doc-id2'], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 2, last_trans_id),
+ (self.other_changes, new_gen, trans_id))
+ sync_info = yield self.st.get_sync_info('replica')
+ self.assertEqual(11, sync_info[3])
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_returns_many_new_docs(self):
+ """
+ Test sync exchange.
+
+ This test was adapted to avoid JSON serialization comparison as local
+ and remote representations might differ. It looks directly at the
+ doc's contents instead.
+ """
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db.create_doc_from_json(tests.nested_doc)
+ self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
+ new_gen, _ = yield self.st.sync_exchange(
+ [], 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc,
+ defer_decryption=False)
+ self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
+ self.assertEqual(2, new_gen)
+ self.assertEqual(
+ [(doc.doc_id, doc.rev, 1),
+ (doc2.doc_id, doc2.rev, 2)],
+ [c[:-3] + c[-2:-1] for c in self.other_changes])
+ self.assertEqual(
+ json.loads(tests.simple_doc),
+ json.loads(self.other_changes[0][2]))
+ self.assertEqual(
+ json.loads(tests.nested_doc),
+ json.loads(self.other_changes[1][2]))
+ if self.whitebox:
+ self.assertEqual(
+ self.db._last_exchange_log['return'],
+ {'last_gen': 2, 'docs':
+ [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]})
+
+ def receive_doc(self, doc, gen, trans_id):
+ self.other_changes.append(
+ (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id))
+
+ def test_get_sync_target(self):
+ self.assertIsNot(None, self.st)
+
+ @defer.inlineCallbacks
+ def test_get_sync_info(self):
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(
+ ('test', 0, '', 0, ''), sync_info)
+
+ @defer.inlineCallbacks
+ def test_create_doc_updates_sync_info(self):
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(
+ ('test', 0, '', 0, ''), sync_info)
+ self.db.create_doc_from_json(tests.simple_doc)
+ sync_info = yield self.st.get_sync_info('other')
+ self.assertEqual(1, sync_info[1])
+
+ @defer.inlineCallbacks
+ def test_record_sync_info(self):
+ yield self.st.record_sync_info('replica', 10, 'T-transid')
+ sync_info = yield self.st.get_sync_info('replica')
+ self.assertEqual(
+ ('test', 0, '', 10, 'T-transid'), sync_info)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_deleted(self):
+ doc = self.db.create_doc_from_json('{}')
+ edit_rev = 'replica:1|' + doc.rev
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')]
+ new_gen, trans_id = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertGetDocIncludeDeleted(
+ self.db, doc.doc_id, edit_rev, None, False)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 2, last_trans_id),
+ (self.other_changes, new_gen, trans_id))
+ sync_info = yield self.st.get_sync_info('replica')
+ self.assertEqual(10, sync_info[3])
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_refuses_conflicts(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'replica:1', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, tests.simple_doc, 1),
+ self.other_changes[0][:-1])
+ self.assertEqual(1, new_gen)
+ if self.whitebox:
+ self.assertEqual(self.db._last_exchange_log['return'],
+ {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_ignores_convergence(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ gen, txid = self.db._get_generation_info()
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, doc.rev, tests.simple_doc),
+ 10, 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'replica', last_known_generation=gen,
+ last_known_trans_id=txid, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertEqual(([], 1), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_returns_new_docs(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_gen, _ = yield self.st.sync_exchange(
+ [], 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, tests.simple_doc, 1),
+ self.other_changes[0][:-1])
+ self.assertEqual(1, new_gen)
+ if self.whitebox:
+ self.assertEqual(self.db._last_exchange_log['return'],
+ {'last_gen': 1, 'docs': [(doc.doc_id, doc.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_returns_deleted_docs(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.db.delete_doc(doc)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ new_gen, _ = yield self.st.sync_exchange(
+ [], 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ self.assertEqual(
+ (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1])
+ self.assertEqual(2, new_gen)
+ if self.whitebox:
+ self.assertEqual(self.db._last_exchange_log['return'],
+ {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]})
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_getting_newer_docs(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+ self.assertEqual(([], 2), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_with_concurrent_updates_of_synced_doc(self):
+ expected = []
+
+ def before_whatschanged_cb(state):
+ if state != 'before whats_changed':
+ return
+ cont = '{"key": "cuncurrent"}'
+ conc_rev = self.db.put_doc(
+ self.make_document(doc.doc_id, 'test:1|z:2', cont))
+ expected.append((doc.doc_id, conc_rev, cont, 3))
+
+ self.set_trace_hook(before_whatschanged_cb)
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertEqual(expected, [c[:-1] for c in self.other_changes])
+ self.assertEqual(3, new_gen)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_with_concurrent_updates(self):
+
+ def after_whatschanged_cb(state):
+ if state != 'after whats_changed':
+ return
+ self.db.create_doc_from_json('{"new": "doc"}')
+
+ self.set_trace_hook(after_whatschanged_cb)
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ new_doc = '{"key": "altval"}'
+ docs_by_gen = [
+ (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+ 'T-sid')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertEqual(([], 2), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_converged_handling(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ docs_by_gen = [
+ (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'),
+ (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5,
+ 'T-bar')]
+ new_gen, _ = yield self.st.sync_exchange(
+ docs_by_gen, 'other-replica', last_known_generation=0,
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
+ self.assertEqual(([], 2), (self.other_changes, new_gen))
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_detect_incomplete_exchange(self):
+ def before_get_docs_explode(state):
+ if state != 'before get_docs':
+ return
+ raise l2db.errors.U1DBError("fail")
+ self.set_trace_hook(before_get_docs_explode)
+ # suppress traceback printing in the wsgiref server
+ # self.patch(simple_server.ServerHandler,
+ # 'log_exception', lambda h, exc_info: None)
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ self.assertRaises(
+ (l2db.errors.U1DBError, l2db.errors.BrokenSyncStream),
+ self.st.sync_exchange, [], 'other-replica',
+ last_known_generation=0, last_known_trans_id=None,
+ insert_doc_cb=self.receive_doc)
+
+ @defer.inlineCallbacks
+ def test_sync_exchange_doc_ids(self):
+ sync_exchange_doc_ids = getattr(self.st, 'sync_exchange_doc_ids', None)
+ if sync_exchange_doc_ids is None:
+ self.skipTest("sync_exchange_doc_ids not implemented")
+ db2 = self.create_database('test2')
+ doc = db2.create_doc_from_json(tests.simple_doc)
+ new_gen, trans_id = yield sync_exchange_doc_ids(
+ db2, [(doc.doc_id, 10, 'T-sid')], 0, None,
+ insert_doc_cb=self.receive_doc)
+ self.assertGetDoc(self.db, doc.doc_id, doc.rev,
+ tests.simple_doc, False)
+ self.assertTransactionLog([doc.doc_id], self.db)
+ last_trans_id = self.getLastTransId(self.db)
+ self.assertEqual(([], 1, last_trans_id),
+ (self.other_changes, new_gen, trans_id))
+        sync_info = yield self.st.get_sync_info(db2._replica_uid)
+        self.assertEqual(10, sync_info[3])
+
+ @defer.inlineCallbacks
+ def test__set_trace_hook(self):
+ called = []
+
+ def cb(state):
+ called.append(state)
+
+ self.set_trace_hook(cb)
+ yield self.st.sync_exchange([], 'replica', 0, None, self.receive_doc)
+ yield self.st.record_sync_info('replica', 0, 'T-sid')
+ self.assertEqual(['before whats_changed',
+ 'after whats_changed',
+ 'before get_docs',
+ 'record_sync_info',
+ ],
+ called)
+
+ @defer.inlineCallbacks
+ def test__set_trace_hook_shallow(self):
+ if (self.st._set_trace_hook_shallow == self.st._set_trace_hook or
+ self.st._set_trace_hook_shallow.im_func ==
+ target.SoledadHTTPSyncTarget._set_trace_hook_shallow.im_func):
+ # shallow same as full
+ expected = ['before whats_changed',
+ 'after whats_changed',
+ 'before get_docs',
+ 'record_sync_info',
+ ]
+ else:
+ expected = ['sync_exchange', 'record_sync_info']
+
+ called = []
+
+ def cb(state):
+ called.append(state)
+
+ self.set_trace_hook(cb, shallow=True)
+ yield self.st.sync_exchange([], 'replica', 0, None, self.receive_doc)
+ yield self.st.record_sync_info('replica', 0, 'T-sid')
+ self.assertEqual(expected, called)
+
+
+# Just to make clear how this test is different... :)
+DEFER_DECRYPTION = False
+
+WAIT_STEP = 1
+MAX_WAIT = 10
+DBPASS = "pass"
+
+
+class SyncTimeoutError(Exception):
+
+ """
+    Dummy exception to signal a timeout during sync.
+ """
+ pass
+
+
+class TestSoledadDbSync(
+ TestWithScenarios,
+ SoledadWithCouchServerMixin,
+ tests.TestCaseWithServer):
+
+ """Test db.sync remote sync shortcut"""
+
+ scenarios = [
+ ('py-token-http', {
+ 'create_db_and_target': make_local_db_and_token_soledad_target,
+ 'make_app_with_state': make_token_soledad_app,
+ 'make_database_for_test': make_sqlcipher_database_for_test,
+ 'token': True
+ }),
+ ]
+
+ oauth = False
+ token = False
+
+ def setUp(self):
+ """
+        Need to explicitly invoke initialization on all bases.
+ """
+ SoledadWithCouchServerMixin.setUp(self)
+ self.server = self.server_thread = None
+ self.startTwistedServer()
+ self.syncer = None
+
+ # config info
+ self.db1_file = os.path.join(self.tempdir, "db1.u1db")
+ os.unlink(self.db1_file)
+ self.db_pass = DBPASS
+ self.email = ADDRESS
+
+        # get a random prefix for each test, so we do not mess with
+        # concurrency during initialization and shutdown of each
+        # local db.
+ self.rand_prefix = ''.join(
+ map(lambda x: random.choice(string.ascii_letters), range(6)))
+
+ # open test dbs: db1 will be the local sqlcipher db (which
+ # instantiates a syncdb). We use the self._soledad instance that was
+ # already created on some setUp method.
+ import binascii
+ tohex = binascii.b2a_hex
+ key = tohex(self._soledad.secrets.get_local_storage_key())
+ sync_db_key = tohex(self._soledad.secrets.get_sync_db_key())
+ dbpath = self._soledad._local_db_path
+
+ self.opts = SQLCipherOptions(
+ dbpath, key, is_raw_key=True, create=False,
+ defer_encryption=True, sync_db_key=sync_db_key)
+ self.db1 = SQLCipherDatabase(self.opts)
+
+ self.db2 = self.request_state._create_database(replica_uid='test')
+
+ def tearDown(self):
+ """
+        Need to explicitly invoke destruction on all bases.
+ """
+ dbsyncer = getattr(self, 'dbsyncer', None)
+ if dbsyncer:
+ dbsyncer.close()
+ self.db1.close()
+ self.db2.close()
+ self._soledad.close()
+
+ # XXX should not access "private" attrs
+ shutil.rmtree(os.path.dirname(self._soledad._local_db_path))
+ SoledadWithCouchServerMixin.tearDown(self)
+
+ def do_sync(self, target_name):
+ """
+ Perform sync using SoledadSynchronizer, SoledadSyncTarget
+ and Token auth.
+ """
+ if self.token:
+ creds = {'token': {
+ 'uuid': 'user-uuid',
+ 'token': 'auth-token',
+ }}
+ target_url = self.getURL(self.db2._dbname)
+
+ # get a u1db syncer
+ crypto = self._soledad._crypto
+ replica_uid = self.db1._replica_uid
+ dbsyncer = SQLCipherU1DBSync(
+ self.opts,
+ crypto,
+ replica_uid,
+ None,
+ defer_encryption=True)
+ self.dbsyncer = dbsyncer
+ return dbsyncer.sync(target_url,
+ creds=creds,
+ defer_decryption=DEFER_DECRYPTION)
+ else:
+            return self._do_sync(target_name)
+
+ def _do_sync(self, target_name):
+ if self.oauth:
+ path = '~/' + target_name
+ extra = dict(creds={'oauth': {
+ 'consumer_key': tests.consumer1.key,
+ 'consumer_secret': tests.consumer1.secret,
+ 'token_key': tests.token1.key,
+ 'token_secret': tests.token1.secret,
+ }})
+ else:
+ path = target_name
+ extra = {}
+ target_url = self.getURL(path)
+        return self.db1.sync(target_url, **extra)
+
+ def wait_for_sync(self):
+ """
+ Wait for sync to finish.
+ """
+ wait = 0
+ syncer = self.syncer
+ if syncer is not None:
+ while syncer.syncing:
+ time.sleep(WAIT_STEP)
+ wait += WAIT_STEP
+ if wait >= MAX_WAIT:
+ raise SyncTimeoutError
+
+ def test_db_sync(self):
+ """
+ Test sync.
+
+ Adapted to check for encrypted content.
+ """
+ doc1 = self.db1.create_doc_from_json(tests.simple_doc)
+ doc2 = self.db2.create_doc_from_json(tests.nested_doc)
+ d = self.do_sync('test')
+
+ def _assert_successful_sync(results):
+            # need to give the deferred encryption time to proceed
+ # TODO should implement a defer list to subscribe to the
+ # all-decrypted event
+ time.sleep(2)
+ local_gen_before_sync = results
+ self.wait_for_sync()
+
+ gen, _, changes = self.db1.whats_changed(local_gen_before_sync)
+ self.assertEqual(1, len(changes))
+
+ self.assertEqual(doc2.doc_id, changes[0][0])
+ self.assertEqual(1, gen - local_gen_before_sync)
+
+ self.assertGetEncryptedDoc(
+ self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
+ self.assertGetEncryptedDoc(
+ self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False)
+
+ d.addCallback(_assert_successful_sync)
+ return d
+
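+    # Regarding the TODO in _assert_successful_sync: a cleaner approach
+    # than sleeping would be to chain on the decrypter pool's completion
+    # deferred. A minimal sketch of that idea, assuming the pool exposes
+    # the same `deferred` attribute exercised by test_encdecpool.py in
+    # this directory (illustrative only, not wired into this test):
+    @staticmethod
+    def _wait_for_all_decrypted_sketch(pool):
+        # fires once the pool has processed everything announced via
+        # pool.start(n), instead of guessing with time.sleep()
+        return pool.deferred
+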
+
+class SQLCipherSyncTargetTests(SoledadDatabaseSyncTargetTests):
+
+    # TODO: implement _set_trace_hook(_shallow) in SoledadHTTPSyncTarget so
+    # skipped tests can be successfully executed.
+
+ scenarios = (tests.multiply_scenarios(SQLCIPHER_SCENARIOS,
+ target_scenarios))
+
+ whitebox = False