diff options
| author | drebs <drebs@leap.se> | 2013-01-17 19:20:04 -0200 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2013-01-17 19:20:04 -0200 | 
| commit | 69173d511a99126fe6508d87e8a25a60d3f1f927 (patch) | |
| tree | a16630063b05fff5a9418c76c0f15a586b5ad63c /src | |
| parent | 14e749dbc9012cb66a94f96bd42f3cfa5ca836bd (diff) | |
Add tests for verifying if sqlcipher db is encrypted.
Diffstat (limited to 'src')
| -rw-r--r-- | src/leap/soledad/backends/sqlcipher.py | 38 | ||||
| -rw-r--r-- | src/leap/soledad/tests/test_sqlcipher.py | 59 | 
2 files changed, 79 insertions, 18 deletions
| diff --git a/src/leap/soledad/backends/sqlcipher.py b/src/leap/soledad/backends/sqlcipher.py index 53758397..6711aa86 100644 --- a/src/leap/soledad/backends/sqlcipher.py +++ b/src/leap/soledad/backends/sqlcipher.py @@ -16,25 +16,15 @@  """A U1DB implementation that uses SQLCipher as its persistence layer.""" -import errno  import os -try: -    import simplejson as json -except ImportError: -    import json  # noqa -from sqlite3 import dbapi2 -import sys +from sqlite3 import dbapi2, DatabaseError  import time -import uuid -from u1db.backends import CommonBackend  from u1db.backends.sqlite_backend import SQLitePartialExpandDatabase  from u1db import (      Document,      errors, -    query_parser, -    vectorclock, -    ) +)  def open(path, password, create, document_factory=None): @@ -50,11 +40,17 @@ def open(path, password, create, document_factory=None):          parameters as Document.__init__.      :return: An instance of Database.      """ -    from u1db.backends import sqlite_backend      return SQLCipherDatabase.open_database(          path, password, create=create, document_factory=document_factory) +class DatabaseIsNotEncrypted(Exception): +    """ +    Exception raised when trying to open non-encrypted databases. +    """ +    pass + +  class SQLCipherDatabase(SQLitePartialExpandDatabase):      """A U1DB implementation that uses SQLCipher as its persistence layer.""" @@ -67,12 +63,26 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):      def __init__(self, sqlite_file, password, document_factory=None):          """Create a new sqlcipher file.""" +        self._check_if_db_is_encrypted(sqlite_file)          self._db_handle = dbapi2.connect(sqlite_file)          SQLCipherDatabase.set_pragma_key(self._db_handle, password)          self._real_replica_uid = None          self._ensure_schema()          self._factory = document_factory or Document +    def _check_if_db_is_encrypted(self, sqlite_file): +        if not os.path.exists(sqlite_file): +            return +        else: +            try: +                # try to open an encrypted database with the regular u1db backend +                # should raise a DatabaseError exception. +                SQLitePartialExpandDatabase(sqlite_file) +                raise DatabaseIsNotEncrypted() +            except DatabaseError: +                pass + +      @classmethod      def _open_database(cls, sqlite_file, password, document_factory=None):          if not os.path.isfile(sqlite_file): @@ -122,4 +132,4 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):          SQLCipherDatabase._sqlite_registry[klass._index_storage_value] = klass -SQLCipherDatabase.register_implementation(SQLCipherDatabase) +SQLCipherDatabase.register_implementation(SQLCipherDatabase)
\ No newline at end of file diff --git a/src/leap/soledad/tests/test_sqlcipher.py b/src/leap/soledad/tests/test_sqlcipher.py index 870f9e5d..4457db53 100644 --- a/src/leap/soledad/tests/test_sqlcipher.py +++ b/src/leap/soledad/tests/test_sqlcipher.py @@ -2,7 +2,7 @@  import os  import time -from sqlite3 import dbapi2 +from sqlite3 import dbapi2, DatabaseError  import unittest2 as unittest  from StringIO import StringIO  import threading @@ -12,10 +12,13 @@ from u1db import (      errors,      query_parser,      ) -from u1db.backends.sqlite_backend import SQLiteDatabase +from u1db.backends.sqlite_backend import SQLitePartialExpandDatabase  # soledad stuff. -from leap.soledad.backends.sqlcipher import SQLCipherDatabase +from leap.soledad.backends.sqlcipher import ( +    SQLCipherDatabase, +    DatabaseIsNotEncrypted, +)  from leap.soledad.backends.sqlcipher import open as u1db_open  # u1db tests stuff. @@ -130,7 +133,7 @@ class TestSQLCipherDatabase(TestSQLiteDatabase):          t2 = None  # will be a thread -        class SQLCipherDatabaseTesting(SQLiteDatabase): +        class SQLCipherDatabaseTesting(SQLitePartialExpandDatabase):              _index_storage_value = "testing"              def __init__(self, dbname, ntry): @@ -298,3 +301,51 @@ class SQLCipherOpen(TestU1DBOpen):          db2 = u1db_open(self.db_path, password=PASSWORD, create=False)          self.addCleanup(db2.close)          self.assertIsInstance(db2, SQLCipherDatabase) + +#----------------------------------------------------------------------------- +# Tests for actual encryption of the database +#----------------------------------------------------------------------------- + +class SQLCipherEncryptionTest(unittest.TestCase): + +    DB_FILE = '/tmp/test.db' + +    def delete_dbfiles(self): +        for dbfile in [self.DB_FILE]: +            if os.path.exists(dbfile): +                os.unlink(dbfile) + +    def setUp(self): +        self.delete_dbfiles() + +    def tearDown(self): +        self.delete_dbfiles() + +    def test_try_to_open_encrypted_db_with_sqlite_backend(self): +        db = SQLCipherDatabase(self.DB_FILE, PASSWORD) +        doc = db.create_doc_from_json(tests.simple_doc) +        db.close() +        try: +            # trying to open an encrypted database with the regular u1db backend +            # should raise a DatabaseError exception. +            SQLitePartialExpandDatabase(self.DB_FILE) +            raise DatabaseIsNotEncrypted() +        except DatabaseError: +            # at this point we know that the regular U1DB sqlcipher backend +            # did not succeed on opening the database, so it was indeed +            # encrypted. +            db = SQLCipherDatabase(self.DB_FILE, PASSWORD) +            doc = db.get_doc(doc.doc_id) +            self.assertEqual(tests.simple_doc, doc.get_json(), 'decrypted content mismatch') + +    def test_try_to_open_raw_db_with_sqlcipher_backend(self): +        db = SQLitePartialExpandDatabase(self.DB_FILE) +        db.create_doc_from_json(tests.simple_doc) +        db.close() +        try: +            # trying to open the a non-encrypted database with sqlcipher backend +            # should raise a DatabaseIsNotEncrypted exception. +            SQLCipherDatabase(self.DB_FILE, PASSWORD) +            raise DatabaseError("SQLCipher backend should not be able to open non-encrypted dbs.") +        except DatabaseIsNotEncrypted: +            pass
\ No newline at end of file | 
