diff options
-rw-r--r-- | client/changes/feature_use-twisted-adbapi-for-sync-db | 1 | ||||
-rw-r--r-- | client/src/leap/soledad/client/adbapi.py | 3 | ||||
-rw-r--r-- | client/src/leap/soledad/client/crypto.py | 190 | ||||
-rw-r--r-- | client/src/leap/soledad/client/pragmas.py | 43 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 130 | ||||
-rw-r--r-- | client/src/leap/soledad/client/target.py | 71 |
6 files changed, 233 insertions, 205 deletions
diff --git a/client/changes/feature_use-twisted-adbapi-for-sync-db b/client/changes/feature_use-twisted-adbapi-for-sync-db new file mode 100644 index 00000000..41e5e6e3 --- /dev/null +++ b/client/changes/feature_use-twisted-adbapi-for-sync-db @@ -0,0 +1 @@ + o Use twisted.enterprise.adbapi for access to the sync database. diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 7ad10db5..5b882bbe 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -32,6 +32,7 @@ from zope.proxy import ProxyBase, setProxiedObject from pysqlcipher.dbapi2 import OperationalError from leap.soledad.client import sqlcipher as soledad_sqlcipher +from leap.soledad.client.pragmas import set_init_pragmas logger = logging.getLogger(name=__name__) @@ -72,7 +73,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): :rtype: U1DBConnectionPool """ if openfun is None and driver == "pysqlcipher": - openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts) + openfun = partial(set_init_pragmas, opts=opts) return U1DBConnectionPool( "%s.dbapi2" % driver, database=opts.path, check_same_thread=False, cp_openfun=openfun, diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 107bf7f1..dd40b198 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -25,11 +25,15 @@ import json import logging import multiprocessing import threading +import time from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 from zope.proxy import sameProxiedObjects +from twisted.internet import defer +from twisted.internet.threads import deferToThread + from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import crypto @@ -227,7 +231,7 @@ class SoledadCrypto(object): # def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv, - mac_method, secret): + mac_method, secret): """ Calculate a MAC for C{doc} using C{ciphertext}. @@ -378,7 +382,7 @@ def decrypt_doc(crypto, doc): def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, - enc_iv, mac_method, secret, doc_mac): + enc_iv, mac_method, secret, doc_mac): """ Verify that C{doc_mac} is a correct MAC for the given document. @@ -523,7 +527,7 @@ class SyncEncryptDecryptPool(object): """ WORKERS = multiprocessing.cpu_count() - def __init__(self, crypto, sync_db, write_lock): + def __init__(self, crypto, sync_db): """ Initialize the pool of encryption-workers. @@ -540,7 +544,6 @@ class SyncEncryptDecryptPool(object): self._pool = multiprocessing.Pool(self.WORKERS) self._crypto = crypto self._sync_db = sync_db - self._sync_db_write_lock = write_lock def close(self): """ @@ -592,7 +595,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): # TODO implement throttling to reduce cpu usage?? WORKERS = multiprocessing.cpu_count() TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id, rev, content" + FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" def encrypt_doc(self, doc, workers=True): """ @@ -633,8 +636,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type result: tuple(str, str, str) """ doc_id, doc_rev, content = result - self.insert_encrypted_local_doc(doc_id, doc_rev, content) + return self.insert_encrypted_local_doc(doc_id, doc_rev, content) + @defer.inlineCallbacks def insert_encrypted_local_doc(self, doc_id, doc_rev, content): """ Insert the contents of the encrypted doc into the local sync @@ -652,13 +656,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): # FIXME --- callback should complete immediately since otherwise the # thread which handles the results will get blocked # Right now we're blocking the dispatcher with the writes to sqlite. - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute(sql_ins, (doc_id, doc_rev, content)) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ + % (self.TABLE_NAME,) + yield self._sync_db.runQuery(query, (doc_id, doc_rev, content)) def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): @@ -704,9 +704,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ # TODO implement throttling to reduce cpu usage?? TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted" + FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted" - write_encrypted_lock = threading.Lock() + """ + Period of recurrence of the periodic decrypting task, in seconds. + """ + DECRYPT_LOOP_PERIOD = 0.5 def __init__(self, *args, **kwargs): """ @@ -723,19 +726,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type last_known_generation: int """ self._insert_doc_cb = kwargs.pop("insert_doc_cb") + self.source_replica_uid = kwargs.pop("source_replica_uid") SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self.source_replica_uid = None self._async_results = [] - def set_source_replica_uid(self, source_replica_uid): - """ - Set the source replica uid for this decrypter pool instance. - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - """ - self.source_replica_uid = source_replica_uid + self._stopped = threading.Event() + self._deferred_loop = deferToThread(self._decrypt_and_process_docs) + self._deferred_loop.addCallback( + lambda _: logger.debug("Finished decryptor thread.")) + @defer.inlineCallbacks def insert_encrypted_received_doc(self, doc_id, doc_rev, content, gen, trans_id): """ @@ -754,17 +754,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type trans_id: str """ docstr = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( self.TABLE_NAME,) + yield self._sync_db.runQuery( + query, + (doc_id, doc_rev, docstr, gen, trans_id, 1)) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute( - sql_ins, - (doc_id, doc_rev, docstr, gen, trans_id, 1)) - + @defer.inlineCallbacks def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): """ Insert a document that is not symmetrically encrypted. @@ -784,17 +780,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ if not isinstance(content, str): content = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % ( - self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id,)) - con.execute( - sql_ins, - (doc_id, doc_rev, content, gen, trans_id, 0)) + yield self._sync_db.runQuery( + query, + (doc_id, doc_rev, content, gen, trans_id, 0)) + @defer.inlineCallbacks def delete_received_doc(self, doc_id, doc_rev): """ Delete a received doc after it was inserted into the local db. @@ -806,12 +798,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, doc_rev)) + yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev)) - def decrypt_doc(self, doc_id, rev, content, gen, trans_id, - source_replica_uid, workers=True): + def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, + source_replica_uid, workers=True): """ Symmetrically decrypt a document. @@ -860,16 +850,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # save the async result object so we can inspect it for failures self._async_results.append(self._pool.apply_async( decrypt_doc_task, args, - callback=self.decrypt_doc_cb)) + callback=self._decrypt_doc_cb)) else: # decrypt inline res = decrypt_doc_task(*args) - self.decrypt_doc_cb(res) + self._decrypt_doc_cb(res) - def decrypt_doc_cb(self, result): + def _decrypt_doc_cb(self, result): """ Store the decryption result in the sync db from where it will later be - picked by process_decrypted. + picked by _process_decrypted. :param result: A tuple containing the doc id, revision and encrypted content. @@ -878,7 +868,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): doc_id, rev, content, gen, trans_id = result logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" % (doc_id, rev, gen, trans_id)) - self.insert_received_doc(doc_id, rev, content, gen, trans_id) + return self.insert_received_doc(doc_id, rev, content, gen, trans_id) def get_docs_by_generation(self, encrypted=None): """ @@ -899,6 +889,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): sql += " ORDER BY gen ASC" return self._fetchall(sql) + @defer.inlineCallbacks def get_insertable_docs_by_gen(self): """ Return a list of non-encrypted documents ready to be inserted. @@ -910,8 +901,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # docs, then some document might have been decrypted between these two # calls, and if it is just the right doc then it might not be caught # by the next loop. - all_docs = self.get_docs_by_generation() - decrypted_docs = self.get_docs_by_generation(encrypted=False) + all_docs = yield self.get_docs_by_generation() + decrypted_docs = yield self.get_docs_by_generation(encrypted=False) insertable = [] for doc_id, rev, _, gen, trans_id, encrypted in all_docs: for next_doc_id, _, next_content, _, _, _ in decrypted_docs: @@ -920,9 +911,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): insertable.append((doc_id, rev, content, gen, trans_id)) else: break - return insertable + defer.returnValue(insertable) - def count_docs_in_sync_db(self, encrypted=None): + @defer.inlineCallbacks + def _count_docs_in_sync_db(self, encrypted=None): """ Count how many documents we have in the table for received docs. @@ -933,31 +925,30 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :return: The count of documents. :rtype: int """ - if self._sync_db is None: - logger.warning("cannot return count with null sync_db") - return - sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) + query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) if encrypted is not None: - sql += " WHERE encrypted = %d" % int(encrypted) - res = self._fetchall(sql) + query += " WHERE encrypted = %d" % int(encrypted) + res = yield self._sync_db.runQuery(query) if res: val = res.pop() - return val[0] + defer.returnValue(val[0]) else: - return 0 + defer.returnValue(0) - def decrypt_received_docs(self): + @defer.inlineCallbacks + def _decrypt_received_docs(self): """ Get all the encrypted documents from the sync database and dispatch a decrypt worker to decrypt each one of them. """ - docs_by_generation = self.get_docs_by_generation(encrypted=True) - for doc_id, rev, content, gen, trans_id, _ \ - in filter(None, docs_by_generation): - self.decrypt_doc( + self._raise_in_case_of_failed_async_calls() + docs_by_generation = yield self.get_docs_by_generation(encrypted=True) + for doc_id, rev, content, gen, trans_id, _ in docs_by_generation: + self._decrypt_doc( doc_id, rev, content, gen, trans_id, self.source_replica_uid) - def process_decrypted(self): + @defer.inlineCallbacks + def _process_decrypted(self): """ Process the already decrypted documents, and insert as many documents as can be taken from the expected order without finding a gap. @@ -968,12 +959,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # Acquire the lock to avoid processing while we're still # getting data from the syncing stream, to avoid InvalidGeneration # problems. - with self.write_encrypted_lock: - for doc_fields in self.get_insertable_docs_by_gen(): - self.insert_decrypted_local_doc(*doc_fields) - remaining = self.count_docs_in_sync_db() - return remaining == 0 + insertable = yield self.get_insertable_docs_by_gen() + for doc_fields in insertable: + yield self.insert_decrypted_local_doc(*doc_fields) + @defer.inlineCallbacks def insert_decrypted_local_doc(self, doc_id, doc_rev, content, gen, trans_id): """ @@ -1007,22 +997,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): insert_fun(doc, gen, trans_id) # If no errors found, remove it from the received database. - self.delete_received_doc(doc_id, doc_rev) + yield self.delete_received_doc(doc_id, doc_rev) + @defer.inlineCallbacks def empty(self): """ Empty the received docs table of the sync database. """ sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - self._sync_db.execute(sql) + yield self._sync_db.runQuery(sql) + @defer.inlineCallbacks def _fetchall(self, *args, **kwargs): - with self._sync_db: - c = self._sync_db.cursor() - c.execute(*args, **kwargs) - return c.fetchall() + results = yield self._sync_db.runQuery(*args, **kwargs) + defer.returnValue(results) - def raise_in_case_of_failed_async_calls(self): + def _raise_in_case_of_failed_async_calls(self): """ Re-raise any exception raised by an async call. @@ -1033,3 +1023,39 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if not res.successful(): # re-raise the exception raised by the remote call res.get() + + def _stop_decr_loop(self): + """ + """ + self._stopped.set() + + def close(self): + """ + """ + self._stop_decr_loop() + SyncEncryptDecryptPool.close(self) + + def _decrypt_and_process_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + Called periodically from LoopingCall self._sync_loop. + """ + while not self._stopped.is_set(): + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + continue + self._decrypt_received_docs() + self._process_decrypted() + time.sleep(self.DECRYPT_LOOP_PERIOD) + + def wait(self): + while not self.clear_to_sync(): + time.sleep(self.DECRYPT_LOOP_PERIOD) + + @defer.inlineCallbacks + def clear_to_sync(self): + count = yield self._count_docs_in_sync_db() + defer.returnValue(count == 0) diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py index 2e9c53a3..55397d10 100644 --- a/client/src/leap/soledad/client/pragmas.py +++ b/client/src/leap/soledad/client/pragmas.py @@ -19,10 +19,53 @@ Different pragmas used in the initialization of the SQLCipher database. """ import logging import string +import threading +import os + +from leap.soledad.common import soledad_assert + logger = logging.getLogger(__name__) +_db_init_lock = threading.Lock() + + +def set_init_pragmas(conn, opts=None, extra_queries=None): + """ + Set the initialization pragmas. + + This includes the crypto pragmas, and any other options that must + be passed early to sqlcipher db. + """ + soledad_assert(opts is not None) + extra_queries = [] if extra_queries is None else extra_queries + with _db_init_lock: + # only one execution path should initialize the db + _set_init_pragmas(conn, opts, extra_queries) + + +def _set_init_pragmas(conn, opts, extra_queries): + + sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') + memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') + nowal = os.environ.get('LEAP_SQLITE_NOWAL') + + set_crypto_pragmas(conn, opts) + + if not nowal: + set_write_ahead_logging(conn) + if sync_off: + set_synchronous_off(conn) + else: + set_synchronous_normal(conn) + if memstore: + set_mem_temp_store(conn) + + for query in extra_queries: + conn.cursor().execute(query) + + def set_crypto_pragmas(db_handle, sqlcipher_opts): """ Set cryptographic params (key, cipher, KDF number of iterations and diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index ec7946b7..4f7ecd1b 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -55,6 +55,7 @@ from hashlib import sha256 from contextlib import contextmanager from collections import defaultdict from httplib import CannotSendRequest +from functools import partial from pysqlcipher import dbapi2 as sqlcipher_dbapi2 @@ -63,6 +64,7 @@ from twisted.internet.task import LoopingCall from twisted.internet.threads import deferToThreadPool from twisted.python.threadpool import ThreadPool from twisted.python import log +from twisted.enterprise import adbapi from leap.soledad.client import crypto from leap.soledad.client.target import SoledadSyncTarget @@ -102,46 +104,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True): conn = sqlcipher_dbapi2.connect( opts.path, check_same_thread=check_same_thread) - set_init_pragmas(conn, opts, extra_queries=on_init) + pragmas.set_init_pragmas(conn, opts, extra_queries=on_init) return conn -_db_init_lock = threading.Lock() - - -def set_init_pragmas(conn, opts=None, extra_queries=None): - """ - Set the initialization pragmas. - - This includes the crypto pragmas, and any other options that must - be passed early to sqlcipher db. - """ - soledad_assert(opts is not None) - extra_queries = [] if extra_queries is None else extra_queries - with _db_init_lock: - # only one execution path should initialize the db - _set_init_pragmas(conn, opts, extra_queries) - - -def _set_init_pragmas(conn, opts, extra_queries): - - sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') - memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') - nowal = os.environ.get('LEAP_SQLITE_NOWAL') - - pragmas.set_crypto_pragmas(conn, opts) - - if not nowal: - pragmas.set_write_ahead_logging(conn) - if sync_off: - pragmas.set_synchronous_off(conn) - else: - pragmas.set_synchronous_normal(conn) - if memstore: - pragmas.set_mem_temp_store(conn) - - for query in extra_queries: - conn.cursor().execute(query) +def initialize_sqlcipher_adbapi_db(opts, extra_queries=None): + from leap.soledad.client import sqlcipher_adbapi + return sqlcipher_adbapi.getConnectionPool( + opts, extra_queries=extra_queries) class SQLCipherOptions(object): @@ -151,22 +121,32 @@ class SQLCipherOptions(object): @classmethod def copy(cls, source, path=None, key=None, create=None, - is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None, - defer_encryption=None, sync_db_key=None): + is_raw_key=None, cipher=None, kdf_iter=None, + cipher_page_size=None, defer_encryption=None, sync_db_key=None): """ Return a copy of C{source} with parameters different than None replaced by new values. """ - return SQLCipherOptions( - path if path else source.path, - key if key else source.key, - create=create if create else source.create, - is_raw_key=is_raw_key if is_raw_key else source.is_raw_key, - cipher=cipher if cipher else source.cipher, - kdf_iter=kdf_iter if kdf_iter else source.kdf_iter, - cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size, - defer_encryption=defer_encryption if defer_encryption else source.defer_encryption, - sync_db_key=sync_db_key if sync_db_key else source.sync_db_key) + local_vars = locals() + args = [] + kwargs = {} + + for name in ["path", "key"]: + val = local_vars[name] + if val is not None: + args.append(val) + else: + args.append(getattr(source, name)) + + for name in ["create", "is_raw_key", "cipher", "kdf_iter", + "cipher_page_size", "defer_encryption", "sync_db_key"]: + val = local_vars[name] + if val is not None: + kwargs[name] = val + else: + kwargs[name] = getattr(source, name) + + return SQLCipherOptions(*args, **kwargs) def __init__(self, path, key, create=True, is_raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, @@ -478,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._sync_db_key = opts.sync_db_key self._sync_db = None - self._sync_db_write_lock = None self._sync_enc_pool = None self.sync_queue = None @@ -490,7 +469,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} self._syncers = {} - self._sync_db_write_lock = threading.Lock() self.sync_queue = multiprocessing.Queue() self.running = False @@ -512,7 +490,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # initialize syncing queue encryption pool self._sync_enc_pool = crypto.SyncEncrypterPool( - self._crypto, self._sync_db, self._sync_db_write_lock) + self._crypto, self._sync_db) # ----------------------------------------------------------------- # From the documentation: If f returns a deferred, rescheduling @@ -588,11 +566,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # somewhere else sync_opts = SQLCipherOptions.copy( opts, path=sync_db_path, create=True) - self._sync_db = initialize_sqlcipher_db( - sync_opts, on_init=self._sync_db_extra_init, - check_same_thread=False) - pragmas.set_crypto_pragmas(self._sync_db, opts) - # --------------------------------------------------------- + self._sync_db = getConnectionPool( + sync_opts, extra_queries=self._sync_db_extra_init) @property def _sync_db_extra_init(self): @@ -727,7 +702,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): h = sha256(json.dumps([url, creds])).hexdigest() cur_h, syncer = self._syncers.get(url, (None, None)) if syncer is None or h != cur_h: - wlock = self._sync_db_write_lock syncer = SoledadSynchronizer( self, SoledadSyncTarget(url, @@ -735,8 +709,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self._replica_uid, creds=creds, crypto=self._crypto, - sync_db=self._sync_db, - sync_db_write_lock=wlock)) + sync_db=self._sync_db)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -907,3 +880,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False, has_conflicts=has_conflicts, syncable=syncable) sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) + + +# +# twisted.enterprise.adbapi SQLCipher implementation +# + +SQLCIPHER_CONNECTION_TIMEOUT = 10 + + +def getConnectionPool(opts, extra_queries=None): + openfun = partial( + pragmas.set_init_pragmas, + opts=opts, + extra_queries=extra_queries) + return SQLCipherConnectionPool( + database=opts.path, + check_same_thread=False, + cp_openfun=openfun, + timeout=SQLCIPHER_CONNECTION_TIMEOUT) + + +class SQLCipherConnection(adbapi.Connection): + pass + + +class SQLCipherTransaction(adbapi.Transaction): + pass + + +class SQLCipherConnectionPool(adbapi.ConnectionPool): + + connectionFactory = SQLCipherConnection + transactionFactory = SQLCipherTransaction + + def __init__(self, *args, **kwargs): + adbapi.ConnectionPool.__init__( + self, "pysqlcipher.dbapi2", *args, **kwargs) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index d59923b2..06cef1ee 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -36,9 +36,8 @@ from u1db.remote import utils, http_errors from u1db.remote.http_target import HTTPSyncTarget from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase from zope.proxy import ProxyBase -from zope.proxy import sameProxiedObjects, setProxiedObject +from zope.proxy import setProxiedObject -from twisted.internet.task import LoopingCall from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth @@ -755,17 +754,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # passed to sync_exchange _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) - """ - Period of recurrence of the periodic decrypting task, in seconds. - """ - DECRYPT_LOOP_PERIOD = 0.5 - # # Modified HTTPSyncTarget methods. # def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, - sync_db=None, sync_db_write_lock=None): + sync_db=None): """ Initialize the SoledadSyncTarget. @@ -786,9 +780,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): instead of retreiving it from the dedicated database. :type sync_db: Sqlite handler - :param sync_db_write_lock: a write lock for controlling concurrent - access to the sync_db - :type sync_db_write_lock: threading.Lock """ HTTPSyncTarget.__init__(self, url, creds) self._raw_url = url @@ -802,14 +793,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._syncer_pool = None # deferred decryption attributes - self._sync_db = None - self._sync_db_write_lock = None + self._sync_db = sync_db self._decryption_callback = None self._sync_decr_pool = None - self._sync_loop = None - if sync_db and sync_db_write_lock is not None: - self._sync_db = sync_db - self._sync_db_write_lock = sync_db_write_lock def _setup_sync_decr_pool(self): """ @@ -818,11 +804,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if self._sync_decr_pool is None: # initialize syncing queue decryption pool self._sync_decr_pool = SyncDecrypterPool( - self._crypto, self._sync_db, - self._sync_db_write_lock, - insert_doc_cb=self._insert_doc_cb) - self._sync_decr_pool.set_source_replica_uid( - self.source_replica_uid) + self._crypto, + self._sync_db, + insert_doc_cb=self._insert_doc_cb, + source_replica_uid=self.source_replica_uid) def _teardown_sync_decr_pool(self): """ @@ -832,23 +817,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._sync_decr_pool.close() self._sync_decr_pool = None - def _setup_sync_loop(self): - """ - Set up the sync loop for deferred decryption. - """ - if self._sync_loop is None: - self._sync_loop = LoopingCall( - self._decrypt_syncing_received_docs) - self._sync_loop.start(self.DECRYPT_LOOP_PERIOD) - - def _teardown_sync_loop(self): - """ - Tear down the sync loop. - """ - if self._sync_loop is not None: - self._sync_loop.stop() - self._sync_loop = None - def _get_replica_uid(self, url): """ Return replica uid from the url, or None. @@ -1138,7 +1106,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if defer_decryption and self._sync_db is not None: self._sync_exchange_lock.acquire() self._setup_sync_decr_pool() - self._setup_sync_loop() self._defer_decryption = True else: # fall back @@ -1301,9 +1268,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # decrypt docs in case of deferred decryption if defer_decryption: - while not self.clear_to_sync(): - sleep(self.DECRYPT_LOOP_PERIOD) - self._teardown_sync_loop() + self._sync_decr_pool.wait() self._teardown_sync_decr_pool() self._sync_exchange_lock.release() @@ -1324,7 +1289,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): with self._stop_lock: self._stopped = False - def stop_syncer(self): with self._stop_lock: self._stopped = True @@ -1449,7 +1413,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: bool """ if self._sync_decr_pool: - return self._sync_decr_pool.count_docs_in_sync_db() == 0 + return self._sync_decr_pool.clear_to_sync() return True def set_decryption_callback(self, cb): @@ -1474,23 +1438,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ return self._sync_db is not None - def _decrypt_syncing_received_docs(self): - """ - Decrypt the documents received from remote replica and insert them - into the local one. - - Called periodically from LoopingCall self._sync_loop. - """ - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - return - - decrypter = self._sync_decr_pool - decrypter.raise_in_case_of_failed_async_calls() - decrypter.decrypt_received_docs() - decrypter.process_decrypted() - def _sign_request(self, method, url_query, params): """ Return an authorization header to be included in the HTTP request. |