summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r--client/src/leap/soledad/client/adbapi.py3
-rw-r--r--client/src/leap/soledad/client/crypto.py190
-rw-r--r--client/src/leap/soledad/client/pragmas.py43
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py130
-rw-r--r--client/src/leap/soledad/client/target.py71
5 files changed, 232 insertions, 205 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 7ad10db5..5b882bbe 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -32,6 +32,7 @@ from zope.proxy import ProxyBase, setProxiedObject
from pysqlcipher.dbapi2 import OperationalError
from leap.soledad.client import sqlcipher as soledad_sqlcipher
+from leap.soledad.client.pragmas import set_init_pragmas
logger = logging.getLogger(name=__name__)
@@ -72,7 +73,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
:rtype: U1DBConnectionPool
"""
if openfun is None and driver == "pysqlcipher":
- openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts)
+ openfun = partial(set_init_pragmas, opts=opts)
return U1DBConnectionPool(
"%s.dbapi2" % driver, database=opts.path,
check_same_thread=False, cp_openfun=openfun,
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 107bf7f1..dd40b198 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -25,11 +25,15 @@ import json
import logging
import multiprocessing
import threading
+import time
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
from zope.proxy import sameProxiedObjects
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
from leap.soledad.common import crypto
@@ -227,7 +231,7 @@ class SoledadCrypto(object):
#
def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv,
- mac_method, secret):
+ mac_method, secret):
"""
Calculate a MAC for C{doc} using C{ciphertext}.
@@ -378,7 +382,7 @@ def decrypt_doc(crypto, doc):
def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method,
- enc_iv, mac_method, secret, doc_mac):
+ enc_iv, mac_method, secret, doc_mac):
"""
Verify that C{doc_mac} is a correct MAC for the given document.
@@ -523,7 +527,7 @@ class SyncEncryptDecryptPool(object):
"""
WORKERS = multiprocessing.cpu_count()
- def __init__(self, crypto, sync_db, write_lock):
+ def __init__(self, crypto, sync_db):
"""
Initialize the pool of encryption-workers.
@@ -540,7 +544,6 @@ class SyncEncryptDecryptPool(object):
self._pool = multiprocessing.Pool(self.WORKERS)
self._crypto = crypto
self._sync_db = sync_db
- self._sync_db_write_lock = write_lock
def close(self):
"""
@@ -592,7 +595,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
# TODO implement throttling to reduce cpu usage??
WORKERS = multiprocessing.cpu_count()
TABLE_NAME = "docs_tosync"
- FIELD_NAMES = "doc_id, rev, content"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
def encrypt_doc(self, doc, workers=True):
"""
@@ -633,8 +636,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type result: tuple(str, str, str)
"""
doc_id, doc_rev, content = result
- self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+ return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+ @defer.inlineCallbacks
def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
"""
Insert the contents of the encrypted doc into the local sync
@@ -652,13 +656,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
# FIXME --- callback should complete immediately since otherwise the
# thread which handles the results will get blocked
# Right now we're blocking the dispatcher with the writes to sqlite.
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,)
-
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, ))
- con.execute(sql_ins, (doc_id, doc_rev, content))
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
+ % (self.TABLE_NAME,)
+ yield self._sync_db.runQuery(query, (doc_id, doc_rev, content))
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
@@ -704,9 +704,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
# TODO implement throttling to reduce cpu usage??
TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted"
+ FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted"
- write_encrypted_lock = threading.Lock()
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_LOOP_PERIOD = 0.5
def __init__(self, *args, **kwargs):
"""
@@ -723,19 +726,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type last_known_generation: int
"""
self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ self.source_replica_uid = kwargs.pop("source_replica_uid")
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self.source_replica_uid = None
self._async_results = []
- def set_source_replica_uid(self, source_replica_uid):
- """
- Set the source replica uid for this decrypter pool instance.
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
- """
- self.source_replica_uid = source_replica_uid
+ self._stopped = threading.Event()
+ self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished decryptor thread."))
+ @defer.inlineCallbacks
def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
gen, trans_id):
"""
@@ -754,17 +754,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type trans_id: str
"""
docstr = json.dumps(content)
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
+ yield self._sync_db.runQuery(
+ query,
+ (doc_id, doc_rev, docstr, gen, trans_id, 1))
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, ))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, docstr, gen, trans_id, 1))
-
+ @defer.inlineCallbacks
def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
"""
Insert a document that is not symmetrically encrypted.
@@ -784,17 +780,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
if not isinstance(content, str):
content = json.dumps(content)
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (
- self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id,))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, content, gen, trans_id, 0))
+ yield self._sync_db.runQuery(
+ query,
+ (doc_id, doc_rev, content, gen, trans_id, 0))
+ @defer.inlineCallbacks
def delete_received_doc(self, doc_id, doc_rev):
"""
Delete a received doc after it was inserted into the local db.
@@ -806,12 +798,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, doc_rev))
+ yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev))
- def decrypt_doc(self, doc_id, rev, content, gen, trans_id,
- source_replica_uid, workers=True):
+ def _decrypt_doc(self, doc_id, rev, content, gen, trans_id,
+ source_replica_uid, workers=True):
"""
Symmetrically decrypt a document.
@@ -860,16 +850,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# save the async result object so we can inspect it for failures
self._async_results.append(self._pool.apply_async(
decrypt_doc_task, args,
- callback=self.decrypt_doc_cb))
+ callback=self._decrypt_doc_cb))
else:
# decrypt inline
res = decrypt_doc_task(*args)
- self.decrypt_doc_cb(res)
+ self._decrypt_doc_cb(res)
- def decrypt_doc_cb(self, result):
+ def _decrypt_doc_cb(self, result):
"""
Store the decryption result in the sync db from where it will later be
- picked by process_decrypted.
+ picked by _process_decrypted.
:param result: A tuple containing the doc id, revision and encrypted
content.
@@ -878,7 +868,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
doc_id, rev, content, gen, trans_id = result
logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
% (doc_id, rev, gen, trans_id))
- self.insert_received_doc(doc_id, rev, content, gen, trans_id)
+ return self.insert_received_doc(doc_id, rev, content, gen, trans_id)
def get_docs_by_generation(self, encrypted=None):
"""
@@ -899,6 +889,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
sql += " ORDER BY gen ASC"
return self._fetchall(sql)
+ @defer.inlineCallbacks
def get_insertable_docs_by_gen(self):
"""
Return a list of non-encrypted documents ready to be inserted.
@@ -910,8 +901,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# docs, then some document might have been decrypted between these two
# calls, and if it is just the right doc then it might not be caught
# by the next loop.
- all_docs = self.get_docs_by_generation()
- decrypted_docs = self.get_docs_by_generation(encrypted=False)
+ all_docs = yield self.get_docs_by_generation()
+ decrypted_docs = yield self.get_docs_by_generation(encrypted=False)
insertable = []
for doc_id, rev, _, gen, trans_id, encrypted in all_docs:
for next_doc_id, _, next_content, _, _, _ in decrypted_docs:
@@ -920,9 +911,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
insertable.append((doc_id, rev, content, gen, trans_id))
else:
break
- return insertable
+ defer.returnValue(insertable)
- def count_docs_in_sync_db(self, encrypted=None):
+ @defer.inlineCallbacks
+ def _count_docs_in_sync_db(self, encrypted=None):
"""
Count how many documents we have in the table for received docs.
@@ -933,31 +925,30 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:return: The count of documents.
:rtype: int
"""
- if self._sync_db is None:
- logger.warning("cannot return count with null sync_db")
- return
- sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
if encrypted is not None:
- sql += " WHERE encrypted = %d" % int(encrypted)
- res = self._fetchall(sql)
+ query += " WHERE encrypted = %d" % int(encrypted)
+ res = yield self._sync_db.runQuery(query)
if res:
val = res.pop()
- return val[0]
+ defer.returnValue(val[0])
else:
- return 0
+ defer.returnValue(0)
- def decrypt_received_docs(self):
+ @defer.inlineCallbacks
+ def _decrypt_received_docs(self):
"""
Get all the encrypted documents from the sync database and dispatch a
decrypt worker to decrypt each one of them.
"""
- docs_by_generation = self.get_docs_by_generation(encrypted=True)
- for doc_id, rev, content, gen, trans_id, _ \
- in filter(None, docs_by_generation):
- self.decrypt_doc(
+ self._raise_in_case_of_failed_async_calls()
+ docs_by_generation = yield self.get_docs_by_generation(encrypted=True)
+ for doc_id, rev, content, gen, trans_id, _ in docs_by_generation:
+ self._decrypt_doc(
doc_id, rev, content, gen, trans_id, self.source_replica_uid)
- def process_decrypted(self):
+ @defer.inlineCallbacks
+ def _process_decrypted(self):
"""
Process the already decrypted documents, and insert as many documents
as can be taken from the expected order without finding a gap.
@@ -968,12 +959,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# Acquire the lock to avoid processing while we're still
# getting data from the syncing stream, to avoid InvalidGeneration
# problems.
- with self.write_encrypted_lock:
- for doc_fields in self.get_insertable_docs_by_gen():
- self.insert_decrypted_local_doc(*doc_fields)
- remaining = self.count_docs_in_sync_db()
- return remaining == 0
+ insertable = yield self.get_insertable_docs_by_gen()
+ for doc_fields in insertable:
+ yield self.insert_decrypted_local_doc(*doc_fields)
+ @defer.inlineCallbacks
def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
gen, trans_id):
"""
@@ -1007,22 +997,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
insert_fun(doc, gen, trans_id)
# If no errors found, remove it from the received database.
- self.delete_received_doc(doc_id, doc_rev)
+ yield self.delete_received_doc(doc_id, doc_rev)
+ @defer.inlineCallbacks
def empty(self):
"""
Empty the received docs table of the sync database.
"""
sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- self._sync_db.execute(sql)
+ yield self._sync_db.runQuery(sql)
+ @defer.inlineCallbacks
def _fetchall(self, *args, **kwargs):
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()
+ results = yield self._sync_db.runQuery(*args, **kwargs)
+ defer.returnValue(results)
- def raise_in_case_of_failed_async_calls(self):
+ def _raise_in_case_of_failed_async_calls(self):
"""
Re-raise any exception raised by an async call.
@@ -1033,3 +1023,39 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if not res.successful():
# re-raise the exception raised by the remote call
res.get()
+
+ def _stop_decr_loop(self):
+ """
+ """
+ self._stopped.set()
+
+ def close(self):
+ """
+ """
+ self._stop_decr_loop()
+ SyncEncryptDecryptPool.close(self)
+
+ def _decrypt_and_process_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ Called periodically from LoopingCall self._sync_loop.
+ """
+ while not self._stopped.is_set():
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ continue
+ self._decrypt_received_docs()
+ self._process_decrypted()
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+
+ def wait(self):
+ while not self.clear_to_sync():
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+
+ @defer.inlineCallbacks
+ def clear_to_sync(self):
+ count = yield self._count_docs_in_sync_db()
+ defer.returnValue(count == 0)
diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py
index 2e9c53a3..55397d10 100644
--- a/client/src/leap/soledad/client/pragmas.py
+++ b/client/src/leap/soledad/client/pragmas.py
@@ -19,10 +19,53 @@ Different pragmas used in the initialization of the SQLCipher database.
"""
import logging
import string
+import threading
+import os
+
+from leap.soledad.common import soledad_assert
+
logger = logging.getLogger(__name__)
+_db_init_lock = threading.Lock()
+
+
+def set_init_pragmas(conn, opts=None, extra_queries=None):
+ """
+ Set the initialization pragmas.
+
+ This includes the crypto pragmas, and any other options that must
+ be passed early to sqlcipher db.
+ """
+ soledad_assert(opts is not None)
+ extra_queries = [] if extra_queries is None else extra_queries
+ with _db_init_lock:
+ # only one execution path should initialize the db
+ _set_init_pragmas(conn, opts, extra_queries)
+
+
+def _set_init_pragmas(conn, opts, extra_queries):
+
+ sync_off = os.environ.get('LEAP_SQLITE_NOSYNC')
+ memstore = os.environ.get('LEAP_SQLITE_MEMSTORE')
+ nowal = os.environ.get('LEAP_SQLITE_NOWAL')
+
+ set_crypto_pragmas(conn, opts)
+
+ if not nowal:
+ set_write_ahead_logging(conn)
+ if sync_off:
+ set_synchronous_off(conn)
+ else:
+ set_synchronous_normal(conn)
+ if memstore:
+ set_mem_temp_store(conn)
+
+ for query in extra_queries:
+ conn.cursor().execute(query)
+
+
def set_crypto_pragmas(db_handle, sqlcipher_opts):
"""
Set cryptographic params (key, cipher, KDF number of iterations and
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index ec7946b7..4f7ecd1b 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -55,6 +55,7 @@ from hashlib import sha256
from contextlib import contextmanager
from collections import defaultdict
from httplib import CannotSendRequest
+from functools import partial
from pysqlcipher import dbapi2 as sqlcipher_dbapi2
@@ -63,6 +64,7 @@ from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
from twisted.python import log
+from twisted.enterprise import adbapi
from leap.soledad.client import crypto
from leap.soledad.client.target import SoledadSyncTarget
@@ -102,46 +104,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
conn = sqlcipher_dbapi2.connect(
opts.path, check_same_thread=check_same_thread)
- set_init_pragmas(conn, opts, extra_queries=on_init)
+ pragmas.set_init_pragmas(conn, opts, extra_queries=on_init)
return conn
-_db_init_lock = threading.Lock()
-
-
-def set_init_pragmas(conn, opts=None, extra_queries=None):
- """
- Set the initialization pragmas.
-
- This includes the crypto pragmas, and any other options that must
- be passed early to sqlcipher db.
- """
- soledad_assert(opts is not None)
- extra_queries = [] if extra_queries is None else extra_queries
- with _db_init_lock:
- # only one execution path should initialize the db
- _set_init_pragmas(conn, opts, extra_queries)
-
-
-def _set_init_pragmas(conn, opts, extra_queries):
-
- sync_off = os.environ.get('LEAP_SQLITE_NOSYNC')
- memstore = os.environ.get('LEAP_SQLITE_MEMSTORE')
- nowal = os.environ.get('LEAP_SQLITE_NOWAL')
-
- pragmas.set_crypto_pragmas(conn, opts)
-
- if not nowal:
- pragmas.set_write_ahead_logging(conn)
- if sync_off:
- pragmas.set_synchronous_off(conn)
- else:
- pragmas.set_synchronous_normal(conn)
- if memstore:
- pragmas.set_mem_temp_store(conn)
-
- for query in extra_queries:
- conn.cursor().execute(query)
+def initialize_sqlcipher_adbapi_db(opts, extra_queries=None):
+ from leap.soledad.client import sqlcipher_adbapi
+ return sqlcipher_adbapi.getConnectionPool(
+ opts, extra_queries=extra_queries)
class SQLCipherOptions(object):
@@ -151,22 +121,32 @@ class SQLCipherOptions(object):
@classmethod
def copy(cls, source, path=None, key=None, create=None,
- is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None,
- defer_encryption=None, sync_db_key=None):
+ is_raw_key=None, cipher=None, kdf_iter=None,
+ cipher_page_size=None, defer_encryption=None, sync_db_key=None):
"""
Return a copy of C{source} with parameters different than None
replaced by new values.
"""
- return SQLCipherOptions(
- path if path else source.path,
- key if key else source.key,
- create=create if create else source.create,
- is_raw_key=is_raw_key if is_raw_key else source.is_raw_key,
- cipher=cipher if cipher else source.cipher,
- kdf_iter=kdf_iter if kdf_iter else source.kdf_iter,
- cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size,
- defer_encryption=defer_encryption if defer_encryption else source.defer_encryption,
- sync_db_key=sync_db_key if sync_db_key else source.sync_db_key)
+ local_vars = locals()
+ args = []
+ kwargs = {}
+
+ for name in ["path", "key"]:
+ val = local_vars[name]
+ if val is not None:
+ args.append(val)
+ else:
+ args.append(getattr(source, name))
+
+ for name in ["create", "is_raw_key", "cipher", "kdf_iter",
+ "cipher_page_size", "defer_encryption", "sync_db_key"]:
+ val = local_vars[name]
+ if val is not None:
+ kwargs[name] = val
+ else:
+ kwargs[name] = getattr(source, name)
+
+ return SQLCipherOptions(*args, **kwargs)
def __init__(self, path, key, create=True, is_raw_key=False,
cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024,
@@ -478,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._sync_db_key = opts.sync_db_key
self._sync_db = None
- self._sync_db_write_lock = None
self._sync_enc_pool = None
self.sync_queue = None
@@ -490,7 +469,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
self._syncers = {}
- self._sync_db_write_lock = threading.Lock()
self.sync_queue = multiprocessing.Queue()
self.running = False
@@ -512,7 +490,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# initialize syncing queue encryption pool
self._sync_enc_pool = crypto.SyncEncrypterPool(
- self._crypto, self._sync_db, self._sync_db_write_lock)
+ self._crypto, self._sync_db)
# -----------------------------------------------------------------
# From the documentation: If f returns a deferred, rescheduling
@@ -588,11 +566,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# somewhere else
sync_opts = SQLCipherOptions.copy(
opts, path=sync_db_path, create=True)
- self._sync_db = initialize_sqlcipher_db(
- sync_opts, on_init=self._sync_db_extra_init,
- check_same_thread=False)
- pragmas.set_crypto_pragmas(self._sync_db, opts)
- # ---------------------------------------------------------
+ self._sync_db = getConnectionPool(
+ sync_opts, extra_queries=self._sync_db_extra_init)
@property
def _sync_db_extra_init(self):
@@ -727,7 +702,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
h = sha256(json.dumps([url, creds])).hexdigest()
cur_h, syncer = self._syncers.get(url, (None, None))
if syncer is None or h != cur_h:
- wlock = self._sync_db_write_lock
syncer = SoledadSynchronizer(
self,
SoledadSyncTarget(url,
@@ -735,8 +709,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self._replica_uid,
creds=creds,
crypto=self._crypto,
- sync_db=self._sync_db,
- sync_db_write_lock=wlock))
+ sync_db=self._sync_db))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
@@ -907,3 +880,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
has_conflicts=has_conflicts, syncable=syncable)
sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)
+
+
+#
+# twisted.enterprise.adbapi SQLCipher implementation
+#
+
+SQLCIPHER_CONNECTION_TIMEOUT = 10
+
+
+def getConnectionPool(opts, extra_queries=None):
+ openfun = partial(
+ pragmas.set_init_pragmas,
+ opts=opts,
+ extra_queries=extra_queries)
+ return SQLCipherConnectionPool(
+ database=opts.path,
+ check_same_thread=False,
+ cp_openfun=openfun,
+ timeout=SQLCIPHER_CONNECTION_TIMEOUT)
+
+
+class SQLCipherConnection(adbapi.Connection):
+ pass
+
+
+class SQLCipherTransaction(adbapi.Transaction):
+ pass
+
+
+class SQLCipherConnectionPool(adbapi.ConnectionPool):
+
+ connectionFactory = SQLCipherConnection
+ transactionFactory = SQLCipherTransaction
+
+ def __init__(self, *args, **kwargs):
+ adbapi.ConnectionPool.__init__(
+ self, "pysqlcipher.dbapi2", *args, **kwargs)
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index d59923b2..06cef1ee 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -36,9 +36,8 @@ from u1db.remote import utils, http_errors
from u1db.remote.http_target import HTTPSyncTarget
from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
from zope.proxy import ProxyBase
-from zope.proxy import sameProxiedObjects, setProxiedObject
+from zope.proxy import setProxiedObject
-from twisted.internet.task import LoopingCall
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
@@ -755,17 +754,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# passed to sync_exchange
_insert_doc_cb = defaultdict(lambda: ProxyBase(None))
- """
- Period of recurrence of the periodic decrypting task, in seconds.
- """
- DECRYPT_LOOP_PERIOD = 0.5
-
#
# Modified HTTPSyncTarget methods.
#
def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None, sync_db_write_lock=None):
+ sync_db=None):
"""
Initialize the SoledadSyncTarget.
@@ -786,9 +780,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
instead of retreiving it from the dedicated
database.
:type sync_db: Sqlite handler
- :param sync_db_write_lock: a write lock for controlling concurrent
- access to the sync_db
- :type sync_db_write_lock: threading.Lock
"""
HTTPSyncTarget.__init__(self, url, creds)
self._raw_url = url
@@ -802,14 +793,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._syncer_pool = None
# deferred decryption attributes
- self._sync_db = None
- self._sync_db_write_lock = None
+ self._sync_db = sync_db
self._decryption_callback = None
self._sync_decr_pool = None
- self._sync_loop = None
- if sync_db and sync_db_write_lock is not None:
- self._sync_db = sync_db
- self._sync_db_write_lock = sync_db_write_lock
def _setup_sync_decr_pool(self):
"""
@@ -818,11 +804,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if self._sync_decr_pool is None:
# initialize syncing queue decryption pool
self._sync_decr_pool = SyncDecrypterPool(
- self._crypto, self._sync_db,
- self._sync_db_write_lock,
- insert_doc_cb=self._insert_doc_cb)
- self._sync_decr_pool.set_source_replica_uid(
- self.source_replica_uid)
+ self._crypto,
+ self._sync_db,
+ insert_doc_cb=self._insert_doc_cb,
+ source_replica_uid=self.source_replica_uid)
def _teardown_sync_decr_pool(self):
"""
@@ -832,23 +817,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_decr_pool.close()
self._sync_decr_pool = None
- def _setup_sync_loop(self):
- """
- Set up the sync loop for deferred decryption.
- """
- if self._sync_loop is None:
- self._sync_loop = LoopingCall(
- self._decrypt_syncing_received_docs)
- self._sync_loop.start(self.DECRYPT_LOOP_PERIOD)
-
- def _teardown_sync_loop(self):
- """
- Tear down the sync loop.
- """
- if self._sync_loop is not None:
- self._sync_loop.stop()
- self._sync_loop = None
-
def _get_replica_uid(self, url):
"""
Return replica uid from the url, or None.
@@ -1138,7 +1106,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if defer_decryption and self._sync_db is not None:
self._sync_exchange_lock.acquire()
self._setup_sync_decr_pool()
- self._setup_sync_loop()
self._defer_decryption = True
else:
# fall back
@@ -1301,9 +1268,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# decrypt docs in case of deferred decryption
if defer_decryption:
- while not self.clear_to_sync():
- sleep(self.DECRYPT_LOOP_PERIOD)
- self._teardown_sync_loop()
+ self._sync_decr_pool.wait()
self._teardown_sync_decr_pool()
self._sync_exchange_lock.release()
@@ -1324,7 +1289,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
with self._stop_lock:
self._stopped = False
-
def stop_syncer(self):
with self._stop_lock:
self._stopped = True
@@ -1449,7 +1413,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: bool
"""
if self._sync_decr_pool:
- return self._sync_decr_pool.count_docs_in_sync_db() == 0
+ return self._sync_decr_pool.clear_to_sync()
return True
def set_decryption_callback(self, cb):
@@ -1474,23 +1438,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
return self._sync_db is not None
- def _decrypt_syncing_received_docs(self):
- """
- Decrypt the documents received from remote replica and insert them
- into the local one.
-
- Called periodically from LoopingCall self._sync_loop.
- """
- if sameProxiedObjects(
- self._insert_doc_cb.get(self.source_replica_uid),
- None):
- return
-
- decrypter = self._sync_decr_pool
- decrypter.raise_in_case_of_failed_async_calls()
- decrypter.decrypt_received_docs()
- decrypter.process_decrypted()
-
def _sign_request(self, method, url_query, params):
"""
Return an authorization header to be included in the HTTP request.