diff options
-rw-r--r-- | testing/tests/sync/test_sync_deferred.py | 182 |
1 files changed, 0 insertions, 182 deletions
diff --git a/testing/tests/sync/test_sync_deferred.py b/testing/tests/sync/test_sync_deferred.py deleted file mode 100644 index 001612a6..00000000 --- a/testing/tests/sync/test_sync_deferred.py +++ /dev/null @@ -1,182 +0,0 @@ -# test_sync_deferred.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Test Leap backend bits: sync with deferred encryption. -""" -import time -import os -import random -import string -import shutil - -from twisted.internet import defer - -from leap.soledad.client import sync -from leap.soledad.client.sqlcipher import SQLCipherOptions -from leap.soledad.client.sqlcipher import SQLCipherDatabase - -from testscenarios import TestWithScenarios - -from test_soledad import u1db_tests as tests -from test_soledad.util import ADDRESS -from test_soledad.util import SoledadWithCouchServerMixin -from test_soledad.util import make_soledad_app -from test_soledad.util import soledad_sync_target - - -WAIT_STEP = 1 -MAX_WAIT = 10 -DBPASS = "pass" - - -class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin): - - """ - Another base class for testing the deferred encryption during - the syncs, using the intermediate database. - """ - defer_sync_encryption = True - - def setUp(self): - SoledadWithCouchServerMixin.setUp(self) - self.startTwistedServer() - # config info - self.db1_file = os.path.join(self.tempdir, "db1.u1db") - os.unlink(self.db1_file) - self.db_pass = DBPASS - self.email = ADDRESS - - # get a random prefix for each test, so we do not mess with - # concurrency during initialization and shutting down of - # each local db. - self.rand_prefix = ''.join( - map(lambda x: random.choice(string.ascii_letters), range(6))) - - # open test dbs: db1 will be the local sqlcipher db (which - # instantiates a syncdb). We use the self._soledad instance that was - # already created on some setUp method. - import binascii - tohex = binascii.b2a_hex - key = tohex(self._soledad.secrets.get_local_storage_key()) - dbpath = self._soledad._local_db_path - - self.opts = SQLCipherOptions( - dbpath, key, is_raw_key=True, create=False) - self.db1 = SQLCipherDatabase(self.opts) - - self.db2 = self.request_state._create_database('test') - - def tearDown(self): - # XXX should not access "private" attrs - shutil.rmtree(os.path.dirname(self._soledad._local_db_path)) - SoledadWithCouchServerMixin.tearDown(self) - - -class SyncTimeoutError(Exception): - - """ - Dummy exception to notify timeout during sync. - """ - pass - - -class TestSoledadDbSyncDeferredEncDecr( - TestWithScenarios, - BaseSoledadDeferredEncTest, - tests.TestCaseWithServer): - - """ - Test db.sync remote sync shortcut. - Case with deferred encryption: using the intermediate - syncdb. - """ - - scenarios = [ - ('http', { - 'make_app_with_state': make_soledad_app, - 'make_database_for_test': tests.make_memory_database_for_test, - }), - ] - - oauth = False - token = True - - def setUp(self): - """ - Need to explicitely invoke inicialization on all bases. - """ - BaseSoledadDeferredEncTest.setUp(self) - self.server = self.server_thread = None - self.syncer = None - - def tearDown(self): - """ - Need to explicitely invoke destruction on all bases. - """ - dbsyncer = getattr(self, 'dbsyncer', None) - if dbsyncer: - dbsyncer.close() - BaseSoledadDeferredEncTest.tearDown(self) - - def do_sync(self): - """ - Perform sync using SoledadSynchronizer, SoledadSyncTarget - and Token auth. - """ - replica_uid = self._soledad._dbpool.replica_uid - dbsyncer = self._soledad._dbsyncer # Soledad.sync uses the dbsyncer - - target = soledad_sync_target( - self, self.db2._dbname, - source_replica_uid=replica_uid) - return sync.SoledadSynchronizer( - dbsyncer, - target).sync() - - def wait_for_sync(self): - """ - Wait for sync to finish. - """ - wait = 0 - syncer = self.syncer - if syncer is not None: - while syncer.syncing: - time.sleep(WAIT_STEP) - wait += WAIT_STEP - if wait >= MAX_WAIT: - raise SyncTimeoutError - - @defer.inlineCallbacks - def test_db_sync(self): - """ - Test sync. - - Adapted to check for encrypted content. - """ - doc1 = self.db1.create_doc_from_json(tests.simple_doc) - doc2 = self.db2.create_doc_from_json(tests.nested_doc) - local_gen_before_sync = yield self.do_sync() - - gen, _, changes = self.db1.whats_changed(local_gen_before_sync) - self.assertEqual(1, len(changes)) - - self.assertEqual(doc2.doc_id, changes[0][0]) - self.assertEqual(1, gen - local_gen_before_sync) - - self.assertGetEncryptedDoc( - self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False) - self.assertGetEncryptedDoc( - self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False) |