From 14e749dbc9012cb66a94f96bd42f3cfa5ca836bd Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 17 Jan 2013 15:37:32 -0200 Subject: Include missing u1db sqlite test for soledad sqlcipher backend. --- src/leap/soledad/tests/test_sqlcipher.py | 101 ++++++++++++++++++++++--------- 1 file changed, 73 insertions(+), 28 deletions(-) (limited to 'src/leap/soledad/tests') diff --git a/src/leap/soledad/tests/test_sqlcipher.py b/src/leap/soledad/tests/test_sqlcipher.py index c896af91..870f9e5d 100644 --- a/src/leap/soledad/tests/test_sqlcipher.py +++ b/src/leap/soledad/tests/test_sqlcipher.py @@ -2,21 +2,21 @@ import os import time -import threading -import unittest2 as unittest - from sqlite3 import dbapi2 +import unittest2 as unittest +from StringIO import StringIO +import threading # u1db stuff. from u1db import ( errors, query_parser, ) +from u1db.backends.sqlite_backend import SQLiteDatabase # soledad stuff. -from leap.soledad.backends import sqlcipher as sqlite_backend +from leap.soledad.backends.sqlcipher import SQLCipherDatabase from leap.soledad.backends.sqlcipher import open as u1db_open -from leap.soledad.backends.leap_backend import LeapDocument # u1db tests stuff. from leap.soledad.tests import u1db_tests as tests @@ -46,7 +46,7 @@ PASSWORD = '123456' class TestSQLCipherBackendImpl(tests.TestCase): def test__allocate_doc_id(self): - db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + db = SQLCipherDatabase(':memory:', PASSWORD) doc_id1 = db._allocate_doc_id() self.assertTrue(doc_id1.startswith('D-')) self.assertEqual(34, len(doc_id1)) @@ -59,7 +59,7 @@ class TestSQLCipherBackendImpl(tests.TestCase): #----------------------------------------------------------------------------- def make_sqlcipher_database_for_test(test, replica_uid): - db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + db = SQLCipherDatabase(':memory:', PASSWORD) db._set_replica_uid(replica_uid) return db @@ -70,7 +70,7 @@ def copy_sqlcipher_database_for_test(test, db): # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR # HOUSE. - new_db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + new_db = SQLCipherDatabase(':memory:', PASSWORD) tmpfile = StringIO() for line in db._db_handle.iterdump(): if not 'sqlite_sequence' in line: # work around bug in iterdump @@ -122,7 +122,52 @@ load_tests = tests.load_with_scenarios # The following tests come from `u1db.tests.test_sqlite_backend`. #----------------------------------------------------------------------------- -class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase): +class TestSQLCipherDatabase(TestSQLiteDatabase): + + def test_atomic_initialize(self): + tmpdir = self.createTempDir() + dbname = os.path.join(tmpdir, 'atomic.db') + + t2 = None # will be a thread + + class SQLCipherDatabaseTesting(SQLiteDatabase): + _index_storage_value = "testing" + + def __init__(self, dbname, ntry): + self._try = ntry + self._is_initialized_invocations = 0 + super(SQLCipherDatabaseTesting, self).__init__(dbname) + + def _is_initialized(self, c): + res = super(SQLCipherDatabaseTesting, self)._is_initialized(c) + if self._try == 1: + self._is_initialized_invocations += 1 + if self._is_initialized_invocations == 2: + t2.start() + # hard to do better and have a generic test + time.sleep(0.05) + return res + + outcome2 = [] + + def second_try(): + try: + db2 = SQLCipherDatabaseTesting(dbname, 2) + except Exception, e: + outcome2.append(e) + else: + outcome2.append(db2) + + t2 = threading.Thread(target=second_try) + db1 = SQLCipherDatabaseTesting(dbname, 1) + t2.join() + + self.assertIsInstance(outcome2[0], SQLCipherDatabaseTesting) + db2 = outcome2[0] + self.assertTrue(db2._is_initialized(db1._get_sqlite_handle().cursor())) + + +class TestSQLCipherPartialExpandDatabase(TestSQLitePartialExpandDatabase): # The following tests had to be cloned from u1db because they all # instantiate the backend directly, so we need to change that in order to @@ -130,23 +175,23 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase): def setUp(self): super(TestSQLitePartialExpandDatabase, self).setUp() - self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + self.db = SQLCipherDatabase(':memory:', PASSWORD) self.db._set_replica_uid('test') def test_default_replica_uid(self): - self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + self.db = SQLCipherDatabase(':memory:', PASSWORD) self.assertIsNot(None, self.db._replica_uid) self.assertEqual(32, len(self.db._replica_uid)) int(self.db._replica_uid, 16) def test__parse_index(self): - self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + self.db = SQLCipherDatabase(':memory:', PASSWORD) g = self.db._parse_index_definition('fieldname') self.assertIsInstance(g, query_parser.ExtractField) self.assertEqual(['fieldname'], g.field) def test__update_indexes(self): - self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + self.db = SQLCipherDatabase(':memory:', PASSWORD) g = self.db._parse_index_definition('fieldname') c = self.db._get_sqlite_handle().cursor() self.db._update_indexes('doc-id', {'fieldname': 'val'}, @@ -157,7 +202,7 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase): def test__set_replica_uid(self): # Start from scratch, so that replica_uid isn't set. - self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) + self.db = SQLCipherDatabase(':memory:', PASSWORD) self.assertIsNot(None, self.db._real_replica_uid) self.assertIsNot(None, self.db._replica_uid) self.db._set_replica_uid('foo') @@ -172,30 +217,30 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase): def test__open_database(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/test.sqlite' - sqlite_backend.SQLCipherDatabase(path, PASSWORD) - db2 = sqlite_backend.SQLCipherDatabase._open_database(path, PASSWORD) - self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase) + SQLCipherDatabase(path, PASSWORD) + db2 = SQLCipherDatabase._open_database(path, PASSWORD) + self.assertIsInstance(db2, SQLCipherDatabase) def test__open_database_with_factory(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/test.sqlite' - sqlite_backend.SQLCipherDatabase(path, PASSWORD) - db2 = sqlite_backend.SQLCipherDatabase._open_database( + SQLCipherDatabase(path, PASSWORD) + db2 = SQLCipherDatabase._open_database( path, PASSWORD, document_factory=TestAlternativeDocument) self.assertEqual(TestAlternativeDocument, db2._factory) def test_open_database_existing(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/existing.sqlite' - sqlite_backend.SQLCipherDatabase(path, PASSWORD) - db2 = sqlite_backend.SQLCipherDatabase.open_database(path, PASSWORD, create=False) - self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase) + SQLCipherDatabase(path, PASSWORD) + db2 = SQLCipherDatabase.open_database(path, PASSWORD, create=False) + self.assertIsInstance(db2, SQLCipherDatabase) def test_open_database_with_factory(self): temp_dir = self.createTempDir(prefix='u1db-test-') path = temp_dir + '/existing.sqlite' - sqlite_backend.SQLCipherDatabase(path, PASSWORD) - db2 = sqlite_backend.SQLCipherDatabase.open_database( + SQLCipherDatabase(path, PASSWORD) + db2 = SQLCipherDatabase.open_database( path, PASSWORD, create=False, document_factory=TestAlternativeDocument) self.assertEqual(TestAlternativeDocument, db2._factory) @@ -229,7 +274,7 @@ class SQLCipherOpen(TestU1DBOpen): db = u1db_open(self.db_path, password=PASSWORD, create=True) self.addCleanup(db.close) self.assertTrue(os.path.exists(self.db_path)) - self.assertIsInstance(db, sqlite_backend.SQLCipherDatabase) + self.assertIsInstance(db, SQLCipherDatabase) def test_open_with_factory(self): db = u1db_open(self.db_path, password=PASSWORD, create=True, @@ -238,7 +283,7 @@ class SQLCipherOpen(TestU1DBOpen): self.assertEqual(TestAlternativeDocument, db._factory) def test_open_existing(self): - db = sqlite_backend.SQLCipherDatabase(self.db_path, PASSWORD) + db = SQLCipherDatabase(self.db_path, PASSWORD) self.addCleanup(db.close) doc = db.create_doc_from_json(tests.simple_doc) # Even though create=True, we shouldn't wipe the db @@ -248,8 +293,8 @@ class SQLCipherOpen(TestU1DBOpen): self.assertEqual(doc, doc2) def test_open_existing_no_create(self): - db = sqlite_backend.SQLCipherDatabase(self.db_path, PASSWORD) + db = SQLCipherDatabase(self.db_path, PASSWORD) self.addCleanup(db.close) db2 = u1db_open(self.db_path, password=PASSWORD, create=False) self.addCleanup(db2.close) - self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase) + self.assertIsInstance(db2, SQLCipherDatabase) -- cgit v1.2.3