summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/tests/test_sqlcipher.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/leap/soledad/common/tests/test_sqlcipher.py')
-rw-r--r--common/src/leap/soledad/common/tests/test_sqlcipher.py67
1 files changed, 50 insertions, 17 deletions
diff --git a/common/src/leap/soledad/common/tests/test_sqlcipher.py b/common/src/leap/soledad/common/tests/test_sqlcipher.py
index 595966ec..273ac06e 100644
--- a/common/src/leap/soledad/common/tests/test_sqlcipher.py
+++ b/common/src/leap/soledad/common/tests/test_sqlcipher.py
@@ -24,8 +24,6 @@ import threading
from pysqlcipher import dbapi2
-from StringIO import StringIO
-from urlparse import urljoin
# u1db stuff.
@@ -79,6 +77,7 @@ class TestSQLCipherBackendImpl(tests.TestCase):
self.assertEqual(34, len(doc_id1))
int(doc_id1[len('D-'):], 16)
self.assertNotEqual(doc_id1, db._allocate_doc_id())
+ db.close()
#-----------------------------------------------------------------------------
@@ -123,9 +122,6 @@ class SQLCipherIndexTests(test_backends.DatabaseIndexTests):
scenarios = SQLCIPHER_SCENARIOS
-load_tests = tests.load_with_scenarios
-
-
#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_sqlite_backend`.
#-----------------------------------------------------------------------------
@@ -174,6 +170,8 @@ class TestSQLCipherDatabase(test_sqlite_backend.TestSQLiteDatabase):
self.assertIsInstance(outcome2[0], SQLCipherDatabaseTesting)
db2 = outcome2[0]
self.assertTrue(db2._is_initialized(db1._get_sqlite_handle().cursor()))
+ db1.close()
+ db2.close()
class TestAlternativeDocument(SoledadDocument):
@@ -190,22 +188,22 @@ class TestSQLCipherPartialExpandDatabase(
def setUp(self):
test_sqlite_backend.TestSQLitePartialExpandDatabase.setUp(self)
self.db = SQLCipherDatabase(':memory:', PASSWORD)
- self.db._set_replica_uid('test')
+
+ def tearDown(self):
+ self.db.close()
+ test_sqlite_backend.TestSQLitePartialExpandDatabase.tearDown(self)
def test_default_replica_uid(self):
- self.db = SQLCipherDatabase(':memory:', PASSWORD)
self.assertIsNot(None, self.db._replica_uid)
self.assertEqual(32, len(self.db._replica_uid))
int(self.db._replica_uid, 16)
def test__parse_index(self):
- self.db = SQLCipherDatabase(':memory:', PASSWORD)
g = self.db._parse_index_definition('fieldname')
self.assertIsInstance(g, query_parser.ExtractField)
self.assertEqual(['fieldname'], g.field)
def test__update_indexes(self):
- self.db = SQLCipherDatabase(':memory:', PASSWORD)
g = self.db._parse_index_definition('fieldname')
c = self.db._get_sqlite_handle().cursor()
self.db._update_indexes('doc-id', {'fieldname': 'val'},
@@ -216,7 +214,6 @@ class TestSQLCipherPartialExpandDatabase(
def test__set_replica_uid(self):
# Start from scratch, so that replica_uid isn't set.
- self.db = SQLCipherDatabase(':memory:', PASSWORD)
self.assertIsNot(None, self.db._real_replica_uid)
self.assertIsNot(None, self.db._replica_uid)
self.db._set_replica_uid('foo')
@@ -231,19 +228,23 @@ class TestSQLCipherPartialExpandDatabase(
def test__open_database(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
- SQLCipherDatabase(path, PASSWORD)
+ db1 = SQLCipherDatabase(path, PASSWORD)
db2 = SQLCipherDatabase._open_database(path, PASSWORD)
self.assertIsInstance(db2, SQLCipherDatabase)
+ db1.close()
+ db2.close()
def test__open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
- SQLCipherDatabase(path, PASSWORD)
+ db1 = SQLCipherDatabase(path, PASSWORD)
db2 = SQLCipherDatabase._open_database(
path, PASSWORD,
document_factory=TestAlternativeDocument)
doc = db2.create_doc({})
self.assertTrue(isinstance(doc, SoledadDocument))
+ db1.close()
+ db2.close()
def test__open_database_non_existent(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
@@ -258,7 +259,9 @@ class TestSQLCipherPartialExpandDatabase(
db = SQLCipherDatabase.__new__(
SQLCipherDatabase)
db._db_handle = dbapi2.connect(path) # db is there but not yet init-ed
+ db._sync_db = None
db._syncers = {}
+ db.sync_queue = None
c = db._db_handle.cursor()
c.execute('PRAGMA key="%s"' % PASSWORD)
self.addCleanup(db.close)
@@ -281,6 +284,8 @@ class TestSQLCipherPartialExpandDatabase(
[None,
SQLCipherDatabase._index_storage_value],
observed)
+ db.close()
+ db2.close()
def test__open_database_invalid(self):
class SQLiteDatabaseTesting(SQLCipherDatabase):
@@ -301,26 +306,32 @@ class TestSQLCipherPartialExpandDatabase(
def test_open_database_existing(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
- SQLCipherDatabase(path, PASSWORD)
+ db1 = SQLCipherDatabase(path, PASSWORD)
db2 = SQLCipherDatabase.open_database(path, PASSWORD, create=False)
self.assertIsInstance(db2, SQLCipherDatabase)
+ db1.close()
+ db2.close()
def test_open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
- SQLCipherDatabase(path, PASSWORD)
+ db1 = SQLCipherDatabase(path, PASSWORD)
db2 = SQLCipherDatabase.open_database(
path, PASSWORD, create=False,
document_factory=TestAlternativeDocument)
doc = db2.create_doc({})
self.assertTrue(isinstance(doc, SoledadDocument))
+ db1.close()
+ db2.close()
def test_open_database_create(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/new.sqlite'
- SQLCipherDatabase.open_database(path, PASSWORD, create=True)
+ db1 = SQLCipherDatabase.open_database(path, PASSWORD, create=True)
db2 = SQLCipherDatabase.open_database(path, PASSWORD, create=False)
self.assertIsInstance(db2, SQLCipherDatabase)
+ db1.close()
+ db2.close()
def test_create_database_initializes_schema(self):
# This test had to be cloned because our implementation of SQLCipher
@@ -331,7 +342,8 @@ class TestSQLCipherPartialExpandDatabase(
c = raw_db.cursor()
c.execute("SELECT * FROM u1db_config")
config = dict([(r[0], r[1]) for r in c.fetchall()])
- self.assertEqual({'sql_schema': '0', 'replica_uid': 'test',
+ replica_uid = self.db._replica_uid
+ self.assertEqual({'sql_schema': '0', 'replica_uid': replica_uid,
'index_storage': 'expand referenced encrypted'},
config)
@@ -444,6 +456,22 @@ class SQLCipherDatabaseSyncTests(
def tearDown(self):
test_sync.DatabaseSyncTests.tearDown(self)
+ if hasattr(self, 'db1') and isinstance(self.db1, SQLCipherDatabase):
+ self.db1.close()
+ if hasattr(self, 'db1_copy') \
+ and isinstance(self.db1_copy, SQLCipherDatabase):
+ self.db1_copy.close()
+ if hasattr(self, 'db2') \
+ and isinstance(self.db2, SQLCipherDatabase):
+ self.db2.close()
+ if hasattr(self, 'db2_copy') \
+ and isinstance(self.db2_copy, SQLCipherDatabase):
+ self.db2_copy.close()
+ if hasattr(self, 'db3') \
+ and isinstance(self.db3, SQLCipherDatabase):
+ self.db3.close()
+
+
def test_sync_autoresolves(self):
"""
@@ -612,6 +640,9 @@ class SQLCipherDatabaseSyncTests(
doc3.doc_id, doc3.rev, key, secret))
self.assertEqual(doc4.get_json(), doc3.get_json())
self.assertFalse(doc3.has_conflicts)
+ self.db1.close()
+ self.db2.close()
+ db3.close()
def test_sync_puts_changes(self):
"""
@@ -778,6 +809,7 @@ class SQLCipherEncryptionTest(BaseLeapTest):
doc = db.get_doc(doc.doc_id)
self.assertEqual(tests.simple_doc, doc.get_json(),
'decrypted content mismatch')
+ db.close()
def test_try_to_open_raw_db_with_sqlcipher_backend(self):
"""
@@ -790,7 +822,8 @@ class SQLCipherEncryptionTest(BaseLeapTest):
try:
# trying to open the a non-encrypted database with sqlcipher
# backend should raise a DatabaseIsNotEncrypted exception.
- SQLCipherDatabase(self.DB_FILE, PASSWORD)
+ db = SQLCipherDatabase(self.DB_FILE, PASSWORD)
+ db.close()
raise dbapi2.DatabaseError(
"SQLCipher backend should not be able to open non-encrypted "
"dbs.")