diff options
Diffstat (limited to 'client/src/leap/soledad/client/sqlcipher.py')
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 246 |
1 files changed, 210 insertions, 36 deletions
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 5ffa9c7e..46ceca42 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # sqlcipher.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ A U1DB backend that uses SQLCipher as its persistence layer. @@ -40,11 +38,13 @@ cipher_default_use_hmac can be used to globally alter the default use of HMAC when opening a database. So, as the statements above were introduced for backwards compatibility with -SLCipher 1.1 databases, we do not implement them as all SQLCipher databases +SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. """ import logging +import multiprocessing import os +import sqlite3 import string import threading import time @@ -57,9 +57,12 @@ from collections import defaultdict from pysqlcipher import dbapi2 from u1db.backends import sqlite_backend from u1db import errors as u1db_errors +from taskthread import TimerTask -from leap.soledad.client.sync import Synchronizer +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.client.target import PendingReceivedDocsSyncError +from leap.soledad.client.sync import SoledadSynchronizer from leap.soledad.common.document import SoledadDocument @@ -88,7 +91,7 @@ SQLITE_ISOLATION_LEVEL = None def open(path, password, create=True, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, - cipher_page_size=1024): + cipher_page_size=1024, defer_encryption=False): """Open a database at the given location. Will raise u1db.errors.DatabaseDoesNotExist if create=False and the @@ -114,6 +117,9 @@ def open(path, password, create=True, document_factory=None, crypto=None, :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool :return: An instance of Database. :rtype SQLCipherDatabase @@ -121,7 +127,7 @@ def open(path, password, create=True, document_factory=None, crypto=None, return SQLCipherDatabase.open_database( path, password, create=create, document_factory=document_factory, crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, - cipher_page_size=cipher_page_size) + cipher_page_size=cipher_page_size, defer_encryption=defer_encryption) # @@ -147,19 +153,40 @@ class NotAnHexString(Exception): # class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): - """A U1DB implementation that uses SQLCipher as its persistence layer.""" + """ + A U1DB implementation that uses SQLCipher as its persistence layer. + """ + defer_encryption = False _index_storage_value = 'expand referenced encrypted' k_lock = threading.Lock() create_doc_lock = threading.Lock() update_indexes_lock = threading.Lock() + _sync_watcher = None + _sync_enc_pool = None + + """ + The name of the local symmetrically encrypted documents to + sync database file. + """ + LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' - syncing_lock = defaultdict(threading.Lock) """ A dictionary that hold locks which avoid multiple sync attempts from the same database replica. """ + encrypting_lock = threading.Lock() + + """ + Period or recurrence of the periodic encrypting task, in seconds. + """ + ENCRYPT_TASK_PERIOD = 1 + syncing_lock = defaultdict(threading.Lock) + """ + A dictionary that hold locks which avoid multiple sync attempts from the + same database replica. + """ def __init__(self, sqlcipher_file, password, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', @@ -194,7 +221,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self.assert_db_is_encrypted( sqlcipher_file, password, raw_key, cipher, kdf_iter, cipher_page_size) - # connect to the database + + # connect to the sqlcipher database with self.k_lock: self._db_handle = dbapi2.connect( sqlcipher_file, @@ -215,6 +243,26 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self._ensure_schema() self._crypto = crypto + self._sync_db = None + self._sync_db_write_lock = None + self._sync_enc_pool = None + + if self.defer_encryption: + if sqlcipher_file != ":memory:": + self._sync_db_path = "%s-sync" % sqlcipher_file + else: + self._sync_db_path = ":memory:" + + # initialize sync db + self._init_sync_db() + + # initialize syncing queue encryption pool + self._sync_enc_pool = SyncEncrypterPool( + self._crypto, self._sync_db, self._sync_db_write_lock) + self._sync_watcher = TimerTask(self._encrypt_syncing_docs, + self.ENCRYPT_TASK_PERIOD) + self._sync_watcher.start() + def factory(doc_id=None, rev=None, json='{}', has_conflicts=False, syncable=True): return SoledadDocument(doc_id=doc_id, rev=rev, json=json, @@ -226,7 +274,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): @classmethod def _open_database(cls, sqlcipher_file, password, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', - kdf_iter=4000, cipher_page_size=1024): + kdf_iter=4000, cipher_page_size=1024, + defer_encryption=False): """ Open a SQLCipher database. @@ -249,10 +298,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool :return: The database object. :rtype: SQLCipherDatabase """ + cls.defer_encryption = defer_encryption if not os.path.isfile(sqlcipher_file): raise u1db_errors.DatabaseDoesNotExist() @@ -298,43 +351,59 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): def open_database(cls, sqlcipher_file, password, create, backend_cls=None, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, - cipher_page_size=1024): + cipher_page_size=1024, defer_encryption=False): """ Open a SQLCipher database. :param sqlcipher_file: The path for the SQLCipher file. :type sqlcipher_file: str + :param password: The password that protects the SQLCipher db. :type password: str + :param create: Should the datbase be created if it does not already - exist? - :type: bool + exist? + :type create: bool + :param backend_cls: A class to use as backend. :type backend_cls: type + :param document_factory: A function that will be called with the same - parameters as Document.__init__. + parameters as Document.__init__. :type document_factory: callable + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. + document contents when syncing. :type crypto: soledad.crypto.SoledadCrypto + :param raw_key: Whether C{password} is a raw 64-char hex string or a - passphrase that should be hashed to obtain the encyrption key. + passphrase that should be hashed to obtain the + encyrption key. :type raw_key: bool + :param cipher: The cipher and mode to use. :type cipher: str + :param kdf_iter: The number of iterations to use. :type kdf_iter: int + :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool + :return: The database object. :rtype: SQLCipherDatabase """ + cls.defer_encryption = defer_encryption try: return cls._open_database( sqlcipher_file, password, document_factory=document_factory, crypto=crypto, raw_key=raw_key, cipher=cipher, - kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) + kdf_iter=kdf_iter, cipher_page_size=cipher_page_size, + defer_encryption=defer_encryption) except u1db_errors.DatabaseDoesNotExist: if not create: raise @@ -347,7 +416,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) - def sync(self, url, creds=None, autocreate=True): + def sync(self, url, creds=None, autocreate=True, defer_decryption=True): """ Synchronize documents with remote replica exposed at url. @@ -362,6 +431,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type creds: dict :param autocreate: Ask the target to create the db if non-existent. :type autocreate: bool + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool :return: The local generation before the synchronisation was performed. :rtype: int @@ -370,7 +443,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): # the following context manager blocks until the syncing lock can be # acquired. with self.syncer(url, creds=creds) as syncer: - res = syncer.sync(autocreate=autocreate) + + # XXX could mark the critical section here... + try: + res = syncer.sync(autocreate=autocreate, + defer_decryption=defer_decryption) + + except PendingReceivedDocsSyncError: + logger.warning("Local sync db is not clear, skipping sync...") + return + return res def stop_sync(self): @@ -394,7 +476,18 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]: syncer = self._get_syncer(url, creds=creds) yield syncer - syncer.sync_target.close() + #syncer.sync_target.close() + + @property + def syncing(self): + syncing = False + for url in self._syncers: + _, _, lock = self._syncers[url] + is_not_locked = lock.acquire(blocking=False) + if is_not_locked is False: + return True + lock.release() + return False def _get_syncer(self, url, creds=None): """ @@ -415,11 +508,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): h = sha256(json.dumps([url, creds])).hexdigest() cur_h, syncer = self._syncers.get(url, (None, None)) if syncer is None or h != cur_h: - syncer = Synchronizer( + wlock = self._sync_db_write_lock + syncer = SoledadSynchronizer( self, SoledadSyncTarget(url, + self._replica_uid, creds=creds, - crypto=self._crypto)) + crypto=self._crypto, + sync_db=self._sync_db, + sync_db_write_lock=wlock)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -442,21 +539,85 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): 'ALTER TABLE document ' 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') - def create_doc(self, content, doc_id=None): + def _init_sync_db(self): + """ + Initialize the Symmetrically-Encrypted document to be synced database, + and the queue to communicate with subprocess workers. """ - Create a new document in the local encrypted database. + self._sync_db = sqlite3.connect(self._sync_db_path, + check_same_thread=False) - :param content: the contents of the new document - :type content: dict - :param doc_id: an optional identifier specifying the document id - :type doc_id: str + self._sync_db_write_lock = threading.Lock() + self._create_sync_db_tables() + self.sync_queue = multiprocessing.Queue() + + def _create_sync_db_tables(self): + """ + Create tables for the local sync documents db if needed. + """ + encr = SyncEncrypterPool + decr = SyncDecrypterPool + sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + encr.TABLE_NAME, encr.FIELD_NAMES)) + sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + decr.TABLE_NAME, decr.FIELD_NAMES)) + + with self._sync_db_write_lock: + with self._sync_db: + self._sync_db.execute(sql_encr) + self._sync_db.execute(sql_decr) + + # + # Symmetric encryption of syncing docs + # + + def _encrypt_syncing_docs(self): + """ + Process the syncing queue and send the documents there + to be encrypted in the sync db. They will be read by the + SoledadSyncTarget during the sync_exchange. + + Called periodical from the TimerTask self._sync_watcher. + """ + lock = self.encrypting_lock + # optional wait flag used to avoid blocking + if not lock.acquire(False): + return + else: + queue = self.sync_queue + try: + while not queue.empty(): + doc = queue.get_nowait() + self._sync_enc_pool.encrypt_doc(doc) + + except Exception as exc: + logger.error("Error while encrypting docs to sync") + logger.exception(exc) + finally: + lock.release() + + # + # Document operations + # - :return: the new document - :rtype: SoledadDocument + def put_doc(self, doc): """ - with self.create_doc_lock: - return sqlite_backend.SQLitePartialExpandDatabase.create_doc( - self, content, doc_id=doc_id) + Overwrite the put_doc method, to enqueue the modified document for + encryption before sync. + + :param doc: The document to be put. + :type doc: u1db.Document + + :return: The new document revision. + :rtype: str + """ + doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc( + self, doc) + if self.defer_encryption: + self.sync_queue.put_nowait(doc) + return doc_rev + + # indexes def _put_and_update_indexes(self, old_doc, doc): """ @@ -906,12 +1067,25 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): res = c.fetchall() return res[0][0] - def __del__(self): + def close(self): """ - Closes db_handle upon object destruction. + Close db_handle and close syncer. """ + logger.debug("Sqlcipher backend: closing") + if self._sync_watcher is not None: + self._sync_watcher.stop() + self._sync_watcher.shutdown() + for url in self._syncers: + _, syncer = self._syncers[url] + syncer.close() + if self._sync_enc_pool is not None: + self._sync_enc_pool.close() if self._db_handle is not None: self._db_handle.close() + @property + def replica_uid(self): + return self._get_replica_uid() + sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) |