summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2013-01-17 15:37:32 -0200
committerdrebs <drebs@leap.se>2013-01-17 15:37:32 -0200
commit25065cedf2b36983a7d16480500cdf0cb80deba6 (patch)
tree90120eed9be523d1d4401b98c13c13ce6448d4e8
parent42877891fc5468c674e95384153e68659c51976e (diff)
Include missing u1db sqlite test for soledad sqlcipher backend.
-rw-r--r--tests/test_sqlcipher.py101
1 files changed, 73 insertions, 28 deletions
diff --git a/tests/test_sqlcipher.py b/tests/test_sqlcipher.py
index c896af91..870f9e5d 100644
--- a/tests/test_sqlcipher.py
+++ b/tests/test_sqlcipher.py
@@ -2,21 +2,21 @@
import os
import time
-import threading
-import unittest2 as unittest
-
from sqlite3 import dbapi2
+import unittest2 as unittest
+from StringIO import StringIO
+import threading
# u1db stuff.
from u1db import (
errors,
query_parser,
)
+from u1db.backends.sqlite_backend import SQLiteDatabase
# soledad stuff.
-from leap.soledad.backends import sqlcipher as sqlite_backend
+from leap.soledad.backends.sqlcipher import SQLCipherDatabase
from leap.soledad.backends.sqlcipher import open as u1db_open
-from leap.soledad.backends.leap_backend import LeapDocument
# u1db tests stuff.
from leap.soledad.tests import u1db_tests as tests
@@ -46,7 +46,7 @@ PASSWORD = '123456'
class TestSQLCipherBackendImpl(tests.TestCase):
def test__allocate_doc_id(self):
- db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
+ db = SQLCipherDatabase(':memory:', PASSWORD)
doc_id1 = db._allocate_doc_id()
self.assertTrue(doc_id1.startswith('D-'))
self.assertEqual(34, len(doc_id1))
@@ -59,7 +59,7 @@ class TestSQLCipherBackendImpl(tests.TestCase):
#-----------------------------------------------------------------------------
def make_sqlcipher_database_for_test(test, replica_uid):
- db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
+ db = SQLCipherDatabase(':memory:', PASSWORD)
db._set_replica_uid(replica_uid)
return db
@@ -70,7 +70,7 @@ def copy_sqlcipher_database_for_test(test, db):
# CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN
# CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR
# HOUSE.
- new_db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
+ new_db = SQLCipherDatabase(':memory:', PASSWORD)
tmpfile = StringIO()
for line in db._db_handle.iterdump():
if not 'sqlite_sequence' in line: # work around bug in iterdump
@@ -122,7 +122,52 @@ load_tests = tests.load_with_scenarios
# The following tests come from `u1db.tests.test_sqlite_backend`.
#-----------------------------------------------------------------------------
-class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase):
+class TestSQLCipherDatabase(TestSQLiteDatabase):
+
+ def test_atomic_initialize(self):
+ tmpdir = self.createTempDir()
+ dbname = os.path.join(tmpdir, 'atomic.db')
+
+ t2 = None # will be a thread
+
+ class SQLCipherDatabaseTesting(SQLiteDatabase):
+ _index_storage_value = "testing"
+
+ def __init__(self, dbname, ntry):
+ self._try = ntry
+ self._is_initialized_invocations = 0
+ super(SQLCipherDatabaseTesting, self).__init__(dbname)
+
+ def _is_initialized(self, c):
+ res = super(SQLCipherDatabaseTesting, self)._is_initialized(c)
+ if self._try == 1:
+ self._is_initialized_invocations += 1
+ if self._is_initialized_invocations == 2:
+ t2.start()
+ # hard to do better and have a generic test
+ time.sleep(0.05)
+ return res
+
+ outcome2 = []
+
+ def second_try():
+ try:
+ db2 = SQLCipherDatabaseTesting(dbname, 2)
+ except Exception, e:
+ outcome2.append(e)
+ else:
+ outcome2.append(db2)
+
+ t2 = threading.Thread(target=second_try)
+ db1 = SQLCipherDatabaseTesting(dbname, 1)
+ t2.join()
+
+ self.assertIsInstance(outcome2[0], SQLCipherDatabaseTesting)
+ db2 = outcome2[0]
+ self.assertTrue(db2._is_initialized(db1._get_sqlite_handle().cursor()))
+
+
+class TestSQLCipherPartialExpandDatabase(TestSQLitePartialExpandDatabase):
# The following tests had to be cloned from u1db because they all
# instantiate the backend directly, so we need to change that in order to
@@ -130,23 +175,23 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase):
def setUp(self):
super(TestSQLitePartialExpandDatabase, self).setUp()
- self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
+ self.db = SQLCipherDatabase(':memory:', PASSWORD)
self.db._set_replica_uid('test')
def test_default_replica_uid(self):
- self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
+ self.db = SQLCipherDatabase(':memory:', PASSWORD)
self.assertIsNot(None, self.db._replica_uid)
self.assertEqual(32, len(self.db._replica_uid))
int(self.db._replica_uid, 16)
def test__parse_index(self):
- self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
+ self.db = SQLCipherDatabase(':memory:', PASSWORD)
g = self.db._parse_index_definition('fieldname')
self.assertIsInstance(g, query_parser.ExtractField)
self.assertEqual(['fieldname'], g.field)
def test__update_indexes(self):
- self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
+ self.db = SQLCipherDatabase(':memory:', PASSWORD)
g = self.db._parse_index_definition('fieldname')
c = self.db._get_sqlite_handle().cursor()
self.db._update_indexes('doc-id', {'fieldname': 'val'},
@@ -157,7 +202,7 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase):
def test__set_replica_uid(self):
# Start from scratch, so that replica_uid isn't set.
- self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD)
+ self.db = SQLCipherDatabase(':memory:', PASSWORD)
self.assertIsNot(None, self.db._real_replica_uid)
self.assertIsNot(None, self.db._replica_uid)
self.db._set_replica_uid('foo')
@@ -172,30 +217,30 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase):
def test__open_database(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
- sqlite_backend.SQLCipherDatabase(path, PASSWORD)
- db2 = sqlite_backend.SQLCipherDatabase._open_database(path, PASSWORD)
- self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase)
+ SQLCipherDatabase(path, PASSWORD)
+ db2 = SQLCipherDatabase._open_database(path, PASSWORD)
+ self.assertIsInstance(db2, SQLCipherDatabase)
def test__open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/test.sqlite'
- sqlite_backend.SQLCipherDatabase(path, PASSWORD)
- db2 = sqlite_backend.SQLCipherDatabase._open_database(
+ SQLCipherDatabase(path, PASSWORD)
+ db2 = SQLCipherDatabase._open_database(
path, PASSWORD, document_factory=TestAlternativeDocument)
self.assertEqual(TestAlternativeDocument, db2._factory)
def test_open_database_existing(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
- sqlite_backend.SQLCipherDatabase(path, PASSWORD)
- db2 = sqlite_backend.SQLCipherDatabase.open_database(path, PASSWORD, create=False)
- self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase)
+ SQLCipherDatabase(path, PASSWORD)
+ db2 = SQLCipherDatabase.open_database(path, PASSWORD, create=False)
+ self.assertIsInstance(db2, SQLCipherDatabase)
def test_open_database_with_factory(self):
temp_dir = self.createTempDir(prefix='u1db-test-')
path = temp_dir + '/existing.sqlite'
- sqlite_backend.SQLCipherDatabase(path, PASSWORD)
- db2 = sqlite_backend.SQLCipherDatabase.open_database(
+ SQLCipherDatabase(path, PASSWORD)
+ db2 = SQLCipherDatabase.open_database(
path, PASSWORD, create=False, document_factory=TestAlternativeDocument)
self.assertEqual(TestAlternativeDocument, db2._factory)
@@ -229,7 +274,7 @@ class SQLCipherOpen(TestU1DBOpen):
db = u1db_open(self.db_path, password=PASSWORD, create=True)
self.addCleanup(db.close)
self.assertTrue(os.path.exists(self.db_path))
- self.assertIsInstance(db, sqlite_backend.SQLCipherDatabase)
+ self.assertIsInstance(db, SQLCipherDatabase)
def test_open_with_factory(self):
db = u1db_open(self.db_path, password=PASSWORD, create=True,
@@ -238,7 +283,7 @@ class SQLCipherOpen(TestU1DBOpen):
self.assertEqual(TestAlternativeDocument, db._factory)
def test_open_existing(self):
- db = sqlite_backend.SQLCipherDatabase(self.db_path, PASSWORD)
+ db = SQLCipherDatabase(self.db_path, PASSWORD)
self.addCleanup(db.close)
doc = db.create_doc_from_json(tests.simple_doc)
# Even though create=True, we shouldn't wipe the db
@@ -248,8 +293,8 @@ class SQLCipherOpen(TestU1DBOpen):
self.assertEqual(doc, doc2)
def test_open_existing_no_create(self):
- db = sqlite_backend.SQLCipherDatabase(self.db_path, PASSWORD)
+ db = SQLCipherDatabase(self.db_path, PASSWORD)
self.addCleanup(db.close)
db2 = u1db_open(self.db_path, password=PASSWORD, create=False)
self.addCleanup(db2.close)
- self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase)
+ self.assertIsInstance(db2, SQLCipherDatabase)