From a6c935968310ff643d02198e4017d6aba0697e18 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 13 Dec 2012 13:46:27 -0200 Subject: Enforce password on SQLCipher backend. --- tests/test_sqlcipher.py | 79 ++++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 37 deletions(-) (limited to 'tests') diff --git a/tests/test_sqlcipher.py b/tests/test_sqlcipher.py index e35a6d90..f9e9f681 100644 --- a/tests/test_sqlcipher.py +++ b/tests/test_sqlcipher.py @@ -36,7 +36,7 @@ simple_doc = '{"key": "value"}' nested_doc = '{"key": "value", "sub": {"doc": "underneath"}}' -class TestSQLiteDatabase(tests.TestCase): +class TestSQLCipherDatabase(tests.TestCase): def test_atomic_initialize(self): tmpdir = self.createTempDir() @@ -44,16 +44,17 @@ class TestSQLiteDatabase(tests.TestCase): t2 = None # will be a thread - class SQLiteDatabaseTesting(sqlcipher.SQLCipherDatabase): + class SQLCipherDatabaseTesting(sqlcipher.SQLCipherDatabase): _index_storage_value = "testing" def __init__(self, dbname, ntry): self._try = ntry self._is_initialized_invocations = 0 - super(SQLiteDatabaseTesting, self).__init__(dbname) + password = '123456' + super(SQLCipherDatabaseTesting, self).__init__(dbname, password) def _is_initialized(self, c): - res = super(SQLiteDatabaseTesting, self)._is_initialized(c) + res = super(SQLCipherDatabaseTesting, self)._is_initialized(c) if self._try == 1: self._is_initialized_invocations += 1 if self._is_initialized_invocations == 2: @@ -66,26 +67,29 @@ class TestSQLiteDatabase(tests.TestCase): def second_try(): try: - db2 = SQLiteDatabaseTesting(dbname, 2) + db2 = SQLCipherDatabaseTesting(dbname, 2) except Exception, e: outcome2.append(e) else: outcome2.append(db2) t2 = threading.Thread(target=second_try) - db1 = SQLiteDatabaseTesting(dbname, 1) + db1 = SQLCipherDatabaseTesting(dbname, 1) t2.join() - self.assertIsInstance(outcome2[0], SQLiteDatabaseTesting) + self.assertIsInstance(outcome2[0], SQLCipherDatabaseTesting) db2 = outcome2[0] self.assertTrue(db2._is_initialized(db1._get_sqlite_handle().cursor())) -class TestSQLitePartialExpandDatabase(tests.TestCase): +_password = '123456' + + +class TestSQLCipherPartialExpandDatabase(tests.TestCase): def setUp(self): - super(TestSQLitePartialExpandDatabase, self).setUp() - self.db = sqlcipher.SQLCipherDatabase(':memory:') + super(TestSQLCipherPartialExpandDatabase, self).setUp() + self.db = sqlcipher.SQLCipherDatabase(':memory:', _password) self.db._set_replica_uid('test') def test_create_database(self): @@ -93,7 +97,7 @@ class TestSQLitePartialExpandDatabase(tests.TestCase): self.assertNotEqual(None, raw_db) def test_default_replica_uid(self): - self.db = sqlcipher.SQLCipherDatabase(':memory:') + self.db = sqlcipher.SQLCipherDatabase(':memory:', _password) self.assertIsNot(None, self.db._replica_uid) self.assertEqual(32, len(self.db._replica_uid)) int(self.db._replica_uid, 16) @@ -121,13 +125,13 @@ class TestSQLitePartialExpandDatabase(tests.TestCase): c.execute("SELECT * FROM index_definitions") def test__parse_index(self): - self.db = sqlcipher.SQLCipherDatabase(':memory:') + self.db = sqlcipher.SQLCipherDatabase(':memory:', _password) g = self.db._parse_index_definition('fieldname') self.assertIsInstance(g, query_parser.ExtractField) self.assertEqual(['fieldname'], g.field) def test__update_indexes(self): - self.db = sqlcipher.SQLCipherDatabase(':memory:') + self.db = sqlcipher.SQLCipherDatabase(':memory:', _password) g = self.db._parse_index_definition('fieldname') c = self.db._get_sqlite_handle().cursor() self.db._update_indexes('doc-id', {'fieldname': 'val'}, @@ -138,7 +142,7 @@ class TestSQLitePartialExpandDatabase(tests.TestCase): def test__set_replica_uid(self): # Start from scratch, so that replica_uid isn't set. - self.db = sqlcipher.SQLCipherDatabase(':memory:') + self.db = sqlcipher.SQLCipherDatabase(':memory:', _password) self.assertIsNot(None, self.db._real_replica_uid) self.assertIsNot(None, self.db._replica_uid) self.db._set_replica_uid('foo') @@ -239,16 +243,16 @@ class TestSQLitePartialExpandDatabase(tests.TestCase): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/rollback.db' - class SQLitePartialExpandDbTesting( + class SQLCipherPartialExpandDbTesting( sqlcipher.SQLCipherDatabase): def _set_replica_uid_in_transaction(self, uid): - super(SQLitePartialExpandDbTesting, + super(SQLCipherPartialExpandDbTesting, self)._set_replica_uid_in_transaction(uid) if fail: raise Exception() - db = SQLitePartialExpandDbTesting.__new__(SQLitePartialExpandDbTesting) + db = SQLCipherPartialExpandDbTesting.__new__(SQLCipherPartialExpandDbTesting) db._db_handle = dbapi2.connect(path) # db is there but not yet init-ed fail = True self.assertRaises(Exception, db._ensure_schema) @@ -258,23 +262,23 @@ class TestSQLitePartialExpandDatabase(tests.TestCase): def test__open_database(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/test.sqlite' - sqlcipher.SQLCipherDatabase(path) - db2 = sqlcipher.SQLCipherDatabase._open_database(path) + sqlcipher.SQLCipherDatabase(path, _password) + db2 = sqlcipher.SQLCipherDatabase._open_database(path, _password) self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase) def test__open_database_with_factory(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/test.sqlite' - sqlcipher.SQLCipherDatabase(path) + sqlcipher.SQLCipherDatabase(path, _password) db2 = sqlcipher.SQLCipherDatabase._open_database( - path, document_factory=LeapDocument) + path, _password, document_factory=LeapDocument) self.assertEqual(LeapDocument, db2._factory) def test__open_database_non_existent(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/non-existent.sqlite' self.assertRaises(errors.DatabaseDoesNotExist, - sqlcipher.SQLCipherDatabase._open_database, path) + sqlcipher.SQLCipherDatabase._open_database, path, _password) def test__open_database_during_init(self): temp_dir = self.createTempDir(prefix='u1db-test-') @@ -285,17 +289,17 @@ class TestSQLitePartialExpandDatabase(tests.TestCase): self.addCleanup(db.close) observed = [] - class SQLiteDatabaseTesting(sqlcipher.SQLCipherDatabase): + class SQLCipherDatabaseTesting(sqlcipher.SQLCipherDatabase): WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL = 0.1 @classmethod def _which_index_storage(cls, c): - res = super(SQLiteDatabaseTesting, cls)._which_index_storage(c) + res = super(SQLCipherDatabaseTesting, cls)._which_index_storage(c) db._ensure_schema() # init db observed.append(res[0]) return res - db2 = SQLiteDatabaseTesting._open_database(path) + db2 = SQLCipherDatabaseTesting._open_database(path, _password) self.addCleanup(db2.close) self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase) self.assertEqual([None, @@ -303,39 +307,40 @@ class TestSQLitePartialExpandDatabase(tests.TestCase): observed) def test__open_database_invalid(self): - class SQLiteDatabaseTesting(sqlcipher.SQLCipherDatabase): + class SQLCipherDatabaseTesting(sqlcipher.SQLCipherDatabase): WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL = 0.1 temp_dir = self.createTempDir(prefix='u1db-test-') path1 = temp_dir + '/invalid1.db' with open(path1, 'wb') as f: f.write("") self.assertRaises(dbapi2.OperationalError, - SQLiteDatabaseTesting._open_database, path1) + SQLCipherDatabaseTesting._open_database, path1, _password) with open(path1, 'wb') as f: f.write("invalid") self.assertRaises(dbapi2.DatabaseError, - SQLiteDatabaseTesting._open_database, path1) + SQLCipherDatabaseTesting._open_database, path1, _password) def test_open_database_existing(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/existing.sqlite' - sqlcipher.SQLCipherDatabase(path) - db2 = sqlcipher.SQLCipherDatabase.open_database(path, create=False) + sqlcipher.SQLCipherDatabase(path, _password) + db2 = sqlcipher.SQLCipherDatabase.open_database(path, _password, + create=False) self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase) def test_open_database_with_factory(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/existing.sqlite' - sqlcipher.SQLCipherDatabase(path) + sqlcipher.SQLCipherDatabase(path, _password) db2 = sqlcipher.SQLCipherDatabase.open_database( - path, create=False, document_factory=LeapDocument) + path, _password, create=False, document_factory=LeapDocument) self.assertEqual(LeapDocument, db2._factory) def test_open_database_create(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/new.sqlite' - sqlcipher.SQLCipherDatabase.open_database(path, create=True) - db2 = sqlcipher.SQLCipherDatabase.open_database(path, create=False) + sqlcipher.SQLCipherDatabase.open_database(path, _password, create=True) + db2 = sqlcipher.SQLCipherDatabase.open_database(path, _password, create=False) self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase) def test_open_database_non_existent(self): @@ -343,17 +348,17 @@ class TestSQLitePartialExpandDatabase(tests.TestCase): path = temp_dir + '/non-existent.sqlite' self.assertRaises(errors.DatabaseDoesNotExist, sqlcipher.SQLCipherDatabase.open_database, path, - create=False) + _password, create=False) def test_delete_database_existent(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/new.sqlite' - db = sqlcipher.SQLCipherDatabase.open_database(path, create=True) + db = sqlcipher.SQLCipherDatabase.open_database(path, _password, create=True) db.close() sqlcipher.SQLCipherDatabase.delete_database(path) self.assertRaises(errors.DatabaseDoesNotExist, sqlcipher.SQLCipherDatabase.open_database, path, - create=False) + _password, create=False) def test_delete_database_nonexistent(self): temp_dir = self.createTempDir(prefix='u1db-test-') -- cgit v1.2.3