summaryrefslogtreecommitdiff
path: root/src/leap/soledad
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/soledad')
-rw-r--r--src/leap/soledad/backends/sqlcipher.py27
-rw-r--r--src/leap/soledad/tests/test_sqlcipher.py79
2 files changed, 54 insertions, 52 deletions
diff --git a/src/leap/soledad/backends/sqlcipher.py b/src/leap/soledad/backends/sqlcipher.py
index 301d4a7f..6fd6e619 100644
--- a/src/leap/soledad/backends/sqlcipher.py
+++ b/src/leap/soledad/backends/sqlcipher.py
@@ -54,7 +54,7 @@ def open(path, create, document_factory=None, password=None):
"""
from u1db.backends import sqlite_backend
return sqlite_backend.SQLCipherDatabase.open_database(
- path, create=create, document_factory=document_factory, password=password)
+ path, password, create=create, document_factory=document_factory)
class SQLCipherDatabase(SQLitePartialExpandDatabase):
@@ -67,17 +67,16 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):
def set_pragma_key(cls, db_handle, key):
db_handle.cursor().execute("PRAGMA key = '%s'" % key)
- def __init__(self, sqlite_file, document_factory=None, password=None):
+ def __init__(self, sqlite_file, password, document_factory=None):
"""Create a new sqlite file."""
self._db_handle = dbapi2.connect(sqlite_file)
- if password:
- SQLiteDatabase.set_pragma_key(self._db_handle, password)
+ SQLCipherDatabase.set_pragma_key(self._db_handle, password)
self._real_replica_uid = None
self._ensure_schema()
self._factory = document_factory or Document
@classmethod
- def _open_database(cls, sqlite_file, document_factory=None, password=None):
+ def _open_database(cls, sqlite_file, password, document_factory=None):
if not os.path.isfile(sqlite_file):
raise errors.DatabaseDoesNotExist()
tries = 2
@@ -86,8 +85,7 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):
# where without re-opening the database on Windows, it
# doesn't see the transaction that was just committed
db_handle = dbapi2.connect(sqlite_file)
- if password:
- SQLiteDatabase.set_pragma_key(db_handle, password)
+ SQLCipherDatabase.set_pragma_key(db_handle, password)
c = db_handle.cursor()
v, err = cls._which_index_storage(c)
db_handle.close()
@@ -100,23 +98,22 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):
tries -= 1
time.sleep(cls.WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL)
return SQLCipherDatabase._sqlite_registry[v](
- sqlite_file, document_factory=document_factory)
+ sqlite_file, password, document_factory=document_factory)
@classmethod
- def open_database(cls, sqlite_file, create, backend_cls=None,
- document_factory=None, password=None):
+ def open_database(cls, sqlite_file, password, create, backend_cls=None,
+ document_factory=None):
try:
- return cls._open_database(sqlite_file,
- document_factory=document_factory,
- password=password)
+ return cls._open_database(sqlite_file, password,
+ document_factory=document_factory)
except errors.DatabaseDoesNotExist:
if not create:
raise
if backend_cls is None:
# default is SQLCipherPartialExpandDatabase
backend_cls = SQLCipherDatabase
- return backend_cls(sqlite_file, document_factory=document_factory,
- password=password)
+ return backend_cls(sqlite_file, password,
+ document_factory=document_factory)
@staticmethod
def register_implementation(klass):
diff --git a/src/leap/soledad/tests/test_sqlcipher.py b/src/leap/soledad/tests/test_sqlcipher.py
index e35a6d90..f9e9f681 100644
--- a/src/leap/soledad/tests/test_sqlcipher.py
+++ b/src/leap/soledad/tests/test_sqlcipher.py
@@ -36,7 +36,7 @@ simple_doc = '{"key": "value"}'
nested_doc = '{"key": "value", "sub": {"doc": "underneath"}}'
-class TestSQLiteDatabase(tests.TestCase):
+class TestSQLCipherDatabase(tests.TestCase):
def test_atomic_initialize(self):
tmpdir = self.createTempDir()
@@ -44,16 +44,17 @@ class TestSQLiteDatabase(tests.TestCase):
t2 = None # will be a thread
- class SQLiteDatabaseTesting(sqlcipher.SQLCipherDatabase):
+ class SQLCipherDatabaseTesting(sqlcipher.SQLCipherDatabase):
_index_storage_value = "testing"
def __init__(self, dbname, ntry):
self._try = ntry
self._is_initialized_invocations = 0
- super(SQLiteDatabaseTesting, self).__init__(dbname)
+ password = '123456'
+ super(SQLCipherDatabaseTesting, self).__init__(dbname, password)
def _is_initialized(self, c):
- res = super(SQLiteDatabaseTesting, self)._is_initialized(c)
+ res = super(SQLCipherDatabaseTesting, self)._is_initialized(c)
if self._try == 1:
self._is_initialized_invocations += 1
if self._is_initialized_invocations == 2:
@@ -66,26 +67,29 @@ class TestSQLiteDatabase(tests.TestCase):
def second_try():
try:
- db2 = SQLiteDatabaseTesting(dbname, 2)
+ db2 = SQLCipherDatabaseTesting(dbname, 2)
except Exception, e:
outcome2.append(e)
else:
outcome2.append(db2)
t2 = threading.Thread(target=second_try)
- db1 = SQLiteDatabaseTesting(dbname, 1)
+ db1 = SQLCipherDatabaseTesting(dbname, 1)
t2.join()
- self.assertIsInstance(outcome2[0], SQLiteDatabaseTesting)
+ self.assertIsInstance(outcome2[0], SQLCipherDatabaseTesting)
db2 = outcome2[0]
self.assertTrue(db2._is_initialized(db1._get_sqlite_handle().cursor()))
-class TestSQLitePartialExpandDatabase(tests.TestCase):
+_password = '123456'
+
+
+class TestSQLCipherPartialExpandDatabase(tests.TestCase):
def setUp(self):
- super(TestSQLitePartialExpandDatabase, self).setUp()
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ super(TestSQLCipherPartialExpandDatabase, self).setUp()
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
self.db._set_replica_uid('test')
def test_create_database(self):
@@ -93,7 +97,7 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
self.assertNotEqual(None, raw_db)
def test_default_replica_uid(self):
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
self.assertIsNot(None, self.db._replica_uid)
self.assertEqual(32, len(self.db._replica_uid))
int(self.db._replica_uid, 16)
@@ -121,13 +125,13 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
c.execute("SELECT * FROM index_definitions")
def test__parse_index(self):
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
g = self.db._parse_index_definition('fieldname')
self.assertIsInstance(g, query_parser.ExtractField)
self.assertEqual(['fieldname'], g.field)
def test__update_indexes(self):
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
g = self.db._parse_index_definition('fieldname')
c = self.db._get_sqlite_handle().cursor()
self.db._update_indexes('doc-id', {'fieldname': 'val'},
@@ -138,7 +142,7 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
def test__set_replica_uid(self):
# Start from scratch, so that replica_uid isn't set.
- self.db = sqlcipher.SQLCipherDatabase(':memory:')
+ self.db = sqlcipher.SQLCipherDatabase(':memory:', _password)
self.assertIsNot(None, self.db._real_replica_uid)
self.assertIsNot(None, self.db._replica_uid)
self.db._set_replica_uid('foo')
@@ -239,16 +243,16 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/rollback.db'
- class SQLitePartialExpandDbTesting(
+ class SQLCipherPartialExpandDbTesting(
sqlcipher.SQLCipherDatabase):
def _set_replica_uid_in_transaction(self, uid):
- super(SQLitePartialExpandDbTesting,
+ super(SQLCipherPartialExpandDbTesting,
self)._set_replica_uid_in_transaction(uid)
if fail:
raise Exception()
- db = SQLitePartialExpandDbTesting.__new__(SQLitePartialExpandDbTesting)
+ db = SQLCipherPartialExpandDbTesting.__new__(SQLCipherPartialExpandDbTesting)
db._db_handle = dbapi2.connect(path) # db is there but not yet init-ed
fail = True
self.assertRaises(Exception, db._ensure_schema)
@@ -258,23 +262,23 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
def test__open_database(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
- sqlcipher.SQLCipherDatabase(path)
- db2 = sqlcipher.SQLCipherDatabase._open_database(path)
+ sqlcipher.SQLCipherDatabase(path, _password)
+ db2 = sqlcipher.SQLCipherDatabase._open_database(path, _password)
self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase)
def test__open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
- sqlcipher.SQLCipherDatabase(path)
+ sqlcipher.SQLCipherDatabase(path, _password)
db2 = sqlcipher.SQLCipherDatabase._open_database(
- path, document_factory=LeapDocument)
+ path, _password, document_factory=LeapDocument)
self.assertEqual(LeapDocument, db2._factory)
def test__open_database_non_existent(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/non-existent.sqlite'
self.assertRaises(errors.DatabaseDoesNotExist,
- sqlcipher.SQLCipherDatabase._open_database, path)
+ sqlcipher.SQLCipherDatabase._open_database, path, _password)
def test__open_database_during_init(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
@@ -285,17 +289,17 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
self.addCleanup(db.close)
observed = []
- class SQLiteDatabaseTesting(sqlcipher.SQLCipherDatabase):
+ class SQLCipherDatabaseTesting(sqlcipher.SQLCipherDatabase):
WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL = 0.1
@classmethod
def _which_index_storage(cls, c):
- res = super(SQLiteDatabaseTesting, cls)._which_index_storage(c)
+ res = super(SQLCipherDatabaseTesting, cls)._which_index_storage(c)
db._ensure_schema() # init db
observed.append(res[0])
return res
- db2 = SQLiteDatabaseTesting._open_database(path)
+ db2 = SQLCipherDatabaseTesting._open_database(path, _password)
self.addCleanup(db2.close)
self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase)
self.assertEqual([None,
@@ -303,39 +307,40 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
observed)
def test__open_database_invalid(self):
- class SQLiteDatabaseTesting(sqlcipher.SQLCipherDatabase):
+ class SQLCipherDatabaseTesting(sqlcipher.SQLCipherDatabase):
WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL = 0.1
temp_dir = self.createTempDir(prefix='u1db-test-')
path1 = temp_dir + '/invalid1.db'
with open(path1, 'wb') as f:
f.write("")
self.assertRaises(dbapi2.OperationalError,
- SQLiteDatabaseTesting._open_database, path1)
+ SQLCipherDatabaseTesting._open_database, path1, _password)
with open(path1, 'wb') as f:
f.write("invalid")
self.assertRaises(dbapi2.DatabaseError,
- SQLiteDatabaseTesting._open_database, path1)
+ SQLCipherDatabaseTesting._open_database, path1, _password)
def test_open_database_existing(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
- sqlcipher.SQLCipherDatabase(path)
- db2 = sqlcipher.SQLCipherDatabase.open_database(path, create=False)
+ sqlcipher.SQLCipherDatabase(path, _password)
+ db2 = sqlcipher.SQLCipherDatabase.open_database(path, _password,
+ create=False)
self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase)
def test_open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
- sqlcipher.SQLCipherDatabase(path)
+ sqlcipher.SQLCipherDatabase(path, _password)
db2 = sqlcipher.SQLCipherDatabase.open_database(
- path, create=False, document_factory=LeapDocument)
+ path, _password, create=False, document_factory=LeapDocument)
self.assertEqual(LeapDocument, db2._factory)
def test_open_database_create(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/new.sqlite'
- sqlcipher.SQLCipherDatabase.open_database(path, create=True)
- db2 = sqlcipher.SQLCipherDatabase.open_database(path, create=False)
+ sqlcipher.SQLCipherDatabase.open_database(path, _password, create=True)
+ db2 = sqlcipher.SQLCipherDatabase.open_database(path, _password, create=False)
self.assertIsInstance(db2, sqlcipher.SQLCipherDatabase)
def test_open_database_non_existent(self):
@@ -343,17 +348,17 @@ class TestSQLitePartialExpandDatabase(tests.TestCase):
path = temp_dir + '/non-existent.sqlite'
self.assertRaises(errors.DatabaseDoesNotExist,
sqlcipher.SQLCipherDatabase.open_database, path,
- create=False)
+ _password, create=False)
def test_delete_database_existent(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/new.sqlite'
- db = sqlcipher.SQLCipherDatabase.open_database(path, create=True)
+ db = sqlcipher.SQLCipherDatabase.open_database(path, _password, create=True)
db.close()
sqlcipher.SQLCipherDatabase.delete_database(path)
self.assertRaises(errors.DatabaseDoesNotExist,
sqlcipher.SQLCipherDatabase.open_database, path,
- create=False)
+ _password, create=False)
def test_delete_database_nonexistent(self):
temp_dir = self.createTempDir(prefix='u1db-test-')