summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-05-04 13:04:56 -0300
committerdrebs <drebs@leap.se>2015-05-20 10:16:46 -0300
commit3a7ddacd06fd57afb10cc3d7083c2aa196c9328f (patch)
tree7c5dbb40a7f01e087101df08d7824afbd8abc27b
parentb75bedb065cfbbb2993659d867ef554ff70596ae (diff)
[feature] use async adbapi for async decryption
Since we started implementing twisted api in soledad, some pieces are missing. Accessing the sqlcipher database directly with the twisted adbapi facilities is one of them. The async encryption/decryption was touching the database directly, and this was causing some difficulties like having different threads accessing the same database. This commit implements the twisted adbapi stuff for the asynchronous encryption/decryption facilities. Next steps would be use async adbapi for async encryption and use async adbapi for all sqlcipher access.
-rw-r--r--client/changes/feature_use-twisted-adbapi-for-sync-db1
-rw-r--r--client/src/leap/soledad/client/adbapi.py3
-rw-r--r--client/src/leap/soledad/client/crypto.py190
-rw-r--r--client/src/leap/soledad/client/pragmas.py43
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py130
-rw-r--r--client/src/leap/soledad/client/target.py71
6 files changed, 233 insertions, 205 deletions
diff --git a/client/changes/feature_use-twisted-adbapi-for-sync-db b/client/changes/feature_use-twisted-adbapi-for-sync-db
new file mode 100644
index 00000000..41e5e6e3
--- /dev/null
+++ b/client/changes/feature_use-twisted-adbapi-for-sync-db
@@ -0,0 +1 @@
+ o Use twisted.enterprise.adbapi for access to the sync database.
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 7ad10db5..5b882bbe 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -32,6 +32,7 @@ from zope.proxy import ProxyBase, setProxiedObject
from pysqlcipher.dbapi2 import OperationalError
from leap.soledad.client import sqlcipher as soledad_sqlcipher
+from leap.soledad.client.pragmas import set_init_pragmas
logger = logging.getLogger(name=__name__)
@@ -72,7 +73,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
:rtype: U1DBConnectionPool
"""
if openfun is None and driver == "pysqlcipher":
- openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts)
+ openfun = partial(set_init_pragmas, opts=opts)
return U1DBConnectionPool(
"%s.dbapi2" % driver, database=opts.path,
check_same_thread=False, cp_openfun=openfun,
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 107bf7f1..dd40b198 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -25,11 +25,15 @@ import json
import logging
import multiprocessing
import threading
+import time
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
from zope.proxy import sameProxiedObjects
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
from leap.soledad.common import crypto
@@ -227,7 +231,7 @@ class SoledadCrypto(object):
#
def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv,
- mac_method, secret):
+ mac_method, secret):
"""
Calculate a MAC for C{doc} using C{ciphertext}.
@@ -378,7 +382,7 @@ def decrypt_doc(crypto, doc):
def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method,
- enc_iv, mac_method, secret, doc_mac):
+ enc_iv, mac_method, secret, doc_mac):
"""
Verify that C{doc_mac} is a correct MAC for the given document.
@@ -523,7 +527,7 @@ class SyncEncryptDecryptPool(object):
"""
WORKERS = multiprocessing.cpu_count()
- def __init__(self, crypto, sync_db, write_lock):
+ def __init__(self, crypto, sync_db):
"""
Initialize the pool of encryption-workers.
@@ -540,7 +544,6 @@ class SyncEncryptDecryptPool(object):
self._pool = multiprocessing.Pool(self.WORKERS)
self._crypto = crypto
self._sync_db = sync_db
- self._sync_db_write_lock = write_lock
def close(self):
"""
@@ -592,7 +595,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
# TODO implement throttling to reduce cpu usage??
WORKERS = multiprocessing.cpu_count()
TABLE_NAME = "docs_tosync"
- FIELD_NAMES = "doc_id, rev, content"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
def encrypt_doc(self, doc, workers=True):
"""
@@ -633,8 +636,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type result: tuple(str, str, str)
"""
doc_id, doc_rev, content = result
- self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+ return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+ @defer.inlineCallbacks
def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
"""
Insert the contents of the encrypted doc into the local sync
@@ -652,13 +656,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
# FIXME --- callback should complete immediately since otherwise the
# thread which handles the results will get blocked
# Right now we're blocking the dispatcher with the writes to sqlite.
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,)
-
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, ))
- con.execute(sql_ins, (doc_id, doc_rev, content))
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
+ % (self.TABLE_NAME,)
+ yield self._sync_db.runQuery(query, (doc_id, doc_rev, content))
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
@@ -704,9 +704,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
# TODO implement throttling to reduce cpu usage??
TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted"
+ FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted"
- write_encrypted_lock = threading.Lock()
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_LOOP_PERIOD = 0.5
def __init__(self, *args, **kwargs):
"""
@@ -723,19 +726,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type last_known_generation: int
"""
self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ self.source_replica_uid = kwargs.pop("source_replica_uid")
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self.source_replica_uid = None
self._async_results = []
- def set_source_replica_uid(self, source_replica_uid):
- """
- Set the source replica uid for this decrypter pool instance.
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
- """
- self.source_replica_uid = source_replica_uid
+ self._stopped = threading.Event()
+ self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished decryptor thread."))
+ @defer.inlineCallbacks
def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
gen, trans_id):
"""
@@ -754,17 +754,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type trans_id: str
"""
docstr = json.dumps(content)
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
+ yield self._sync_db.runQuery(
+ query,
+ (doc_id, doc_rev, docstr, gen, trans_id, 1))
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, ))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, docstr, gen, trans_id, 1))
-
+ @defer.inlineCallbacks
def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
"""
Insert a document that is not symmetrically encrypted.
@@ -784,17 +780,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
if not isinstance(content, str):
content = json.dumps(content)
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (
- self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id,))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, content, gen, trans_id, 0))
+ yield self._sync_db.runQuery(
+ query,
+ (doc_id, doc_rev, content, gen, trans_id, 0))
+ @defer.inlineCallbacks
def delete_received_doc(self, doc_id, doc_rev):
"""
Delete a received doc after it was inserted into the local db.
@@ -806,12 +798,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, doc_rev))
+ yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev))
- def decrypt_doc(self, doc_id, rev, content, gen, trans_id,
- source_replica_uid, workers=True):
+ def _decrypt_doc(self, doc_id, rev, content, gen, trans_id,
+ source_replica_uid, workers=True):
"""
Symmetrically decrypt a document.
@@ -860,16 +850,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# save the async result object so we can inspect it for failures
self._async_results.append(self._pool.apply_async(
decrypt_doc_task, args,
- callback=self.decrypt_doc_cb))
+ callback=self._decrypt_doc_cb))
else:
# decrypt inline
res = decrypt_doc_task(*args)
- self.decrypt_doc_cb(res)
+ self._decrypt_doc_cb(res)
- def decrypt_doc_cb(self, result):
+ def _decrypt_doc_cb(self, result):
"""
Store the decryption result in the sync db from where it will later be
- picked by process_decrypted.
+ picked by _process_decrypted.
:param result: A tuple containing the doc id, revision and encrypted
content.
@@ -878,7 +868,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
doc_id, rev, content, gen, trans_id = result
logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
% (doc_id, rev, gen, trans_id))
- self.insert_received_doc(doc_id, rev, content, gen, trans_id)
+ return self.insert_received_doc(doc_id, rev, content, gen, trans_id)
def get_docs_by_generation(self, encrypted=None):
"""
@@ -899,6 +889,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
sql += " ORDER BY gen ASC"
return self._fetchall(sql)
+ @defer.inlineCallbacks
def get_insertable_docs_by_gen(self):
"""
Return a list of non-encrypted documents ready to be inserted.
@@ -910,8 +901,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# docs, then some document might have been decrypted between these two
# calls, and if it is just the right doc then it might not be caught
# by the next loop.
- all_docs = self.get_docs_by_generation()
- decrypted_docs = self.get_docs_by_generation(encrypted=False)
+ all_docs = yield self.get_docs_by_generation()
+ decrypted_docs = yield self.get_docs_by_generation(encrypted=False)
insertable = []
for doc_id, rev, _, gen, trans_id, encrypted in all_docs:
for next_doc_id, _, next_content, _, _, _ in decrypted_docs:
@@ -920,9 +911,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
insertable.append((doc_id, rev, content, gen, trans_id))
else:
break
- return insertable
+ defer.returnValue(insertable)
- def count_docs_in_sync_db(self, encrypted=None):
+ @defer.inlineCallbacks
+ def _count_docs_in_sync_db(self, encrypted=None):
"""
Count how many documents we have in the table for received docs.
@@ -933,31 +925,30 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:return: The count of documents.
:rtype: int
"""
- if self._sync_db is None:
- logger.warning("cannot return count with null sync_db")
- return
- sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
if encrypted is not None:
- sql += " WHERE encrypted = %d" % int(encrypted)
- res = self._fetchall(sql)
+ query += " WHERE encrypted = %d" % int(encrypted)
+ res = yield self._sync_db.runQuery(query)
if res:
val = res.pop()
- return val[0]
+ defer.returnValue(val[0])
else:
- return 0
+ defer.returnValue(0)
- def decrypt_received_docs(self):
+ @defer.inlineCallbacks
+ def _decrypt_received_docs(self):
"""
Get all the encrypted documents from the sync database and dispatch a
decrypt worker to decrypt each one of them.
"""
- docs_by_generation = self.get_docs_by_generation(encrypted=True)
- for doc_id, rev, content, gen, trans_id, _ \
- in filter(None, docs_by_generation):
- self.decrypt_doc(
+ self._raise_in_case_of_failed_async_calls()
+ docs_by_generation = yield self.get_docs_by_generation(encrypted=True)
+ for doc_id, rev, content, gen, trans_id, _ in docs_by_generation:
+ self._decrypt_doc(
doc_id, rev, content, gen, trans_id, self.source_replica_uid)
- def process_decrypted(self):
+ @defer.inlineCallbacks
+ def _process_decrypted(self):
"""
Process the already decrypted documents, and insert as many documents
as can be taken from the expected order without finding a gap.
@@ -968,12 +959,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# Acquire the lock to avoid processing while we're still
# getting data from the syncing stream, to avoid InvalidGeneration
# problems.
- with self.write_encrypted_lock:
- for doc_fields in self.get_insertable_docs_by_gen():
- self.insert_decrypted_local_doc(*doc_fields)
- remaining = self.count_docs_in_sync_db()
- return remaining == 0
+ insertable = yield self.get_insertable_docs_by_gen()
+ for doc_fields in insertable:
+ yield self.insert_decrypted_local_doc(*doc_fields)
+ @defer.inlineCallbacks
def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
gen, trans_id):
"""
@@ -1007,22 +997,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
insert_fun(doc, gen, trans_id)
# If no errors found, remove it from the received database.
- self.delete_received_doc(doc_id, doc_rev)
+ yield self.delete_received_doc(doc_id, doc_rev)
+ @defer.inlineCallbacks
def empty(self):
"""
Empty the received docs table of the sync database.
"""
sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- self._sync_db.execute(sql)
+ yield self._sync_db.runQuery(sql)
+ @defer.inlineCallbacks
def _fetchall(self, *args, **kwargs):
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()
+ results = yield self._sync_db.runQuery(*args, **kwargs)
+ defer.returnValue(results)
- def raise_in_case_of_failed_async_calls(self):
+ def _raise_in_case_of_failed_async_calls(self):
"""
Re-raise any exception raised by an async call.
@@ -1033,3 +1023,39 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if not res.successful():
# re-raise the exception raised by the remote call
res.get()
+
+ def _stop_decr_loop(self):
+ """
+ """
+ self._stopped.set()
+
+ def close(self):
+ """
+ """
+ self._stop_decr_loop()
+ SyncEncryptDecryptPool.close(self)
+
+ def _decrypt_and_process_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ Called periodically from LoopingCall self._sync_loop.
+ """
+ while not self._stopped.is_set():
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ continue
+ self._decrypt_received_docs()
+ self._process_decrypted()
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+
+ def wait(self):
+ while not self.clear_to_sync():
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+
+ @defer.inlineCallbacks
+ def clear_to_sync(self):
+ count = yield self._count_docs_in_sync_db()
+ defer.returnValue(count == 0)
diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py
index 2e9c53a3..55397d10 100644
--- a/client/src/leap/soledad/client/pragmas.py
+++ b/client/src/leap/soledad/client/pragmas.py
@@ -19,10 +19,53 @@ Different pragmas used in the initialization of the SQLCipher database.
"""
import logging
import string
+import threading
+import os
+
+from leap.soledad.common import soledad_assert
+
logger = logging.getLogger(__name__)
+_db_init_lock = threading.Lock()
+
+
+def set_init_pragmas(conn, opts=None, extra_queries=None):
+ """
+ Set the initialization pragmas.
+
+ This includes the crypto pragmas, and any other options that must
+ be passed early to sqlcipher db.
+ """
+ soledad_assert(opts is not None)
+ extra_queries = [] if extra_queries is None else extra_queries
+ with _db_init_lock:
+ # only one execution path should initialize the db
+ _set_init_pragmas(conn, opts, extra_queries)
+
+
+def _set_init_pragmas(conn, opts, extra_queries):
+
+ sync_off = os.environ.get('LEAP_SQLITE_NOSYNC')
+ memstore = os.environ.get('LEAP_SQLITE_MEMSTORE')
+ nowal = os.environ.get('LEAP_SQLITE_NOWAL')
+
+ set_crypto_pragmas(conn, opts)
+
+ if not nowal:
+ set_write_ahead_logging(conn)
+ if sync_off:
+ set_synchronous_off(conn)
+ else:
+ set_synchronous_normal(conn)
+ if memstore:
+ set_mem_temp_store(conn)
+
+ for query in extra_queries:
+ conn.cursor().execute(query)
+
+
def set_crypto_pragmas(db_handle, sqlcipher_opts):
"""
Set cryptographic params (key, cipher, KDF number of iterations and
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index ec7946b7..4f7ecd1b 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -55,6 +55,7 @@ from hashlib import sha256
from contextlib import contextmanager
from collections import defaultdict
from httplib import CannotSendRequest
+from functools import partial
from pysqlcipher import dbapi2 as sqlcipher_dbapi2
@@ -63,6 +64,7 @@ from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
from twisted.python import log
+from twisted.enterprise import adbapi
from leap.soledad.client import crypto
from leap.soledad.client.target import SoledadSyncTarget
@@ -102,46 +104,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
conn = sqlcipher_dbapi2.connect(
opts.path, check_same_thread=check_same_thread)
- set_init_pragmas(conn, opts, extra_queries=on_init)
+ pragmas.set_init_pragmas(conn, opts, extra_queries=on_init)
return conn
-_db_init_lock = threading.Lock()
-
-
-def set_init_pragmas(conn, opts=None, extra_queries=None):
- """
- Set the initialization pragmas.
-
- This includes the crypto pragmas, and any other options that must
- be passed early to sqlcipher db.
- """
- soledad_assert(opts is not None)
- extra_queries = [] if extra_queries is None else extra_queries
- with _db_init_lock:
- # only one execution path should initialize the db
- _set_init_pragmas(conn, opts, extra_queries)
-
-
-def _set_init_pragmas(conn, opts, extra_queries):
-
- sync_off = os.environ.get('LEAP_SQLITE_NOSYNC')
- memstore = os.environ.get('LEAP_SQLITE_MEMSTORE')
- nowal = os.environ.get('LEAP_SQLITE_NOWAL')
-
- pragmas.set_crypto_pragmas(conn, opts)
-
- if not nowal:
- pragmas.set_write_ahead_logging(conn)
- if sync_off:
- pragmas.set_synchronous_off(conn)
- else:
- pragmas.set_synchronous_normal(conn)
- if memstore:
- pragmas.set_mem_temp_store(conn)
-
- for query in extra_queries:
- conn.cursor().execute(query)
+def initialize_sqlcipher_adbapi_db(opts, extra_queries=None):
+ from leap.soledad.client import sqlcipher_adbapi
+ return sqlcipher_adbapi.getConnectionPool(
+ opts, extra_queries=extra_queries)
class SQLCipherOptions(object):
@@ -151,22 +121,32 @@ class SQLCipherOptions(object):
@classmethod
def copy(cls, source, path=None, key=None, create=None,
- is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None,
- defer_encryption=None, sync_db_key=None):
+ is_raw_key=None, cipher=None, kdf_iter=None,
+ cipher_page_size=None, defer_encryption=None, sync_db_key=None):
"""
Return a copy of C{source} with parameters different than None
replaced by new values.
"""
- return SQLCipherOptions(
- path if path else source.path,
- key if key else source.key,
- create=create if create else source.create,
- is_raw_key=is_raw_key if is_raw_key else source.is_raw_key,
- cipher=cipher if cipher else source.cipher,
- kdf_iter=kdf_iter if kdf_iter else source.kdf_iter,
- cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size,
- defer_encryption=defer_encryption if defer_encryption else source.defer_encryption,
- sync_db_key=sync_db_key if sync_db_key else source.sync_db_key)
+ local_vars = locals()
+ args = []
+ kwargs = {}
+
+ for name in ["path", "key"]:
+ val = local_vars[name]
+ if val is not None:
+ args.append(val)
+ else:
+ args.append(getattr(source, name))
+
+ for name in ["create", "is_raw_key", "cipher", "kdf_iter",
+ "cipher_page_size", "defer_encryption", "sync_db_key"]:
+ val = local_vars[name]
+ if val is not None:
+ kwargs[name] = val
+ else:
+ kwargs[name] = getattr(source, name)
+
+ return SQLCipherOptions(*args, **kwargs)
def __init__(self, path, key, create=True, is_raw_key=False,
cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024,
@@ -478,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._sync_db_key = opts.sync_db_key
self._sync_db = None
- self._sync_db_write_lock = None
self._sync_enc_pool = None
self.sync_queue = None
@@ -490,7 +469,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
self._syncers = {}
- self._sync_db_write_lock = threading.Lock()
self.sync_queue = multiprocessing.Queue()
self.running = False
@@ -512,7 +490,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# initialize syncing queue encryption pool
self._sync_enc_pool = crypto.SyncEncrypterPool(
- self._crypto, self._sync_db, self._sync_db_write_lock)
+ self._crypto, self._sync_db)
# -----------------------------------------------------------------
# From the documentation: If f returns a deferred, rescheduling
@@ -588,11 +566,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# somewhere else
sync_opts = SQLCipherOptions.copy(
opts, path=sync_db_path, create=True)
- self._sync_db = initialize_sqlcipher_db(
- sync_opts, on_init=self._sync_db_extra_init,
- check_same_thread=False)
- pragmas.set_crypto_pragmas(self._sync_db, opts)
- # ---------------------------------------------------------
+ self._sync_db = getConnectionPool(
+ sync_opts, extra_queries=self._sync_db_extra_init)
@property
def _sync_db_extra_init(self):
@@ -727,7 +702,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
h = sha256(json.dumps([url, creds])).hexdigest()
cur_h, syncer = self._syncers.get(url, (None, None))
if syncer is None or h != cur_h:
- wlock = self._sync_db_write_lock
syncer = SoledadSynchronizer(
self,
SoledadSyncTarget(url,
@@ -735,8 +709,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._replica_uid,
creds=creds,
crypto=self._crypto,
- sync_db=self._sync_db,
- sync_db_write_lock=wlock))
+ sync_db=self._sync_db))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
@@ -907,3 +880,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
has_conflicts=has_conflicts, syncable=syncable)
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
+
+
+#
+# twisted.enterprise.adbapi SQLCipher implementation
+#
+
+SQLCIPHER_CONNECTION_TIMEOUT = 10
+
+
+def getConnectionPool(opts, extra_queries=None):
+ openfun = partial(
+ pragmas.set_init_pragmas,
+ opts=opts,
+ extra_queries=extra_queries)
+ return SQLCipherConnectionPool(
+ database=opts.path,
+ check_same_thread=False,
+ cp_openfun=openfun,
+ timeout=SQLCIPHER_CONNECTION_TIMEOUT)
+
+
+class SQLCipherConnection(adbapi.Connection):
+ pass
+
+
+class SQLCipherTransaction(adbapi.Transaction):
+ pass
+
+
+class SQLCipherConnectionPool(adbapi.ConnectionPool):
+
+ connectionFactory = SQLCipherConnection
+ transactionFactory = SQLCipherTransaction
+
+ def __init__(self, *args, **kwargs):
+ adbapi.ConnectionPool.__init__(
+ self, "pysqlcipher.dbapi2", *args, **kwargs)
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index d59923b2..06cef1ee 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -36,9 +36,8 @@ from u1db.remote import utils, http_errors
from u1db.remote.http_target import HTTPSyncTarget
from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
from zope.proxy import ProxyBase
-from zope.proxy import sameProxiedObjects, setProxiedObject
+from zope.proxy import setProxiedObject
-from twisted.internet.task import LoopingCall
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
@@ -755,17 +754,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# passed to sync_exchange
_insert_doc_cb = defaultdict(lambda: ProxyBase(None))
- """
- Period of recurrence of the periodic decrypting task, in seconds.
- """
- DECRYPT_LOOP_PERIOD = 0.5
-
#
# Modified HTTPSyncTarget methods.
#
def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None, sync_db_write_lock=None):
+ sync_db=None):
"""
Initialize the SoledadSyncTarget.
@@ -786,9 +780,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
instead of retreiving it from the dedicated
database.
:type sync_db: Sqlite handler
- :param sync_db_write_lock: a write lock for controlling concurrent
- access to the sync_db
- :type sync_db_write_lock: threading.Lock
"""
HTTPSyncTarget.__init__(self, url, creds)
self._raw_url = url
@@ -802,14 +793,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._syncer_pool = None
# deferred decryption attributes
- self._sync_db = None
- self._sync_db_write_lock = None
+ self._sync_db = sync_db
self._decryption_callback = None
self._sync_decr_pool = None
- self._sync_loop = None
- if sync_db and sync_db_write_lock is not None:
- self._sync_db = sync_db
- self._sync_db_write_lock = sync_db_write_lock
def _setup_sync_decr_pool(self):
"""
@@ -818,11 +804,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if self._sync_decr_pool is None:
# initialize syncing queue decryption pool
self._sync_decr_pool = SyncDecrypterPool(
- self._crypto, self._sync_db,
- self._sync_db_write_lock,
- insert_doc_cb=self._insert_doc_cb)
- self._sync_decr_pool.set_source_replica_uid(
- self.source_replica_uid)
+ self._crypto,
+ self._sync_db,
+ insert_doc_cb=self._insert_doc_cb,
+ source_replica_uid=self.source_replica_uid)
def _teardown_sync_decr_pool(self):
"""
@@ -832,23 +817,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_decr_pool.close()
self._sync_decr_pool = None
- def _setup_sync_loop(self):
- """
- Set up the sync loop for deferred decryption.
- """
- if self._sync_loop is None:
- self._sync_loop = LoopingCall(
- self._decrypt_syncing_received_docs)
- self._sync_loop.start(self.DECRYPT_LOOP_PERIOD)
-
- def _teardown_sync_loop(self):
- """
- Tear down the sync loop.
- """
- if self._sync_loop is not None:
- self._sync_loop.stop()
- self._sync_loop = None
-
def _get_replica_uid(self, url):
"""
Return replica uid from the url, or None.
@@ -1138,7 +1106,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if defer_decryption and self._sync_db is not None:
self._sync_exchange_lock.acquire()
self._setup_sync_decr_pool()
- self._setup_sync_loop()
self._defer_decryption = True
else:
# fall back
@@ -1301,9 +1268,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# decrypt docs in case of deferred decryption
if defer_decryption:
- while not self.clear_to_sync():
- sleep(self.DECRYPT_LOOP_PERIOD)
- self._teardown_sync_loop()
+ self._sync_decr_pool.wait()
self._teardown_sync_decr_pool()
self._sync_exchange_lock.release()
@@ -1324,7 +1289,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
with self._stop_lock:
self._stopped = False
-
def stop_syncer(self):
with self._stop_lock:
self._stopped = True
@@ -1449,7 +1413,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: bool
"""
if self._sync_decr_pool:
- return self._sync_decr_pool.count_docs_in_sync_db() == 0
+ return self._sync_decr_pool.clear_to_sync()
return True
def set_decryption_callback(self, cb):
@@ -1474,23 +1438,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
return self._sync_db is not None
- def _decrypt_syncing_received_docs(self):
- """
- Decrypt the documents received from remote replica and insert them
- into the local one.
-
- Called periodically from LoopingCall self._sync_loop.
- """
- if sameProxiedObjects(
- self._insert_doc_cb.get(self.source_replica_uid),
- None):
- return
-
- decrypter = self._sync_decr_pool
- decrypter.raise_in_case_of_failed_async_calls()
- decrypter.decrypt_received_docs()
- decrypter.process_decrypted()
-
def _sign_request(self, method, url_query, params):
"""
Return an authorization header to be included in the HTTP request.