summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--backends/sqlcipher.py38
-rw-r--r--tests/test_sqlcipher.py59
2 files changed, 79 insertions, 18 deletions
diff --git a/backends/sqlcipher.py b/backends/sqlcipher.py
index 53758397..6711aa86 100644
--- a/backends/sqlcipher.py
+++ b/backends/sqlcipher.py
@@ -16,25 +16,15 @@
"""A U1DB implementation that uses SQLCipher as its persistence layer."""
-import errno
import os
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-from sqlite3 import dbapi2
-import sys
+from sqlite3 import dbapi2, DatabaseError
import time
-import uuid
-from u1db.backends import CommonBackend
from u1db.backends.sqlite_backend import SQLitePartialExpandDatabase
from u1db import (
Document,
errors,
- query_parser,
- vectorclock,
- )
+)
def open(path, password, create, document_factory=None):
@@ -50,11 +40,17 @@ def open(path, password, create, document_factory=None):
parameters as Document.__init__.
:return: An instance of Database.
"""
- from u1db.backends import sqlite_backend
return SQLCipherDatabase.open_database(
path, password, create=create, document_factory=document_factory)
+class DatabaseIsNotEncrypted(Exception):
+ """
+ Exception raised when trying to open non-encrypted databases.
+ """
+ pass
+
+
class SQLCipherDatabase(SQLitePartialExpandDatabase):
"""A U1DB implementation that uses SQLCipher as its persistence layer."""
@@ -67,12 +63,26 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):
def __init__(self, sqlite_file, password, document_factory=None):
"""Create a new sqlcipher file."""
+ self._check_if_db_is_encrypted(sqlite_file)
self._db_handle = dbapi2.connect(sqlite_file)
SQLCipherDatabase.set_pragma_key(self._db_handle, password)
self._real_replica_uid = None
self._ensure_schema()
self._factory = document_factory or Document
+ def _check_if_db_is_encrypted(self, sqlite_file):
+ if not os.path.exists(sqlite_file):
+ return
+ else:
+ try:
+ # try to open an encrypted database with the regular u1db backend
+ # should raise a DatabaseError exception.
+ SQLitePartialExpandDatabase(sqlite_file)
+ raise DatabaseIsNotEncrypted()
+ except DatabaseError:
+ pass
+
+
@classmethod
def _open_database(cls, sqlite_file, password, document_factory=None):
if not os.path.isfile(sqlite_file):
@@ -122,4 +132,4 @@ class SQLCipherDatabase(SQLitePartialExpandDatabase):
SQLCipherDatabase._sqlite_registry[klass._index_storage_value] = klass
-SQLCipherDatabase.register_implementation(SQLCipherDatabase)
+SQLCipherDatabase.register_implementation(SQLCipherDatabase) \ No newline at end of file
diff --git a/tests/test_sqlcipher.py b/tests/test_sqlcipher.py
index 870f9e5d..4457db53 100644
--- a/tests/test_sqlcipher.py
+++ b/tests/test_sqlcipher.py
@@ -2,7 +2,7 @@
import os
import time
-from sqlite3 import dbapi2
+from sqlite3 import dbapi2, DatabaseError
import unittest2 as unittest
from StringIO import StringIO
import threading
@@ -12,10 +12,13 @@ from u1db import (
errors,
query_parser,
)
-from u1db.backends.sqlite_backend import SQLiteDatabase
+from u1db.backends.sqlite_backend import SQLitePartialExpandDatabase
# soledad stuff.
-from leap.soledad.backends.sqlcipher import SQLCipherDatabase
+from leap.soledad.backends.sqlcipher import (
+ SQLCipherDatabase,
+ DatabaseIsNotEncrypted,
+)
from leap.soledad.backends.sqlcipher import open as u1db_open
# u1db tests stuff.
@@ -130,7 +133,7 @@ class TestSQLCipherDatabase(TestSQLiteDatabase):
t2 = None # will be a thread
- class SQLCipherDatabaseTesting(SQLiteDatabase):
+ class SQLCipherDatabaseTesting(SQLitePartialExpandDatabase):
_index_storage_value = "testing"
def __init__(self, dbname, ntry):
@@ -298,3 +301,51 @@ class SQLCipherOpen(TestU1DBOpen):
db2 = u1db_open(self.db_path, password=PASSWORD, create=False)
self.addCleanup(db2.close)
self.assertIsInstance(db2, SQLCipherDatabase)
+
+#-----------------------------------------------------------------------------
+# Tests for actual encryption of the database
+#-----------------------------------------------------------------------------
+
+class SQLCipherEncryptionTest(unittest.TestCase):
+
+ DB_FILE = '/tmp/test.db'
+
+ def delete_dbfiles(self):
+ for dbfile in [self.DB_FILE]:
+ if os.path.exists(dbfile):
+ os.unlink(dbfile)
+
+ def setUp(self):
+ self.delete_dbfiles()
+
+ def tearDown(self):
+ self.delete_dbfiles()
+
+ def test_try_to_open_encrypted_db_with_sqlite_backend(self):
+ db = SQLCipherDatabase(self.DB_FILE, PASSWORD)
+ doc = db.create_doc_from_json(tests.simple_doc)
+ db.close()
+ try:
+ # trying to open an encrypted database with the regular u1db backend
+ # should raise a DatabaseError exception.
+ SQLitePartialExpandDatabase(self.DB_FILE)
+ raise DatabaseIsNotEncrypted()
+ except DatabaseError:
+ # at this point we know that the regular U1DB sqlcipher backend
+ # did not succeed on opening the database, so it was indeed
+ # encrypted.
+ db = SQLCipherDatabase(self.DB_FILE, PASSWORD)
+ doc = db.get_doc(doc.doc_id)
+ self.assertEqual(tests.simple_doc, doc.get_json(), 'decrypted content mismatch')
+
+ def test_try_to_open_raw_db_with_sqlcipher_backend(self):
+ db = SQLitePartialExpandDatabase(self.DB_FILE)
+ db.create_doc_from_json(tests.simple_doc)
+ db.close()
+ try:
+ # trying to open the a non-encrypted database with sqlcipher backend
+ # should raise a DatabaseIsNotEncrypted exception.
+ SQLCipherDatabase(self.DB_FILE, PASSWORD)
+ raise DatabaseError("SQLCipher backend should not be able to open non-encrypted dbs.")
+ except DatabaseIsNotEncrypted:
+ pass \ No newline at end of file