summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/sqlcipher.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/sqlcipher.py')
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py292
1 files changed, 102 insertions, 190 deletions
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index db3cb5cb..8e7d39c2 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -42,7 +42,6 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
handled by Soledad should be created by SQLCipher >= 2.0.
"""
import logging
-import multiprocessing
import os
import threading
import json
@@ -54,19 +53,17 @@ from u1db.backends import sqlite_backend
from hashlib import sha256
from contextlib import contextmanager
from collections import defaultdict
-from httplib import CannotSendRequest
+from functools import partial
from pysqlcipher import dbapi2 as sqlcipher_dbapi2
from twisted.internet import reactor
-from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
-from twisted.python import log
+from twisted.enterprise import adbapi
-from leap.soledad.client import crypto
-from leap.soledad.client.target import SoledadSyncTarget
-from leap.soledad.client.target import PendingReceivedDocsSyncError
+from leap.soledad.client import encdecpool
+from leap.soledad.client.http_target import SoledadHTTPSyncTarget
from leap.soledad.client.sync import SoledadSynchronizer
from leap.soledad.client import pragmas
@@ -102,46 +99,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
conn = sqlcipher_dbapi2.connect(
opts.path, check_same_thread=check_same_thread)
- set_init_pragmas(conn, opts, extra_queries=on_init)
+ pragmas.set_init_pragmas(conn, opts, extra_queries=on_init)
return conn
-_db_init_lock = threading.Lock()
-
-
-def set_init_pragmas(conn, opts=None, extra_queries=None):
- """
- Set the initialization pragmas.
-
- This includes the crypto pragmas, and any other options that must
- be passed early to sqlcipher db.
- """
- soledad_assert(opts is not None)
- extra_queries = [] if extra_queries is None else extra_queries
- with _db_init_lock:
- # only one execution path should initialize the db
- _set_init_pragmas(conn, opts, extra_queries)
-
-
-def _set_init_pragmas(conn, opts, extra_queries):
-
- sync_off = os.environ.get('LEAP_SQLITE_NOSYNC')
- memstore = os.environ.get('LEAP_SQLITE_MEMSTORE')
- nowal = os.environ.get('LEAP_SQLITE_NOWAL')
-
- pragmas.set_crypto_pragmas(conn, opts)
-
- if not nowal:
- pragmas.set_write_ahead_logging(conn)
- if sync_off:
- pragmas.set_synchronous_off(conn)
- else:
- pragmas.set_synchronous_normal(conn)
- if memstore:
- pragmas.set_mem_temp_store(conn)
-
- for query in extra_queries:
- conn.cursor().execute(query)
+def initialize_sqlcipher_adbapi_db(opts, extra_queries=None):
+ from leap.soledad.client import sqlcipher_adbapi
+ return sqlcipher_adbapi.getConnectionPool(
+ opts, extra_queries=extra_queries)
class SQLCipherOptions(object):
@@ -151,22 +116,32 @@ class SQLCipherOptions(object):
@classmethod
def copy(cls, source, path=None, key=None, create=None,
- is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None,
- defer_encryption=None, sync_db_key=None):
+ is_raw_key=None, cipher=None, kdf_iter=None,
+ cipher_page_size=None, defer_encryption=None, sync_db_key=None):
"""
Return a copy of C{source} with parameters different than None
replaced by new values.
"""
- return SQLCipherOptions(
- path if path else source.path,
- key if key else source.key,
- create=create if create else source.create,
- is_raw_key=is_raw_key if is_raw_key else source.is_raw_key,
- cipher=cipher if cipher else source.cipher,
- kdf_iter=kdf_iter if kdf_iter else source.kdf_iter,
- cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size,
- defer_encryption=defer_encryption if defer_encryption else source.defer_encryption,
- sync_db_key=sync_db_key if sync_db_key else source.sync_db_key)
+ local_vars = locals()
+ args = []
+ kwargs = {}
+
+ for name in ["path", "key"]:
+ val = local_vars[name]
+ if val is not None:
+ args.append(val)
+ else:
+ args.append(getattr(source, name))
+
+ for name in ["create", "is_raw_key", "cipher", "kdf_iter",
+ "cipher_page_size", "defer_encryption", "sync_db_key"]:
+ val = local_vars[name]
+ if val is not None:
+ kwargs[name] = val
+ else:
+ kwargs[name] = getattr(source, name)
+
+ return SQLCipherOptions(*args, **kwargs)
def __init__(self, path, key, create=True, is_raw_key=False,
cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024,
@@ -307,10 +282,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:rtype: str
"""
doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
-
- # TODO XXX move to API XXX
if self.defer_encryption:
- self.sync_queue.put_nowait(doc)
+ # TODO move to api?
+ self._sync_enc_pool.enqueue_doc_for_encryption(doc)
return doc_rev
#
@@ -440,7 +414,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
Soledad syncer implementation.
"""
- _sync_loop = None
_sync_enc_pool = None
"""
@@ -450,13 +423,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'
"""
- A dictionary that hold locks which avoid multiple sync attempts from the
- same database replica.
- """
- # XXX We do not need the lock here now. Remove.
- encrypting_lock = threading.Lock()
-
- """
Period or recurrence of the Looping Call that will do the encryption to the
syncdb (in seconds).
"""
@@ -468,19 +434,18 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
"""
syncing_lock = defaultdict(threading.Lock)
- def __init__(self, opts, soledad_crypto, replica_uid,
+ def __init__(self, opts, soledad_crypto, replica_uid, cert_file,
defer_encryption=False):
self._opts = opts
self._path = opts.path
self._crypto = soledad_crypto
self.__replica_uid = replica_uid
+ self._cert_file = cert_file
self._sync_db_key = opts.sync_db_key
self._sync_db = None
- self._sync_db_write_lock = None
self._sync_enc_pool = None
- self.sync_queue = None
# we store syncers in a dictionary indexed by the target URL. We also
# store a hash of the auth info in case auth info expires and we need
@@ -490,8 +455,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
self._syncers = {}
- self._sync_db_write_lock = threading.Lock()
- self.sync_queue = multiprocessing.Queue()
self.running = False
self._sync_threadpool = None
@@ -503,25 +466,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._db_handle = None
self._initialize_main_db()
- if defer_encryption:
- self._initialize_sync_db(opts)
+ # the sync_db is used both for deferred encryption and decryption, so
+ # we want to initialize it anyway to allow for all combinations of
+ # deferred encryption and decryption configurations.
+ self._initialize_sync_db(opts)
+ if defer_encryption:
# initialize syncing queue encryption pool
- self._sync_enc_pool = crypto.SyncEncrypterPool(
- self._crypto, self._sync_db, self._sync_db_write_lock)
-
- # -----------------------------------------------------------------
- # From the documentation: If f returns a deferred, rescheduling
- # will not take place until the deferred has fired. The result
- # value is ignored.
-
- # TODO use this to avoid multiple sync attempts if the sync has not
- # finished!
- # -----------------------------------------------------------------
-
- # XXX this was called sync_watcher --- trace any remnants
- self._sync_loop = LoopingCall(self._encrypt_syncing_docs)
- self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)
+ self._sync_enc_pool = encdecpool.SyncEncrypterPool(
+ self._crypto, self._sync_db)
self.shutdownID = None
@@ -584,11 +537,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# somewhere else
sync_opts = SQLCipherOptions.copy(
opts, path=sync_db_path, create=True)
- self._sync_db = initialize_sqlcipher_db(
- sync_opts, on_init=self._sync_db_extra_init,
- check_same_thread=False)
- pragmas.set_crypto_pragmas(self._sync_db, opts)
- # ---------------------------------------------------------
+ self._sync_db = getConnectionPool(
+ sync_opts, extra_queries=self._sync_db_extra_init)
@property
def _sync_db_extra_init(self):
@@ -599,15 +549,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
:rtype: tuple of strings
"""
maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"
- encr = crypto.SyncEncrypterPool
- decr = crypto.SyncDecrypterPool
+ encr = encdecpool.SyncEncrypterPool
+ decr = encdecpool.SyncDecrypterPool
sql_encr_table_query = (maybe_create % (
encr.TABLE_NAME, encr.FIELD_NAMES))
sql_decr_table_query = (maybe_create % (
decr.TABLE_NAME, decr.FIELD_NAMES))
return (sql_encr_table_query, sql_decr_table_query)
- def sync(self, url, creds=None, autocreate=True, defer_decryption=True):
+ def sync(self, url, creds=None, defer_decryption=True):
"""
Synchronize documents with remote replica exposed at url.
@@ -621,12 +571,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
:param url: The url of the target replica to sync with.
:type url: str
- :param creds:
- optional dictionary giving credentials.
- to authorize the operation with the server.
+ :param creds: optional dictionary giving credentials to authorize the
+ operation with the server.
:type creds: dict
- :param autocreate: Ask the target to create the db if non-existent.
- :type autocreate: bool
:param defer_decryption:
Whether to defer the decryption process using the intermediate
database. If False, decryption will be done inline.
@@ -637,49 +584,11 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
before the synchronisation was performed.
:rtype: Deferred
"""
- kwargs = {'creds': creds, 'autocreate': autocreate,
- 'defer_decryption': defer_decryption}
- return self._defer_to_sync_threadpool(self._sync, url, **kwargs)
-
- def _sync(self, url, creds=None, autocreate=True, defer_decryption=True):
- res = None
-
# the following context manager blocks until the syncing lock can be
# acquired.
- # TODO review, I think this is no longer needed with a 1-thread
- # threadpool.
-
- log.msg("in _sync")
- self.__url = url
with self._syncer(url, creds=creds) as syncer:
# XXX could mark the critical section here...
- try:
- log.msg('syncer sync...')
- res = syncer.sync(autocreate=autocreate,
- defer_decryption=defer_decryption)
-
- except PendingReceivedDocsSyncError:
- logger.warning("Local sync db is not clear, skipping sync...")
- return
- except CannotSendRequest:
- logger.warning("Connection with sync target couldn't be "
- "established. Resetting connection...")
- # closing the connection it will be recreated in the next try
- syncer.sync_target.close()
- return
-
- return res
-
- def stop_sync(self):
- """
- Interrupt all ongoing syncs.
- """
- self._stop_sync()
-
- def _stop_sync(self):
- for url in self._syncers:
- _, syncer = self._syncers[url]
- syncer.stop()
+ return syncer.sync(defer_decryption=defer_decryption)
@contextmanager
def _syncer(self, url, creds=None):
@@ -690,6 +599,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
one instance synchronizing the same database replica at the same time.
Because of that, this method blocks until the syncing lock can be
acquired.
+
+ :param creds: optional dictionary giving credentials to authorize the
+ operation with the server.
+ :type creds: dict
"""
with self.syncing_lock[self._path]:
syncer = self._get_syncer(url, creds=creds)
@@ -723,16 +636,17 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
h = sha256(json.dumps([url, creds])).hexdigest()
cur_h, syncer = self._syncers.get(url, (None, None))
if syncer is None or h != cur_h:
- wlock = self._sync_db_write_lock
syncer = SoledadSynchronizer(
self,
- SoledadSyncTarget(url,
- # XXX is the replica_uid ready?
- self._replica_uid,
- creds=creds,
- crypto=self._crypto,
- sync_db=self._sync_db,
- sync_db_write_lock=wlock))
+ SoledadHTTPSyncTarget(
+ url,
+ # XXX is the replica_uid ready?
+ self._replica_uid,
+ creds=creds,
+ crypto=self._crypto,
+ cert_file=self._cert_file,
+ sync_db=self._sync_db,
+ sync_enc_pool=self._sync_enc_pool))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
@@ -744,34 +658,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# Symmetric encryption of syncing docs
#
- def _encrypt_syncing_docs(self):
- """
- Process the syncing queue and send the documents there
- to be encrypted in the sync db. They will be read by the
- SoledadSyncTarget during the sync_exchange.
-
- Called periodically from the LoopingCall self._sync_loop.
- """
- # TODO should return a deferred that would firewhen the encryption is
- # done. See note on __init__
-
- lock = self.encrypting_lock
- # optional wait flag used to avoid blocking
- if not lock.acquire(False):
- return
- else:
- queue = self.sync_queue
- try:
- while not queue.empty():
- doc = queue.get_nowait()
- self._sync_enc_pool.encrypt_doc(doc)
-
- except Exception as exc:
- logger.error("Error while encrypting docs to sync")
- logger.exception(exc)
- finally:
- lock.release()
-
def get_generation(self):
# FIXME
# XXX this SHOULD BE a callback
@@ -789,16 +675,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
"""
Close the syncer and syncdb orderly
"""
- # stop the sync loop for deferred encryption
- if self._sync_loop is not None:
- self._sync_loop.reset()
- self._sync_loop.stop()
- self._sync_loop = None
# close all open syncers
for url in self._syncers:
- _, syncer = self._syncers[url]
- syncer.close()
- self._syncers = []
+ del self._syncers[url]
+
# stop the encryption pool
if self._sync_enc_pool is not None:
self._sync_enc_pool.close()
@@ -808,11 +688,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
if self._sync_db is not None:
self._sync_db.close()
self._sync_db = None
- # close the sync queue
- if self.sync_queue is not None:
- self.sync_queue.close()
- del self.sync_queue
- self.sync_queue = None
class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
@@ -903,3 +778,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
has_conflicts=has_conflicts, syncable=syncable)
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
+
+
+#
+# twisted.enterprise.adbapi SQLCipher implementation
+#
+
+SQLCIPHER_CONNECTION_TIMEOUT = 10
+
+
+def getConnectionPool(opts, extra_queries=None):
+ openfun = partial(
+ pragmas.set_init_pragmas,
+ opts=opts,
+ extra_queries=extra_queries)
+ return SQLCipherConnectionPool(
+ database=opts.path,
+ check_same_thread=False,
+ cp_openfun=openfun,
+ timeout=SQLCIPHER_CONNECTION_TIMEOUT)
+
+
+class SQLCipherConnection(adbapi.Connection):
+ pass
+
+
+class SQLCipherTransaction(adbapi.Transaction):
+ pass
+
+
+class SQLCipherConnectionPool(adbapi.ConnectionPool):
+
+ connectionFactory = SQLCipherConnection
+ transactionFactory = SQLCipherTransaction
+
+ def __init__(self, *args, **kwargs):
+ adbapi.ConnectionPool.__init__(
+ self, "pysqlcipher.dbapi2", *args, **kwargs)