diff options
| author | drebs <drebs@leap.se> | 2013-01-17 15:37:32 -0200 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2013-01-17 15:37:32 -0200 | 
| commit | 14e749dbc9012cb66a94f96bd42f3cfa5ca836bd (patch) | |
| tree | 51811ccddb07a0c8e2bceca31df38cb0d71fca35 | |
| parent | 8ee7ba49fd3ae902fd0a9d8a3a80b7b6a9ab999b (diff) | |
Include missing u1db sqlite test for soledad sqlcipher backend.
| -rw-r--r-- | src/leap/soledad/tests/test_sqlcipher.py | 101 | 
1 files changed, 73 insertions, 28 deletions
diff --git a/src/leap/soledad/tests/test_sqlcipher.py b/src/leap/soledad/tests/test_sqlcipher.py index c896af91..870f9e5d 100644 --- a/src/leap/soledad/tests/test_sqlcipher.py +++ b/src/leap/soledad/tests/test_sqlcipher.py @@ -2,21 +2,21 @@  import os  import time -import threading -import unittest2 as unittest -  from sqlite3 import dbapi2 +import unittest2 as unittest +from StringIO import StringIO +import threading  # u1db stuff.  from u1db import (      errors,      query_parser,      ) +from u1db.backends.sqlite_backend import SQLiteDatabase  # soledad stuff. -from leap.soledad.backends import sqlcipher as sqlite_backend +from leap.soledad.backends.sqlcipher import SQLCipherDatabase  from leap.soledad.backends.sqlcipher import open as u1db_open -from leap.soledad.backends.leap_backend import LeapDocument  # u1db tests stuff.  from leap.soledad.tests import u1db_tests as tests @@ -46,7 +46,7 @@ PASSWORD = '123456'  class TestSQLCipherBackendImpl(tests.TestCase):      def test__allocate_doc_id(self): -        db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) +        db = SQLCipherDatabase(':memory:', PASSWORD)          doc_id1 = db._allocate_doc_id()          self.assertTrue(doc_id1.startswith('D-'))          self.assertEqual(34, len(doc_id1)) @@ -59,7 +59,7 @@ class TestSQLCipherBackendImpl(tests.TestCase):  #-----------------------------------------------------------------------------  def make_sqlcipher_database_for_test(test, replica_uid): -    db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) +    db = SQLCipherDatabase(':memory:', PASSWORD)      db._set_replica_uid(replica_uid)      return db @@ -70,7 +70,7 @@ def copy_sqlcipher_database_for_test(test, db):      # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN      # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR      # HOUSE. -    new_db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) +    new_db = SQLCipherDatabase(':memory:', PASSWORD)      tmpfile = StringIO()      for line in db._db_handle.iterdump():          if not 'sqlite_sequence' in line:  # work around bug in iterdump @@ -122,7 +122,52 @@ load_tests = tests.load_with_scenarios  # The following tests come from `u1db.tests.test_sqlite_backend`.  #----------------------------------------------------------------------------- -class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase): +class TestSQLCipherDatabase(TestSQLiteDatabase): + +    def test_atomic_initialize(self): +        tmpdir = self.createTempDir() +        dbname = os.path.join(tmpdir, 'atomic.db') + +        t2 = None  # will be a thread + +        class SQLCipherDatabaseTesting(SQLiteDatabase): +            _index_storage_value = "testing" + +            def __init__(self, dbname, ntry): +                self._try = ntry +                self._is_initialized_invocations = 0 +                super(SQLCipherDatabaseTesting, self).__init__(dbname) + +            def _is_initialized(self, c): +                res = super(SQLCipherDatabaseTesting, self)._is_initialized(c) +                if self._try == 1: +                    self._is_initialized_invocations += 1 +                    if self._is_initialized_invocations == 2: +                        t2.start() +                        # hard to do better and have a generic test +                        time.sleep(0.05) +                return res + +        outcome2 = [] + +        def second_try(): +            try: +                db2 = SQLCipherDatabaseTesting(dbname, 2) +            except Exception, e: +                outcome2.append(e) +            else: +                outcome2.append(db2) + +        t2 = threading.Thread(target=second_try) +        db1 = SQLCipherDatabaseTesting(dbname, 1) +        t2.join() + +        self.assertIsInstance(outcome2[0], SQLCipherDatabaseTesting) +        db2 = outcome2[0] +        self.assertTrue(db2._is_initialized(db1._get_sqlite_handle().cursor())) + + +class TestSQLCipherPartialExpandDatabase(TestSQLitePartialExpandDatabase):      # The following tests had to be cloned from u1db because they all      # instantiate the backend directly, so we need to change that in order to @@ -130,23 +175,23 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase):      def setUp(self):          super(TestSQLitePartialExpandDatabase, self).setUp() -        self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) +        self.db = SQLCipherDatabase(':memory:', PASSWORD)          self.db._set_replica_uid('test')      def test_default_replica_uid(self): -        self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) +        self.db = SQLCipherDatabase(':memory:', PASSWORD)          self.assertIsNot(None, self.db._replica_uid)          self.assertEqual(32, len(self.db._replica_uid))          int(self.db._replica_uid, 16)      def test__parse_index(self): -        self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) +        self.db = SQLCipherDatabase(':memory:', PASSWORD)          g = self.db._parse_index_definition('fieldname')          self.assertIsInstance(g, query_parser.ExtractField)          self.assertEqual(['fieldname'], g.field)      def test__update_indexes(self): -        self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) +        self.db = SQLCipherDatabase(':memory:', PASSWORD)          g = self.db._parse_index_definition('fieldname')          c = self.db._get_sqlite_handle().cursor()          self.db._update_indexes('doc-id', {'fieldname': 'val'}, @@ -157,7 +202,7 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase):      def test__set_replica_uid(self):          # Start from scratch, so that replica_uid isn't set. -        self.db = sqlite_backend.SQLCipherDatabase(':memory:', PASSWORD) +        self.db = SQLCipherDatabase(':memory:', PASSWORD)          self.assertIsNot(None, self.db._real_replica_uid)          self.assertIsNot(None, self.db._replica_uid)          self.db._set_replica_uid('foo') @@ -172,30 +217,30 @@ class TestSQLCipherDatabase(TestSQLitePartialExpandDatabase):      def test__open_database(self):          temp_dir = self.createTempDir(prefix='u1db-test-')          path = temp_dir + '/test.sqlite' -        sqlite_backend.SQLCipherDatabase(path, PASSWORD) -        db2 = sqlite_backend.SQLCipherDatabase._open_database(path, PASSWORD) -        self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase) +        SQLCipherDatabase(path, PASSWORD) +        db2 = SQLCipherDatabase._open_database(path, PASSWORD) +        self.assertIsInstance(db2, SQLCipherDatabase)      def test__open_database_with_factory(self):          temp_dir = self.createTempDir(prefix='u1db-test-')          path = temp_dir + '/test.sqlite' -        sqlite_backend.SQLCipherDatabase(path, PASSWORD) -        db2 = sqlite_backend.SQLCipherDatabase._open_database( +        SQLCipherDatabase(path, PASSWORD) +        db2 = SQLCipherDatabase._open_database(              path, PASSWORD, document_factory=TestAlternativeDocument)          self.assertEqual(TestAlternativeDocument, db2._factory)      def test_open_database_existing(self):          temp_dir = self.createTempDir(prefix='u1db-test-')          path = temp_dir + '/existing.sqlite' -        sqlite_backend.SQLCipherDatabase(path, PASSWORD) -        db2 = sqlite_backend.SQLCipherDatabase.open_database(path, PASSWORD, create=False) -        self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase) +        SQLCipherDatabase(path, PASSWORD) +        db2 = SQLCipherDatabase.open_database(path, PASSWORD, create=False) +        self.assertIsInstance(db2, SQLCipherDatabase)      def test_open_database_with_factory(self):          temp_dir = self.createTempDir(prefix='u1db-test-')          path = temp_dir + '/existing.sqlite' -        sqlite_backend.SQLCipherDatabase(path, PASSWORD) -        db2 = sqlite_backend.SQLCipherDatabase.open_database( +        SQLCipherDatabase(path, PASSWORD) +        db2 = SQLCipherDatabase.open_database(              path, PASSWORD, create=False, document_factory=TestAlternativeDocument)          self.assertEqual(TestAlternativeDocument, db2._factory) @@ -229,7 +274,7 @@ class SQLCipherOpen(TestU1DBOpen):          db = u1db_open(self.db_path, password=PASSWORD, create=True)          self.addCleanup(db.close)          self.assertTrue(os.path.exists(self.db_path)) -        self.assertIsInstance(db, sqlite_backend.SQLCipherDatabase) +        self.assertIsInstance(db, SQLCipherDatabase)      def test_open_with_factory(self):          db = u1db_open(self.db_path, password=PASSWORD, create=True, @@ -238,7 +283,7 @@ class SQLCipherOpen(TestU1DBOpen):          self.assertEqual(TestAlternativeDocument, db._factory)      def test_open_existing(self): -        db = sqlite_backend.SQLCipherDatabase(self.db_path, PASSWORD) +        db = SQLCipherDatabase(self.db_path, PASSWORD)          self.addCleanup(db.close)          doc = db.create_doc_from_json(tests.simple_doc)          # Even though create=True, we shouldn't wipe the db @@ -248,8 +293,8 @@ class SQLCipherOpen(TestU1DBOpen):          self.assertEqual(doc, doc2)      def test_open_existing_no_create(self): -        db = sqlite_backend.SQLCipherDatabase(self.db_path, PASSWORD) +        db = SQLCipherDatabase(self.db_path, PASSWORD)          self.addCleanup(db.close)          db2 = u1db_open(self.db_path, password=PASSWORD, create=False)          self.addCleanup(db2.close) -        self.assertIsInstance(db2, sqlite_backend.SQLCipherDatabase) +        self.assertIsInstance(db2, SQLCipherDatabase)  | 
