author    drebs <drebs@leap.se>    2016-07-10 10:38:04 +0200
committer Kali Kaneko <kali@leap.se>    2016-07-12 03:09:33 +0200
commit    f406ccbaf2b79db1d65827463f830f4ffbe5856c (patch)
tree      fb061e6782615ad6cb327dd8bafdb4155134ab9c /testing/tests/sqlcipher
parent    d99198046e07abb0d19fde1695d22267bc5e1433 (diff)
[refactor] make u1db connection pool args explicit
Diffstat (limited to 'testing/tests/sqlcipher')
-rw-r--r--  testing/tests/sqlcipher/test_async.py      146
-rw-r--r--  testing/tests/sqlcipher/test_sqlcipher.py  705
2 files changed, 851 insertions, 0 deletions
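
The pool-construction change itself lands in leap.soledad.client.adbapi,
outside the directory shown in this diffstat. As a rough illustration only
(the names follow leap.soledad.client.adbapi, but the exact signature is an
assumption, not the verbatim patch), "making the u1db connection pool args
explicit" means spelling out each parameter the pool needs instead of
forwarding them opaquely:

    # hypothetical sketch of the pattern this commit introduces
    def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
        if openfun is None and driver == "pysqlcipher":
            openfun = partial(set_init_pragmas, opts=opts)
        return U1DBConnectionPool(
            opts,
            # the remaining args are relayed explicitly to the underlying
            # twisted ConnectionPool:
            "%s.dbapi2" % driver, opts.path,
            check_same_thread=False, cp_openfun=openfun,
            timeout=SQLCIPHER_CONNECTION_TIMEOUT)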
diff --git a/testing/tests/sqlcipher/test_async.py b/testing/tests/sqlcipher/test_async.py
new file mode 100644
index 00000000..42c315fe
--- /dev/null
+++ b/testing/tests/sqlcipher/test_async.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+# test_async.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import hashlib
+
+from twisted.internet import defer
+
+from test_soledad.util import BaseSoledadTest
+from leap.soledad.client import adbapi
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+
+
+class ASyncSQLCipherRetryTestCase(BaseSoledadTest):
+
+ """
+ Test asynchronous SQLCipher operation.
+ """
+
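+ # number of documents written concurrently by each test below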
+ NUM_DOCS = 5000
+
+ def setUp(self):
+ BaseSoledadTest.setUp(self)
+ self._dbpool = self._get_dbpool()
+
+ def tearDown(self):
+ self._dbpool.close()
+ BaseSoledadTest.tearDown(self)
+
+ def _get_dbpool(self):
+ tmpdb = os.path.join(self.tempdir, "test.soledad")
+ opts = SQLCipherOptions(tmpdb, "secret", create=True)
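+ # getConnectionPool wraps the encrypted database described by `opts`
+ # in an asynchronous (Twisted-based) connection pool.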
+ return adbapi.getConnectionPool(opts)
+
+ def _get_sample(self):
+ if not getattr(self, "_sample", None):
+ dirname = os.path.dirname(os.path.realpath(__file__))
+ sample_file = os.path.join(dirname, "hacker_crackdown.txt")
+ with open(sample_file) as f:
+ self._sample = f.readlines()
+ return self._sample
+
+ def test_concurrent_puts_fail_with_few_retries_and_small_timeout(self):
+ """
+ Test that concurrent updates to the database fail with a "database is
+ locked" error when the timeout is small and few retries are allowed.
+
+ Many concurrent write attempts to the same sqlcipher database may fail
+ when the timeout is small and there are no retries. This test passes
+ if any of the attempts to write to the database fails.
+
+ The result is highly environment-dependent, and is meant to contrast
+ with the workaround for the "database is locked" problem, which is
+ exercised by the "test_concurrent_puts" test below.
+
+ If this test ever fails, it means that either (1) the platform you are
+ running on is very powerful and you should retry with an even lower
+ timeout value, or (2) the bug has been fixed by a better implementation
+ of the underlying database pool, and thus this test should be removed
+ from the test suite.
+ """
+
+ old_timeout = adbapi.SQLCIPHER_CONNECTION_TIMEOUT
+ old_max_retries = adbapi.SQLCIPHER_MAX_RETRIES
+
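+ # shrink the module-level pool parameters so that concurrent writers
+ # are likely to hit the "database is locked" error; they are restored
+ # by _restore_pool_args below once the deferred fires.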
+ adbapi.SQLCIPHER_CONNECTION_TIMEOUT = 1
+ adbapi.SQLCIPHER_MAX_RETRIES = 1
+
+ def _create_doc(doc):
+ return self._dbpool.runU1DBQuery("create_doc", doc)
+
+ def _insert_docs():
+ deferreds = []
+ for i in range(self.NUM_DOCS):
+ payload = self._get_sample()[i]
+ chash = hashlib.sha256(payload).hexdigest()
+ doc = {"number": i, "payload": payload, 'chash': chash}
+ d = _create_doc(doc)
+ deferreds.append(d)
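+ # with consumeErrors=True, the first failure is wrapped in a
+ # FirstError whose first argument is the failing sub-Failure,
+ # which _errback unpacks below.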
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+ def _restore_pool_args(res):
+ # restore the module-level values whether or not the expected
+ # error was raised, so that a failing run cannot leak the
+ # patched values into other tests.
+ adbapi.SQLCIPHER_CONNECTION_TIMEOUT = old_timeout
+ adbapi.SQLCIPHER_MAX_RETRIES = old_max_retries
+ return res
+
+ def _errback(e):
+ if e.value[0].getErrorMessage() == "database is locked":
+ return defer.succeed("")
+ raise Exception("expected a 'database is locked' error")
+
+ d = _insert_docs()
+ d.addCallback(lambda _: self._dbpool.runU1DBQuery("get_all_docs"))
+ d.addErrback(_errback)
+ d.addBoth(_restore_pool_args)
+ return d
+
+ def test_concurrent_puts(self):
+ """
+ Test that many concurrent puts succeed.
+
+ Currently, there's a known problem with the concurrent database pool
+ which is that many concurrent attempts to write to the database may
+ fail when the lock timeout is small and when there are no (or few)
+ retries. We currently workaround this problem by increasing the
+ timeout and the number of retries.
+
+ Should this test ever fail, it probably means that the timeout and/or
+ number of retries should be increased for the platform you're running
+ the test. If the underlying database pool is ever fixed, then the test
+ above will fail and we should remove this comment from here.
+ """
+
+ def _create_doc(doc):
+ return self._dbpool.runU1DBQuery("create_doc", doc)
+
+ def _insert_docs():
+ deferreds = []
+ for i in range(self.NUM_DOCS):
+ payload = self._get_sample()[i]
+ chash = hashlib.sha256(payload).hexdigest()
+ doc = {"number": i, "payload": payload, 'chash': chash}
+ d = _create_doc(doc)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+ def _count_docs(results):
+ _, docs = results
+ if self.NUM_DOCS == len(docs):
+ return defer.succeed("")
+ raise Exception(
+ "expected %d docs, got %d" % (self.NUM_DOCS, len(docs)))
+
+ d = _insert_docs()
+ d.addCallback(lambda _: self._dbpool.runU1DBQuery("get_all_docs"))
+ d.addCallback(_count_docs)
+ return d
diff --git a/testing/tests/sqlcipher/test_sqlcipher.py b/testing/tests/sqlcipher/test_sqlcipher.py
new file mode 100644
index 00000000..11472d46
--- /dev/null
+++ b/testing/tests/sqlcipher/test_sqlcipher.py
@@ -0,0 +1,705 @@
+# -*- coding: utf-8 -*-
+# test_sqlcipher.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test sqlcipher backend internals.
+"""
+import os
+import time
+import threading
+import tempfile
+import shutil
+
+from pysqlcipher import dbapi2
+from testscenarios import TestWithScenarios
+
+# l2db stuff.
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db import query_parser
+from leap.soledad.common.l2db.backends.sqlite_backend \
+ import SQLitePartialExpandDatabase
+
+# soledad stuff.
+from leap.soledad.common import soledad_assert
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.client.sqlcipher import SQLCipherDatabase
+from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.client.sqlcipher import DatabaseIsNotEncrypted
+
+# u1db tests stuff.
+from test_soledad import u1db_tests as tests
+from test_soledad.u1db_tests import test_backends
+from test_soledad.u1db_tests import test_open
+from test_soledad.util import SQLCIPHER_SCENARIOS
+from test_soledad.util import PASSWORD
+from test_soledad.util import BaseSoledadTest
+
+
+def sqlcipher_open(path, passphrase, create=True, document_factory=None):
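+ # `document_factory` is accepted only to mirror the open() signature
+ # expected by the cloned u1db tests; SQLCipherDatabase installs its own
+ # document factory, so the argument is intentionally unused here.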
+ return SQLCipherDatabase(
+ SQLCipherOptions(path, passphrase, create=create))
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_common_backend`.
+# -----------------------------------------------------------------------------
+
+class TestSQLCipherBackendImpl(tests.TestCase):
+
+ def test__allocate_doc_id(self):
+ db = sqlcipher_open(':memory:', PASSWORD)
+ doc_id1 = db._allocate_doc_id()
+ self.assertTrue(doc_id1.startswith('D-'))
+ self.assertEqual(34, len(doc_id1))
+ int(doc_id1[len('D-'):], 16)
+ self.assertNotEqual(doc_id1, db._allocate_doc_id())
+ db.close()
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_backends`.
+# -----------------------------------------------------------------------------
+
+class SQLCipherTests(TestWithScenarios, test_backends.AllDatabaseTests):
+ scenarios = SQLCIPHER_SCENARIOS
+
+
+class SQLCipherDatabaseTests(TestWithScenarios,
+ test_backends.LocalDatabaseTests):
+ scenarios = SQLCIPHER_SCENARIOS
+
+
+class SQLCipherValidateGenNTransIdTests(
+ TestWithScenarios,
+ test_backends.LocalDatabaseValidateGenNTransIdTests):
+ scenarios = SQLCIPHER_SCENARIOS
+
+
+class SQLCipherValidateSourceGenTests(
+ TestWithScenarios,
+ test_backends.LocalDatabaseValidateSourceGenTests):
+ scenarios = SQLCIPHER_SCENARIOS
+
+
+class SQLCipherWithConflictsTests(
+ TestWithScenarios,
+ test_backends.LocalDatabaseWithConflictsTests):
+ scenarios = SQLCIPHER_SCENARIOS
+
+
+class SQLCipherIndexTests(
+ TestWithScenarios, test_backends.DatabaseIndexTests):
+ scenarios = SQLCIPHER_SCENARIOS
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_sqlite_backend`.
+# -----------------------------------------------------------------------------
+
+class TestSQLCipherDatabase(tests.TestCase):
+ """
+ Tests from u1db.tests.test_sqlite_backend.TestSQLiteDatabase.
+ """
+
+ def test_atomic_initialize(self):
+ # This test was modified to ensure that db2.close() is called within
+ # the thread that created the database.
+ tmpdir = self.createTempDir()
+ dbname = os.path.join(tmpdir, 'atomic.db')
+
+ t2 = None # will be a thread
+
+ class SQLCipherDatabaseTesting(SQLCipherDatabase):
+ _index_storage_value = "testing"
+
+ def __init__(self, dbname, ntry):
+ self._try = ntry
+ self._is_initialized_invocations = 0
+ SQLCipherDatabase.__init__(
+ self,
+ SQLCipherOptions(dbname, PASSWORD))
+
+ def _is_initialized(self, c):
+ res = \
+ SQLCipherDatabase._is_initialized(self, c)
+ if self._try == 1:
+ self._is_initialized_invocations += 1
+ if self._is_initialized_invocations == 2:
+ t2.start()
+ # hard to do better and have a generic test
+ time.sleep(0.05)
+ return res
+
+ class SecondTry(threading.Thread):
+
+ outcome2 = []
+
+ def run(self):
+ try:
+ db2 = SQLCipherDatabaseTesting(dbname, 2)
+ except Exception, e:
+ SecondTry.outcome2.append(e)
+ else:
+ SecondTry.outcome2.append(db2)
+ # close the handle in the thread that created it, as stated
+ # in the comment at the top of this test.
+ db2.close()
+
+ t2 = SecondTry()
+ db1 = SQLCipherDatabaseTesting(dbname, 1)
+ t2.join()
+
+ self.assertIsInstance(SecondTry.outcome2[0], SQLCipherDatabaseTesting)
+ self.assertTrue(db1._is_initialized(db1._get_sqlite_handle().cursor()))
+ db1.close()
+
+
+class TestSQLCipherPartialExpandDatabase(tests.TestCase):
+ """
+ Tests from u1db.tests.test_sqlite_backend.TestSQLitePartialExpandDatabase.
+ """
+
+ # The following tests had to be cloned from u1db because they all
+ # instantiate the backend directly; they were changed so that our
+ # backend is instantiated instead.
+
+ def setUp(self):
+ self.db = sqlcipher_open(':memory:', PASSWORD)
+
+ def tearDown(self):
+ self.db.close()
+
+ def test_default_replica_uid(self):
+ self.assertIsNot(None, self.db._replica_uid)
+ self.assertEqual(32, len(self.db._replica_uid))
+ int(self.db._replica_uid, 16)
+
+ def test__parse_index(self):
+ g = self.db._parse_index_definition('fieldname')
+ self.assertIsInstance(g, query_parser.ExtractField)
+ self.assertEqual(['fieldname'], g.field)
+
+ def test__update_indexes(self):
+ g = self.db._parse_index_definition('fieldname')
+ c = self.db._get_sqlite_handle().cursor()
+ self.db._update_indexes('doc-id', {'fieldname': 'val'},
+ [('fieldname', g)], c)
+ c.execute('SELECT doc_id, field_name, value FROM document_fields')
+ self.assertEqual([('doc-id', 'fieldname', 'val')],
+ c.fetchall())
+
+ def test_create_database(self):
+ raw_db = self.db._get_sqlite_handle()
+ self.assertNotEqual(None, raw_db)
+
+ def test__set_replica_uid(self):
+ # The database is created in setUp(), so a replica_uid has already
+ # been assigned.
+ self.assertIsNot(None, self.db._real_replica_uid)
+ self.assertIsNot(None, self.db._replica_uid)
+ self.db._set_replica_uid('foo')
+ c = self.db._get_sqlite_handle().cursor()
+ c.execute("SELECT value FROM u1db_config WHERE name='replica_uid'")
+ self.assertEqual(('foo',), c.fetchone())
+ self.assertEqual('foo', self.db._real_replica_uid)
+ self.assertEqual('foo', self.db._replica_uid)
+ self.db._close_sqlite_handle()
+ self.assertEqual('foo', self.db._replica_uid)
+
+ def test__open_database(self):
+ # SQLCipherDatabase has no _open_database() method, so we just pass
+ # (and test for the same functionality in test_open_database_existing()
+ # below).
+ pass
+
+ def test__open_database_with_factory(self):
+ # SQLCipherDatabase has no _open_database() method.
+ pass
+
+ def test__open_database_non_existent(self):
+ temp_dir = self.createTempDir(prefix='u1db-test-')
+ path = temp_dir + '/non-existent.sqlite'
+ self.assertRaises(errors.DatabaseDoesNotExist,
+ sqlcipher_open,
+ path, PASSWORD, create=False)
+
+ def test__open_database_during_init(self):
+ # The purpose of this test is to ensure that _open_database() behaves
+ # correctly under parallel db initialization. As SQLCipherDatabase does
+ # not have an _open_database() method, we just do not implement this
+ # test.
+ pass
+
+ def test__open_database_invalid(self):
+ # This test was modified to ensure that an empty database file will
+ # raise a DatabaseIsNotEncrypted exception instead of a
+ # dbapi2.OperationalError exception.
+ temp_dir = self.createTempDir(prefix='u1db-test-')
+ path1 = temp_dir + '/invalid1.db'
+ with open(path1, 'wb') as f:
+ f.write("")
+ self.assertRaises(DatabaseIsNotEncrypted,
+ sqlcipher_open, path1,
+ PASSWORD)
+ with open(path1, 'wb') as f:
+ f.write("invalid")
+ self.assertRaises(dbapi2.DatabaseError,
+ sqlcipher_open, path1,
+ PASSWORD)
+
+ def test_open_database_existing(self):
+ # In the context of SQLCipherDatabase, where no _open_database()
+ # method exists and thus there's no call to _which_index_storage(),
+ # this test would cover the same functionality as
+ # test_open_database_create() below, so we just pass.
+ pass
+
+ def test_open_database_with_factory(self):
+ # SQLCipherDatabase's constructor has no factory parameter.
+ pass
+
+ def test_open_database_create(self):
+ # SQLCipherDatabase has no open_database() method, so we just test for
+ # the actual database constructor effects.
+ temp_dir = self.createTempDir(prefix='u1db-test-')
+ path = temp_dir + '/new.sqlite'
+ db1 = sqlcipher_open(path, PASSWORD, create=True)
+ db2 = sqlcipher_open(path, PASSWORD, create=False)
+ self.assertIsInstance(db2, SQLCipherDatabase)
+ db1.close()
+ db2.close()
+
+ def test_create_database_initializes_schema(self):
+ # This test had to be cloned because our SQLCipher backend uses an
+ # index_storage_value that includes the word "encrypted". See u1db's
+ # sqlite_backend and our sqlcipher_backend for reference.
+ raw_db = self.db._get_sqlite_handle()
+ c = raw_db.cursor()
+ c.execute("SELECT * FROM u1db_config")
+ config = dict([(r[0], r[1]) for r in c.fetchall()])
+ replica_uid = self.db._replica_uid
+ self.assertEqual({'sql_schema': '0', 'replica_uid': replica_uid,
+ 'index_storage': 'expand referenced encrypted'},
+ config)
+
+ def test_store_syncable(self):
+ doc = self.db.create_doc_from_json(tests.simple_doc)
+ # assert that docs are syncable by default
+ self.assertEqual(True, doc.syncable)
+ # assert that we can store syncable = False
+ doc.syncable = False
+ self.db.put_doc(doc)
+ self.assertEqual(False, self.db.get_doc(doc.doc_id).syncable)
+ # assert that we can store syncable = True
+ doc.syncable = True
+ self.db.put_doc(doc)
+ self.assertEqual(True, self.db.get_doc(doc.doc_id).syncable)
+
+ def test__close_sqlite_handle(self):
+ raw_db = self.db._get_sqlite_handle()
+ self.db._close_sqlite_handle()
+ self.assertRaises(dbapi2.ProgrammingError,
+ raw_db.cursor)
+
+ def test__get_generation(self):
+ self.assertEqual(0, self.db._get_generation())
+
+ def test__get_generation_info(self):
+ self.assertEqual((0, ''), self.db._get_generation_info())
+
+ def test_create_index(self):
+ self.db.create_index('test-idx', "key")
+ self.assertEqual([('test-idx', ["key"])], self.db.list_indexes())
+
+ def test_create_index_multiple_fields(self):
+ self.db.create_index('test-idx', "key", "key2")
+ self.assertEqual([('test-idx', ["key", "key2"])],
+ self.db.list_indexes())
+
+ def test__get_index_definition(self):
+ self.db.create_index('test-idx', "key", "key2")
+ # TODO: How would you test that an index is getting used for an SQL
+ # request?
+ self.assertEqual(["key", "key2"],
+ self.db._get_index_definition('test-idx'))
+
+ def test_list_index_mixed(self):
+ # Make sure that we properly order the output
+ c = self.db._get_sqlite_handle().cursor()
+ # We intentionally insert the data in weird ordering, to make sure the
+ # query still gets it back correctly.
+ c.executemany("INSERT INTO index_definitions VALUES (?, ?, ?)",
+ [('idx-1', 0, 'key10'),
+ ('idx-2', 2, 'key22'),
+ ('idx-1', 1, 'key11'),
+ ('idx-2', 0, 'key20'),
+ ('idx-2', 1, 'key21')])
+ self.assertEqual([('idx-1', ['key10', 'key11']),
+ ('idx-2', ['key20', 'key21', 'key22'])],
+ self.db.list_indexes())
+
+ def test_no_indexes_no_document_fields(self):
+ self.db.create_doc_from_json(
+ '{"key1": "val1", "key2": "val2"}')
+ c = self.db._get_sqlite_handle().cursor()
+ c.execute("SELECT doc_id, field_name, value FROM document_fields"
+ " ORDER BY doc_id, field_name, value")
+ self.assertEqual([], c.fetchall())
+
+ def test_create_extracts_fields(self):
+ doc1 = self.db.create_doc_from_json('{"key1": "val1", "key2": "val2"}')
+ doc2 = self.db.create_doc_from_json('{"key1": "valx", "key2": "valy"}')
+ c = self.db._get_sqlite_handle().cursor()
+ c.execute("SELECT doc_id, field_name, value FROM document_fields"
+ " ORDER BY doc_id, field_name, value")
+ self.assertEqual([], c.fetchall())
+ self.db.create_index('test', 'key1', 'key2')
+ c.execute("SELECT doc_id, field_name, value FROM document_fields"
+ " ORDER BY doc_id, field_name, value")
+ self.assertEqual(sorted(
+ [(doc1.doc_id, "key1", "val1"),
+ (doc1.doc_id, "key2", "val2"),
+ (doc2.doc_id, "key1", "valx"),
+ (doc2.doc_id, "key2", "valy"), ]), sorted(c.fetchall()))
+
+ def test_put_updates_fields(self):
+ self.db.create_index('test', 'key1', 'key2')
+ doc1 = self.db.create_doc_from_json(
+ '{"key1": "val1", "key2": "val2"}')
+ doc1.content = {"key1": "val1", "key2": "valy"}
+ self.db.put_doc(doc1)
+ c = self.db._get_sqlite_handle().cursor()
+ c.execute("SELECT doc_id, field_name, value FROM document_fields"
+ " ORDER BY doc_id, field_name, value")
+ self.assertEqual([(doc1.doc_id, "key1", "val1"),
+ (doc1.doc_id, "key2", "valy"), ], c.fetchall())
+
+ def test_put_updates_nested_fields(self):
+ self.db.create_index('test', 'key', 'sub.doc')
+ doc1 = self.db.create_doc_from_json(tests.nested_doc)
+ c = self.db._get_sqlite_handle().cursor()
+ c.execute("SELECT doc_id, field_name, value FROM document_fields"
+ " ORDER BY doc_id, field_name, value")
+ self.assertEqual([(doc1.doc_id, "key", "value"),
+ (doc1.doc_id, "sub.doc", "underneath"), ],
+ c.fetchall())
+
+ def test__ensure_schema_rollback(self):
+ temp_dir = self.createTempDir(prefix='u1db-test-')
+ path = temp_dir + '/rollback.db'
+
+ class SQLitePartialExpandDbTesting(SQLCipherDatabase):
+
+ def _set_replica_uid_in_transaction(self, uid):
+ super(SQLitePartialExpandDbTesting,
+ self)._set_replica_uid_in_transaction(uid)
+ if fail:
+ raise Exception()
+
+ db = SQLitePartialExpandDbTesting.__new__(SQLitePartialExpandDbTesting)
+ db._db_handle = dbapi2.connect(path) # db is there but not yet init-ed
+ fail = True
+ self.assertRaises(Exception, db._ensure_schema)
+ fail = False
+ db._initialize(db._db_handle.cursor())
+
+ def test_open_database_non_existent(self):
+ temp_dir = self.createTempDir(prefix='u1db-test-')
+ path = temp_dir + '/non-existent.sqlite'
+ self.assertRaises(errors.DatabaseDoesNotExist,
+ sqlcipher_open, path, "123",
+ create=False)
+
+ def test_delete_database_existent(self):
+ temp_dir = self.createTempDir(prefix='u1db-test-')
+ path = temp_dir + '/new.sqlite'
+ db = sqlcipher_open(path, "123", create=True)
+ db.close()
+ SQLCipherDatabase.delete_database(path)
+ self.assertRaises(errors.DatabaseDoesNotExist,
+ sqlcipher_open, path, "123",
+ create=False)
+
+ def test_delete_database_nonexistent(self):
+ temp_dir = self.createTempDir(prefix='u1db-test-')
+ path = temp_dir + '/non-existent.sqlite'
+ self.assertRaises(errors.DatabaseDoesNotExist,
+ SQLCipherDatabase.delete_database, path)
+
+ def test__get_indexed_fields(self):
+ self.db.create_index('idx1', 'a', 'b')
+ self.assertEqual(set(['a', 'b']), self.db._get_indexed_fields())
+ self.db.create_index('idx2', 'b', 'c')
+ self.assertEqual(set(['a', 'b', 'c']), self.db._get_indexed_fields())
+
+ def test_indexed_fields_expanded(self):
+ self.db.create_index('idx1', 'key1')
+ doc1 = self.db.create_doc_from_json('{"key1": "val1", "key2": "val2"}')
+ self.assertEqual(set(['key1']), self.db._get_indexed_fields())
+ c = self.db._get_sqlite_handle().cursor()
+ c.execute("SELECT doc_id, field_name, value FROM document_fields"
+ " ORDER BY doc_id, field_name, value")
+ self.assertEqual([(doc1.doc_id, 'key1', 'val1')], c.fetchall())
+
+ def test_create_index_updates_fields(self):
+ doc1 = self.db.create_doc_from_json('{"key1": "val1", "key2": "val2"}')
+ self.db.create_index('idx1', 'key1')
+ self.assertEqual(set(['key1']), self.db._get_indexed_fields())
+ c = self.db._get_sqlite_handle().cursor()
+ c.execute("SELECT doc_id, field_name, value FROM document_fields"
+ " ORDER BY doc_id, field_name, value")
+ self.assertEqual([(doc1.doc_id, 'key1', 'val1')], c.fetchall())
+
+ def assertFormatQueryEquals(self, exp_statement, exp_args, definition,
+ values):
+ statement, args = self.db._format_query(definition, values)
+ self.assertEqual(exp_statement, statement)
+ self.assertEqual(exp_args, args)
+
+ def test__format_query(self):
+ self.assertFormatQueryEquals(
+ "SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM "
+ "document d, document_fields d0 LEFT OUTER JOIN conflicts c ON "
+ "c.doc_id = d.doc_id WHERE d.doc_id = d0.doc_id AND d0.field_name "
+ "= ? AND d0.value = ? GROUP BY d.doc_id, d.doc_rev, d.content "
+ "ORDER BY d0.value;", ["key1", "a"],
+ ["key1"], ["a"])
+
+ def test__format_query2(self):
+ self.assertFormatQueryEquals(
+ 'SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM '
+ 'document d, document_fields d0, document_fields d1, '
+ 'document_fields d2 LEFT OUTER JOIN conflicts c ON c.doc_id = '
+ 'd.doc_id WHERE d.doc_id = d0.doc_id AND d0.field_name = ? AND '
+ 'd0.value = ? AND d.doc_id = d1.doc_id AND d1.field_name = ? AND '
+ 'd1.value = ? AND d.doc_id = d2.doc_id AND d2.field_name = ? AND '
+ 'd2.value = ? GROUP BY d.doc_id, d.doc_rev, d.content ORDER BY '
+ 'd0.value, d1.value, d2.value;',
+ ["key1", "a", "key2", "b", "key3", "c"],
+ ["key1", "key2", "key3"], ["a", "b", "c"])
+
+ def test__format_query_wildcard(self):
+ self.assertFormatQueryEquals(
+ 'SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM '
+ 'document d, document_fields d0, document_fields d1, '
+ 'document_fields d2 LEFT OUTER JOIN conflicts c ON c.doc_id = '
+ 'd.doc_id WHERE d.doc_id = d0.doc_id AND d0.field_name = ? AND '
+ 'd0.value = ? AND d.doc_id = d1.doc_id AND d1.field_name = ? AND '
+ 'd1.value GLOB ? AND d.doc_id = d2.doc_id AND d2.field_name = ? '
+ 'AND d2.value NOT NULL GROUP BY d.doc_id, d.doc_rev, d.content '
+ 'ORDER BY d0.value, d1.value, d2.value;',
+ ["key1", "a", "key2", "b*", "key3"], ["key1", "key2", "key3"],
+ ["a", "b*", "*"])
+
+ def assertFormatRangeQueryEquals(self, exp_statement, exp_args, definition,
+ start_value, end_value):
+ statement, args = self.db._format_range_query(
+ definition, start_value, end_value)
+ self.assertEqual(exp_statement, statement)
+ self.assertEqual(exp_args, args)
+
+ def test__format_range_query(self):
+ self.assertFormatRangeQueryEquals(
+ 'SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM '
+ 'document d, document_fields d0, document_fields d1, '
+ 'document_fields d2 LEFT OUTER JOIN conflicts c ON c.doc_id = '
+ 'd.doc_id WHERE d.doc_id = d0.doc_id AND d0.field_name = ? AND '
+ 'd0.value >= ? AND d.doc_id = d1.doc_id AND d1.field_name = ? AND '
+ 'd1.value >= ? AND d.doc_id = d2.doc_id AND d2.field_name = ? AND '
+ 'd2.value >= ? AND d.doc_id = d0.doc_id AND d0.field_name = ? AND '
+ 'd0.value <= ? AND d.doc_id = d1.doc_id AND d1.field_name = ? AND '
+ 'd1.value <= ? AND d.doc_id = d2.doc_id AND d2.field_name = ? AND '
+ 'd2.value <= ? GROUP BY d.doc_id, d.doc_rev, d.content ORDER BY '
+ 'd0.value, d1.value, d2.value;',
+ ['key1', 'a', 'key2', 'b', 'key3', 'c', 'key1', 'p', 'key2', 'q',
+ 'key3', 'r'],
+ ["key1", "key2", "key3"], ["a", "b", "c"], ["p", "q", "r"])
+
+ def test__format_range_query_no_start(self):
+ self.assertFormatRangeQueryEquals(
+ 'SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM '
+ 'document d, document_fields d0, document_fields d1, '
+ 'document_fields d2 LEFT OUTER JOIN conflicts c ON c.doc_id = '
+ 'd.doc_id WHERE d.doc_id = d0.doc_id AND d0.field_name = ? AND '
+ 'd0.value <= ? AND d.doc_id = d1.doc_id AND d1.field_name = ? AND '
+ 'd1.value <= ? AND d.doc_id = d2.doc_id AND d2.field_name = ? AND '
+ 'd2.value <= ? GROUP BY d.doc_id, d.doc_rev, d.content ORDER BY '
+ 'd0.value, d1.value, d2.value;',
+ ['key1', 'a', 'key2', 'b', 'key3', 'c'],
+ ["key1", "key2", "key3"], None, ["a", "b", "c"])
+
+ def test__format_range_query_no_end(self):
+ self.assertFormatRangeQueryEquals(
+ 'SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM '
+ 'document d, document_fields d0, document_fields d1, '
+ 'document_fields d2 LEFT OUTER JOIN conflicts c ON c.doc_id = '
+ 'd.doc_id WHERE d.doc_id = d0.doc_id AND d0.field_name = ? AND '
+ 'd0.value >= ? AND d.doc_id = d1.doc_id AND d1.field_name = ? AND '
+ 'd1.value >= ? AND d.doc_id = d2.doc_id AND d2.field_name = ? AND '
+ 'd2.value >= ? GROUP BY d.doc_id, d.doc_rev, d.content ORDER BY '
+ 'd0.value, d1.value, d2.value;',
+ ['key1', 'a', 'key2', 'b', 'key3', 'c'],
+ ["key1", "key2", "key3"], ["a", "b", "c"], None)
+
+ def test__format_range_query_wildcard(self):
+ self.assertFormatRangeQueryEquals(
+ 'SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM '
+ 'document d, document_fields d0, document_fields d1, '
+ 'document_fields d2 LEFT OUTER JOIN conflicts c ON c.doc_id = '
+ 'd.doc_id WHERE d.doc_id = d0.doc_id AND d0.field_name = ? AND '
+ 'd0.value >= ? AND d.doc_id = d1.doc_id AND d1.field_name = ? AND '
+ 'd1.value >= ? AND d.doc_id = d2.doc_id AND d2.field_name = ? AND '
+ 'd2.value NOT NULL AND d.doc_id = d0.doc_id AND d0.field_name = ? '
+ 'AND d0.value <= ? AND d.doc_id = d1.doc_id AND d1.field_name = ? '
+ 'AND (d1.value < ? OR d1.value GLOB ?) AND d.doc_id = d2.doc_id '
+ 'AND d2.field_name = ? AND d2.value NOT NULL GROUP BY d.doc_id, '
+ 'd.doc_rev, d.content ORDER BY d0.value, d1.value, d2.value;',
+ ['key1', 'a', 'key2', 'b', 'key3', 'key1', 'p', 'key2', 'q', 'q*',
+ 'key3'],
+ ["key1", "key2", "key3"], ["a", "b*", "*"], ["p", "q*", "*"])
+
+
+# -----------------------------------------------------------------------------
+# The following tests come from `u1db.tests.test_open`.
+# -----------------------------------------------------------------------------
+
+
+class SQLCipherOpen(test_open.TestU1DBOpen):
+
+ def test_open_no_create(self):
+ self.assertRaises(errors.DatabaseDoesNotExist,
+ sqlcipher_open, self.db_path,
+ PASSWORD,
+ create=False)
+ self.assertFalse(os.path.exists(self.db_path))
+
+ def test_open_create(self):
+ db = sqlcipher_open(self.db_path, PASSWORD, create=True)
+ self.addCleanup(db.close)
+ self.assertTrue(os.path.exists(self.db_path))
+ self.assertIsInstance(db, SQLCipherDatabase)
+
+ def test_open_with_factory(self):
+ db = sqlcipher_open(self.db_path, PASSWORD, create=True,
+ document_factory=SoledadDocument)
+ self.addCleanup(db.close)
+ doc = db.create_doc({})
+ self.assertTrue(isinstance(doc, SoledadDocument))
+
+ def test_open_existing(self):
+ db = sqlcipher_open(self.db_path, PASSWORD)
+ self.addCleanup(db.close)
+ doc = db.create_doc_from_json(tests.simple_doc)
+ # Even though create=True, we shouldn't wipe the db
+ db2 = sqlcipher_open(self.db_path, PASSWORD, create=True)
+ self.addCleanup(db2.close)
+ doc2 = db2.get_doc(doc.doc_id)
+ self.assertEqual(doc, doc2)
+
+ def test_open_existing_no_create(self):
+ db = sqlcipher_open(self.db_path, PASSWORD)
+ self.addCleanup(db.close)
+ db2 = sqlcipher_open(self.db_path, PASSWORD, create=False)
+ self.addCleanup(db2.close)
+ self.assertIsInstance(db2, SQLCipherDatabase)
+
+
+# -----------------------------------------------------------------------------
+# Tests for actual encryption of the database
+# -----------------------------------------------------------------------------
+
+class SQLCipherEncryptionTests(BaseSoledadTest):
+
+ """
+ Tests to guarantee SQLCipher is indeed encrypting data when storing.
+ """
+
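+ # A minimal complementary check, not part of the cloned u1db suites: an
+ # encrypted database file must not begin with the cleartext SQLite magic
+ # header, so inspecting the first 16 raw bytes of the file is enough to
+ # tell an encrypted database from a plain one.
+ def test_db_file_header_is_not_cleartext(self):
+ db = sqlcipher_open(self.DB_FILE, PASSWORD)
+ db.create_doc_from_json(tests.simple_doc)
+ db.close()
+ with open(self.DB_FILE, 'rb') as f:
+ header = f.read(16)
+ self.assertNotEqual("SQLite format 3\x00", header)
+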
+ def _delete_dbfiles(self):
+ for dbfile in [self.DB_FILE]:
+ if os.path.exists(dbfile):
+ os.unlink(dbfile)
+
+ def setUp(self):
+ # the following come from BaseLeapTest.setUpClass, because
+ # twisted.trial doesn't support such class methods for setting up
+ # test classes.
+ self.old_path = os.environ['PATH']
+ self.old_home = os.environ['HOME']
+ self.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+ self.home = self.tempdir
+ bin_tdir = os.path.join(
+ self.tempdir,
+ 'bin')
+ os.environ["PATH"] = bin_tdir
+ os.environ["HOME"] = self.tempdir
+ # this is our own stuff
+ self.DB_FILE = os.path.join(self.tempdir, 'test.db')
+ self._delete_dbfiles()
+
+ def tearDown(self):
+ self._delete_dbfiles()
+ # the following come from BaseLeapTest.tearDownClass, because
+ # twisted.trial doesn't support such class methods for tearing down
+ # test classes.
+ os.environ["PATH"] = self.old_path
+ os.environ["HOME"] = self.old_home
+ # safety check! please do not wipe my home...
+ # XXX needs to adapt to non-linuces
+ soledad_assert(
+ self.tempdir.startswith('/tmp/leap_tests-') or
+ self.tempdir.startswith('/var/folder'),
+ "beware! tried to remove a dir which does not "
+ "live in temporal folder!")
+ shutil.rmtree(self.tempdir)
+
+ def test_try_to_open_encrypted_db_with_sqlite_backend(self):
+ """
+ The SQLite backend should fail to open SQLCipher databases.
+ """
+ db = sqlcipher_open(self.DB_FILE, PASSWORD)
+ doc = db.create_doc_from_json(tests.simple_doc)
+ db.close()
+ try:
+ # trying to open an encrypted database with the regular u1db
+ # backend should raise a DatabaseError exception.
+ SQLitePartialExpandDatabase(self.DB_FILE,
+ document_factory=SoledadDocument)
+ raise DatabaseIsNotEncrypted()
+ except dbapi2.DatabaseError:
+ # at this point we know that the regular u1db SQLite backend
+ # failed to open the database, so it was indeed encrypted.
+ db = sqlcipher_open(self.DB_FILE, PASSWORD)
+ doc = db.get_doc(doc.doc_id)
+ self.assertEqual(tests.simple_doc, doc.get_json(),
+ 'decrypted content mismatch')
+ db.close()
+
+ def test_try_to_open_raw_db_with_sqlcipher_backend(self):
+ """
+ The SQLCipher backend should fail to open unencrypted databases.
+ """
+ db = SQLitePartialExpandDatabase(self.DB_FILE,
+ document_factory=SoledadDocument)
+ db.create_doc_from_json(tests.simple_doc)
+ db.close()
+ try:
+ # trying to open a non-encrypted database with the sqlcipher
+ # backend should raise a DatabaseIsNotEncrypted exception.
+ db = sqlcipher_open(self.DB_FILE, PASSWORD)
+ db.close()
+ raise dbapi2.DatabaseError(
+ "SQLCipher backend should not be able to open non-encrypted "
+ "dbs.")
+ except DatabaseIsNotEncrypted:
+ pass