diff options
author | drebs <drebs@leap.se> | 2013-01-08 17:25:06 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2013-01-08 17:25:06 -0200 |
commit | a59b34f37fb687de77d1a94f41f53a961baad348 (patch) | |
tree | b603966f0a265d8229b4fab15b91f8da35a4cf3f /src/leap/soledad/tests/test_sqlcipher.py | |
parent | 0253ee7ad92efbdb20819683b44d3a815096cb42 (diff) |
SQLCipherBackend passes all relevant u1db tests.
Diffstat (limited to 'src/leap/soledad/tests/test_sqlcipher.py')
-rw-r--r-- | src/leap/soledad/tests/test_sqlcipher.py | 181 |
1 files changed, 141 insertions, 40 deletions
diff --git a/src/leap/soledad/tests/test_sqlcipher.py b/src/leap/soledad/tests/test_sqlcipher.py index f203dc66..c712bc47 100644 --- a/src/leap/soledad/tests/test_sqlcipher.py +++ b/src/leap/soledad/tests/test_sqlcipher.py @@ -7,22 +7,120 @@ import unittest2 as unittest from sqlite3 import dbapi2 +# u1db stuff. from u1db import ( errors, query_parser, ) + +# soledad stuff. from leap.soledad.backends import sqlcipher as sqlite_backend +from leap.soledad.backends.sqlcipher import open as u1db_open from leap.soledad.backends.leap_backend import LeapDocument + +# u1db tests stuff. from leap.soledad.tests import u1db_tests as tests from leap.soledad.tests.u1db_tests.test_sqlite_backend import ( TestSQLiteDatabase, TestSQLitePartialExpandDatabase, ) -from leap.soledad.tests.u1db_tests.test_backends import TestAlternativeDocument -from leap.soledad.tests.u1db_tests.test_backends import AllDatabaseTests +from leap.soledad.tests.u1db_tests.test_backends import ( + TestAlternativeDocument, + AllDatabaseTests, + LocalDatabaseTests, + LocalDatabaseValidateGenNTransIdTests, + LocalDatabaseValidateSourceGenTests, + LocalDatabaseWithConflictsTests, + DatabaseIndexTests, +) +from leap.soledad.tests.u1db_tests.test_open import ( + TestU1DBOpen, +) PASSWORD = '123456' +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_common_backends`. +#----------------------------------------------------------------------------- + +class TestSQLCipherBackendImpl(tests.TestCase): + + def test__allocate_doc_id(self): + db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + doc_id1 = db._allocate_doc_id() + self.assertTrue(doc_id1.startswith('D-')) + self.assertEqual(34, len(doc_id1)) + int(doc_id1[len('D-'):], 16) + self.assertNotEqual(doc_id1, db._allocate_doc_id()) + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_backends`. +#----------------------------------------------------------------------------- + +def make_sqlcipher_database_for_test(test, replica_uid): + db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + db._set_replica_uid(replica_uid) + return db + + +def copy_sqlcipher_database_for_test(test, db): + # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS + # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE + # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN + # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR + # HOUSE. + new_db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + tmpfile = StringIO() + for line in db._db_handle.iterdump(): + if not 'sqlite_sequence' in line: # work around bug in iterdump + tmpfile.write('%s\n' % line) + tmpfile.seek(0) + new_db._db_handle = dbapi2.connect(':memory:') + new_db._db_handle.cursor().executescript(tmpfile.read()) + new_db._db_handle.commit() + new_db._set_replica_uid(db._replica_uid) + new_db._factory = db._factory + return new_db + + +SQLCIPHER_SCENARIOS = [ + ('sqlcipher', {'make_database_for_test': make_sqlcipher_database_for_test, + 'copy_database_for_test': copy_sqlcipher_database_for_test, + 'make_document_for_test': tests.make_document_for_test,}), + ] + + +class SQLCipherTests(AllDatabaseTests): + scenarios = SQLCIPHER_SCENARIOS + + +class SQLCipherDatabaseTests(LocalDatabaseTests): + scenarios = SQLCIPHER_SCENARIOS + + +class SQLCipherValidateGenNTransIdTests(LocalDatabaseValidateGenNTransIdTests): + scenarios = SQLCIPHER_SCENARIOS + + +class SQLCipherValidateSourceGenTests(LocalDatabaseValidateSourceGenTests): + scenarios = SQLCIPHER_SCENARIOS + + +class SQLCipherWithConflictsTests(LocalDatabaseWithConflictsTests): + scenarios = SQLCIPHER_SCENARIOS + + +class SQLCipherIndexTests(DatabaseIndexTests): + scenarios = SQLCIPHER_SCENARIOS + + +load_tests = tests.load_with_scenarios + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_sqlite_backend`. +#----------------------------------------------------------------------------- class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase): @@ -114,41 +212,44 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase): 'index_storage': 'expand referenced encrypted'}, config) -# The following tests come from u1db test_backends. - -def make_sqlcipher_database_for_test(test, replica_uid): - db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) - db._set_replica_uid(replica_uid) - return db - - -def copy_sqlcipher_database_for_test(test, db): - # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS - # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE - # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN - # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR - # HOUSE. - new_db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) - tmpfile = StringIO() - for line in db._db_handle.iterdump(): - if not 'sqlite_sequence' in line: # work around bug in iterdump - tmpfile.write('%s\n' % line) - tmpfile.seek(0) - new_db._db_handle = dbapi2.connect(':memory:') - new_db._db_handle.cursor().executescript(tmpfile.read()) - new_db._db_handle.commit() - new_db._set_replica_uid(db._replica_uid) - new_db._factory = db._factory - return new_db - - -class SQLCipherTests(AllDatabaseTests): - - scenarios = [ - ('sqlcipher', {'make_database_for_test': make_sqlcipher_database_for_test, - 'copy_database_for_test': copy_sqlcipher_database_for_test, - 'make_document_for_test': tests.make_document_for_test,}), - ] - - -load_tests = tests.load_with_scenarios +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_open`. +#----------------------------------------------------------------------------- + +class SQLCipherOpen(TestU1DBOpen): + + def test_open_no_create(self): + self.assertRaises(errors.DatabaseDoesNotExist, + u1db_open, self.db_path, + password=PASSWORD, + create=False) + self.assertFalse(os.path.exists(self.db_path)) + + def test_open_create(self): + db = u1db_open(self.db_path, password=PASSWORD, create=True) + self.addCleanup(db.close) + self.assertTrue(os.path.exists(self.db_path)) + self.assertIsInstance(db, sqlite_backend.SQLCipherDatabase) + + def test_open_with_factory(self): + db = u1db_open(self.db_path, password=PASSWORD, create=True, + document_factory=TestAlternativeDocument) + self.addCleanup(db.close) + self.assertEqual(TestAlternativeDocument, db._factory) + + def test_open_existing(self): + db = sqlite_backend.SQLCipherDatabase(self.db_path, PASSWORD) + self.addCleanup(db.close) + doc = db.create_doc_from_json(tests.simple_doc) + # Even though create=True, we shouldn't wipe the db + db2 = u1db_open(self.db_path, password=PASSWORD, create=True) + self.addCleanup(db2.close) + doc2 = db2.get_doc(doc.doc_id) + self.assertEqual(doc, doc2) + + def test_open_existing_no_create(self): + db = sqlite_backend.SQLCipherDatabase(self.db_path, PASSWORD) + self.addCleanup(db.close) + db2 = u1db_open(self.db_path, password=PASSWORD, create=False) + self.addCleanup(db2.close) + self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase) |