summaryrefslogtreecommitdiff
path: root/src/leap/soledad/tests/test_sqlcipher.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2012-12-13 13:46:27 -0200
committerdrebs <drebs@leap.se>2012-12-13 13:46:27 -0200
commitece9f7c2116fa961cafabcc6a5790206412c95ae (patch)
treec94025ad27bad33ac7aa10bcd90caeeb7e40ff5f /src/leap/soledad/tests/test_sqlcipher.py
parent19ee861b5c5dca236800ffcb944b4299561d841d (diff)
Enforce password on SQLCipher backend.
Diffstat (limited to 'src/leap/soledad/tests/test_sqlcipher.py')
-rw-r--r--src/leap/soledad/tests/test_sqlcipher.py79
1 files changed, 42 insertions, 37 deletions
diff --git a/src/leap/soledad/tests/test_sqlcipher.py b/src/leap/soledad/tests/test_sqlcipher.py
index e35a6d90..f9e9f681 100644
--- a/src/leap/soledad/tests/test_sqlcipher.py
+++ b/src/leap/soledad/tests/test_sqlcipher.py
@@ -36,7 +36,7 @@ simple_doc = '{"key": "value"}'
nested_doc = '{"key": "value", "sub": {"doc": "underneath"}}'
-class TestSQLiteDatabase(tests.TestCase):
+class TestSQLCipherDatabase(tests.TestCase):
def test_atomic_initialize(self):
tmpdir = self.createTempDir()
@@ -44,16 +44,17 @@ class TestSQLiteDatabase(tests.TestCase):
t2 = None # will be a thread
- class SQLiteDatabaseTesting(sqlcipher.SQLCipherDatabase):
+ class SQLCipherDatabaseTesting(sqlcipher.SQLCipherDatabase):
_index_storage_value = "testing"
def __init__(self, dbname, ntry):
self._try = ntry
self._is_initialized_invocations = 0
- super(SQLiteDatabaseTesting, self).__init__(dbname)
+ password = '123456'
+ super(SQLCipherDatabaseTesting, self).__init__(dbname, password)
def _is_initialized(self, c):
- res = super(SQLiteDatabaseTesting, self)._is_initialized(c)
+ res = super(SQLCipherDatabaseTesting, self)._is_initialized(c)
if self._try == 1:
self._is_initialized_invocations += 1
if self._is_initialized_invocations == 2:
@@ -66,26 +67,29 @@ class TestSQLiteDatabase(tests.TestCase):
def second_try():
try:
- db2 = SQLiteDatabaseTesting(dbname, 2)
+ db2 = SQLCipherDatabaseTesting(dbname, 2)
except Exception, e:
outcome2.append(e)
else:
outcome2.append(db2)
t2 = threading.Thread(target=second_try)
- db1 = SQLiteDatabaseTesting(dbname, 1)
+ db1 = SQLCipherDatabaseTesting(dbname, 1)
t2.join()
- self.assertIsInstance(outcome2[0], SQLiteDatabaseTesting)
+ self.assertIsInstance(outcome2[0], SQLCipherDatabaseTesting)
db2 = outcome2[0]
self.assertTrue(db2._is_initialized(db1._get_sqlite_handle().cursor()))
-class TestSQLitePartialExpandDatabase(tests.TestCase):
+_password = '123456'
+
+
+class TestSQLCipherPartialExpandDatabase(tests.TestCase):
def setUp(self):
- super(TestSQLitePartialExpandDatabase, self).setUp()
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ super(TestSQLCipherPartialExpandDatabase, self).setUp()
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
self.db._set_replica_uid('test')
def test_create_database(self):
@@ -93,7 +97,7 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
self.assertNotEqual(None, raw_db)
def test_default_replica_uid(self):
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
self.assertIsNot(None, self.db._replica_uid)
self.assertEqual(32, len(self.db._replica_uid))
int(self.db._replica_uid, 16)
@@ -121,13 +125,13 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
c.execute("SELECT * FROM index_definitions")
def test__parse_index(self):
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
g = self.db._parse_index_definition('fieldname')
self.assertIsInstance(g, query_parser.ExtractField)
self.assertEqual(['fieldname'], g.field)
def test__update_indexes(self):
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
g = self.db._parse_index_definition('fieldname')
c = self.db._get_sqlite_handle().cursor()
self.db._update_indexes('doc-id', {'fieldname': 'val'},
@@ -138,7 +142,7 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
def test__set_replica_uid(self):
# Start from scratch, so that replica_uid isn't set.
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
self.assertIsNot(None, self.db._real_replica_uid)
self.assertIsNot(None, self.db._replica_uid)
self.db._set_replica_uid('foo')
@@ -239,16 +243,16 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/rollback.db'
- class SQLitePartialExpandDbTesting(
+ class SQLCipherPartialExpandDbTesting(
sqlcipher.SQLCipherDatabase):
def _set_replica_uid_in_transaction(self, uid):
- super(SQLitePartialExpandDbTesting,
+ super(SQLCipherPartialExpandDbTesting,
self)._set_replica_uid_in_transaction(uid)
if fail:
raise Exception()
- db = SQLitePartialExpandDbTesting.__new__(SQLitePartialExpandDbTesting)
+ db = SQLCipherPartialExpandDbTesting.__new__(SQLCipherPartialExpandDbTesting)
db._db_handle = dbapi2.connect(path) # db is there but not yet init-ed
fail = True
self.assertRaises(Exception, db._ensure_schema)
@@ -258,23 +262,23 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
def test__open_database(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
- sqlcipher.SQLCipherDatabase(path)
- db2 = sqlcipher.SQLCipherDatabase._open_database(path)
+ sqlcipher.SQLCipherDatabase(path, _password)
+ db2 = sqlcipher.SQLCipherDatabase._open_database(path, _password)
self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase)
def test__open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
- sqlcipher.SQLCipherDatabase(path)
+ sqlcipher.SQLCipherDatabase(path, _password)
db2 = sqlcipher.SQLCipherDatabase._open_database(
- path, document_factory=LeapDocument)
+ path, _password, document_factory=LeapDocument)
self.assertEqual(LeapDocument, db2._factory)
def test__open_database_non_existent(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/non-existent.sqlite'
self.assertRaises(errors.DatabaseDoesNotExist,
- sqlcipher.SQLCipherDatabase._open_database, path)
+ sqlcipher.SQLCipherDatabase._open_database, path, _password)
def test__open_database_during_init(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
@@ -285,17 +289,17 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
self.addCleanup(db.close)
observed = []
- class SQLiteDatabaseTesting(sqlcipher.SQLCipherDatabase):
+ class SQLCipherDatabaseTesting(sqlcipher.SQLCipherDatabase):
WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL = 0.1
@classmethod
def _which_index_storage(cls, c):
- res = super(SQLiteDatabaseTesting, cls)._which_index_storage(c)
+ res = super(SQLCipherDatabaseTesting, cls)._which_index_storage(c)
db._ensure_schema() # init db
observed.append(res[0])
return res
- db2 = SQLiteDatabaseTesting._open_database(path)
+ db2 = SQLCipherDatabaseTesting._open_database(path, _password)
self.addCleanup(db2.close)
self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase)
self.assertEqual([None,
@@ -303,39 +307,40 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
observed)
def test__open_database_invalid(self):
- class SQLiteDatabaseTesting(sqlcipher.SQLCipherDatabase):
+ class SQLCipherDatabaseTesting(sqlcipher.SQLCipherDatabase):
WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL = 0.1
temp_dir = self.createTempDir(prefix='u1db-test-')
path1 = temp_dir + '/invalid1.db'
with open(path1, 'wb') as f:
f.write("")
self.assertRaises(dbapi2.OperationalError,
- SQLiteDatabaseTesting._open_database, path1)
+ SQLCipherDatabaseTesting._open_database, path1, _password)
with open(path1, 'wb') as f:
f.write("invalid")
self.assertRaises(dbapi2.DatabaseError,
- SQLiteDatabaseTesting._open_database, path1)
+ SQLCipherDatabaseTesting._open_database, path1, _password)
def test_open_database_existing(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
- sqlcipher.SQLCipherDatabase(path)
- db2 = sqlcipher.SQLCipherDatabase.open_database(path, create=False)
+ sqlcipher.SQLCipherDatabase(path, _password)
+ db2 = sqlcipher.SQLCipherDatabase.open_database(path, _password,
+ create=False)
self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase)
def test_open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
- sqlcipher.SQLCipherDatabase(path)
+ sqlcipher.SQLCipherDatabase(path, _password)
db2 = sqlcipher.SQLCipherDatabase.open_database(
- path, create=False, document_factory=LeapDocument)
+ path, _password, create=False, document_factory=LeapDocument)
self.assertEqual(LeapDocument, db2._factory)
def test_open_database_create(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/new.sqlite'
- sqlcipher.SQLCipherDatabase.open_database(path, create=True)
- db2 = sqlcipher.SQLCipherDatabase.open_database(path, create=False)
+ sqlcipher.SQLCipherDatabase.open_database(path, _password, create=True)
+ db2 = sqlcipher.SQLCipherDatabase.open_database(path, _password, create=False)
self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase)
def test_open_database_non_existent(self):
@@ -343,17 +348,17 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
path = temp_dir + '/non-existent.sqlite'
self.assertRaises(errors.DatabaseDoesNotExist,
sqlcipher.SQLCipherDatabase.open_database, path,
- create=False)
+ _password, create=False)
def test_delete_database_existent(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/new.sqlite'
- db = sqlcipher.SQLCipherDatabase.open_database(path, create=True)
+ db = sqlcipher.SQLCipherDatabase.open_database(path, _password, create=True)
db.close()
sqlcipher.SQLCipherDatabase.delete_database(path)
self.assertRaises(errors.DatabaseDoesNotExist,
sqlcipher.SQLCipherDatabase.open_database, path,
- create=False)
+ _password, create=False)
def test_delete_database_nonexistent(self):
temp_dir = self.createTempDir(prefix='u1db-test-')