diff options
author | drebs <drebs@leap.se> | 2013-01-17 19:20:04 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2013-01-17 19:20:04 -0200 |
commit | c6d0e291653919cb529d5417bff28f602006b33a (patch) | |
tree | ac6291a89cfc4d42bcba50d80afcc9be3d51ab74 | |
parent | 25065cedf2b36983a7d16480500cdf0cb80deba6 (diff) |
Add tests for verifying if sqlcipher db is encrypted.
-rw-r--r-- | backends/sqlcipher.py | 38 | ||||
-rw-r--r-- | tests/test_sqlcipher.py | 59 |
2 files changed, 79 insertions, 18 deletions
diff --git a/backends/sqlcipher.py b/backends/sqlcipher.py index 53758397..6711aa86 100644 --- a/backends/sqlcipher.py +++ b/backends/sqlcipher.py @@ -16,25 +16,15 @@ """A U1DB implementation that uses SQLCipher as its persistence layer.""" -import errno import os -try: - import simplejson as json -except ImportError: - import json # noqa -from sqlite3 import dbapi2 -import sys +from sqlite3 import dbapi2, DatabaseError import time -import uuid -from u1db.backends import CommonBackend from u1db.backends.sqlite_backend import SQLitePartialExpandDatabase from u1db import ( Document, errors, - query_parser, - vectorclock, - ) +) def open(path, password, create, document_factory=None): @@ -50,11 +40,17 @@ def open(path, password, create, document_factory=None): parameters as Document.__init__. :return: An instance of Database. """ - from u1db.backends import sqlite_backend return SQLCipherDatabase.open_database( path, password, create=create, document_factory=document_factory) +class DatabaseIsNotEncrypted(Exception): + """ + Exception raised when trying to open non-encrypted databases. + """ + pass + + class SQLCipherDatabase(SQLitePartialExpandDatabase): """A U1DB implementation that uses SQLCipher as its persistence layer.""" @@ -67,12 +63,26 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase): def __init__(self, sqlite_file, password, document_factory=None): """Create a new sqlcipher file.""" + self._check_if_db_is_encrypted(sqlite_file) self._db_handle = dbapi2.connect(sqlite_file) SQLCipherDatabase.set_pragma_key(self._db_handle, password) self._real_replica_uid = None self._ensure_schema() self._factory = document_factory or Document + def _check_if_db_is_encrypted(self, sqlite_file): + if not os.path.exists(sqlite_file): + return + else: + try: + # try to open an encrypted database with the regular u1db backend + # should raise a DatabaseError exception. + SQLitePartialExpandDatabase(sqlite_file) + raise DatabaseIsNotEncrypted() + except DatabaseError: + pass + + @classmethod def _open_database(cls, sqlite_file, password, document_factory=None): if not os.path.isfile(sqlite_file): @@ -122,4 +132,4 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase): SQLCipherDatabase._sqlite_registry[klass._index_storage_value] = klass -SQLCipherDatabase.register_implementation(SQLCipherDatabase) +SQLCipherDatabase.register_implementation(SQLCipherDatabase)
\ No newline at end of file diff --git a/tests/test_sqlcipher.py b/tests/test_sqlcipher.py index 870f9e5d..4457db53 100644 --- a/tests/test_sqlcipher.py +++ b/tests/test_sqlcipher.py @@ -2,7 +2,7 @@ import os import time -from sqlite3 import dbapi2 +from sqlite3 import dbapi2, DatabaseError import unittest2 as unittest from StringIO import StringIO import threading @@ -12,10 +12,13 @@ from u1db import ( errors, query_parser, ) -from u1db.backends.sqlite_backend import SQLiteDatabase +from u1db.backends.sqlite_backend import SQLitePartialExpandDatabase # soledad stuff. -from leap.soledad.backends.sqlcipher import SQLCipherDatabase +from leap.soledad.backends.sqlcipher import ( + SQLCipherDatabase, + DatabaseIsNotEncrypted, +) from leap.soledad.backends.sqlcipher import open as u1db_open # u1db tests stuff. @@ -130,7 +133,7 @@ class TestSQLCipherDatabase(TestSQLiteDatabase): t2 = None # will be a thread - class SQLCipherDatabaseTesting(SQLiteDatabase): + class SQLCipherDatabaseTesting(SQLitePartialExpandDatabase): _index_storage_value = "testing" def __init__(self, dbname, ntry): @@ -298,3 +301,51 @@ class SQLCipherOpen(TestU1DBOpen): db2 = u1db_open(self.db_path, password=PASSWORD, create=False) self.addCleanup(db2.close) self.assertIsInstance(db2, SQLCipherDatabase) + +#----------------------------------------------------------------------------- +# Tests for actual encryption of the database +#----------------------------------------------------------------------------- + +class SQLCipherEncryptionTest(unittest.TestCase): + + DB_FILE = '/tmp/test.db' + + def delete_dbfiles(self): + for dbfile in [self.DB_FILE]: + if os.path.exists(dbfile): + os.unlink(dbfile) + + def setUp(self): + self.delete_dbfiles() + + def tearDown(self): + self.delete_dbfiles() + + def test_try_to_open_encrypted_db_with_sqlite_backend(self): + db = SQLCipherDatabase(self.DB_FILE, PASSWORD) + doc = db.create_doc_from_json(tests.simple_doc) + db.close() + try: + # trying to open an encrypted database with the regular u1db backend + # should raise a DatabaseError exception. + SQLitePartialExpandDatabase(self.DB_FILE) + raise DatabaseIsNotEncrypted() + except DatabaseError: + # at this point we know that the regular U1DB sqlcipher backend + # did not succeed on opening the database, so it was indeed + # encrypted. + db = SQLCipherDatabase(self.DB_FILE, PASSWORD) + doc = db.get_doc(doc.doc_id) + self.assertEqual(tests.simple_doc, doc.get_json(), 'decrypted content mismatch') + + def test_try_to_open_raw_db_with_sqlcipher_backend(self): + db = SQLitePartialExpandDatabase(self.DB_FILE) + db.create_doc_from_json(tests.simple_doc) + db.close() + try: + # trying to open the a non-encrypted database with sqlcipher backend + # should raise a DatabaseIsNotEncrypted exception. + SQLCipherDatabase(self.DB_FILE, PASSWORD) + raise DatabaseError("SQLCipher backend should not be able to open non-encrypted dbs.") + except DatabaseIsNotEncrypted: + pass
\ No newline at end of file |