Diffstat (limited to 'client')
| Mode | File | Lines changed |
| --- | --- | --- |
| -rw-r--r-- | client/changes/feature_use-twisted-adbapi-for-sync-db | 1 |
| -rw-r--r-- | client/src/leap/soledad/client/adbapi.py | 3 |
| -rw-r--r-- | client/src/leap/soledad/client/crypto.py | 190 |
| -rw-r--r-- | client/src/leap/soledad/client/pragmas.py | 43 |
| -rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 130 |
| -rw-r--r-- | client/src/leap/soledad/client/target.py | 71 |
6 files changed, 233 insertions, 205 deletions
| diff --git a/client/changes/feature_use-twisted-adbapi-for-sync-db b/client/changes/feature_use-twisted-adbapi-for-sync-db new file mode 100644 index 00000000..41e5e6e3 --- /dev/null +++ b/client/changes/feature_use-twisted-adbapi-for-sync-db @@ -0,0 +1 @@ +  o Use twisted.enterprise.adbapi for access to the sync database. diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 7ad10db5..5b882bbe 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -32,6 +32,7 @@ from zope.proxy import ProxyBase, setProxiedObject  from pysqlcipher.dbapi2 import OperationalError  from leap.soledad.client import sqlcipher as soledad_sqlcipher +from leap.soledad.client.pragmas import set_init_pragmas  logger = logging.getLogger(name=__name__) @@ -72,7 +73,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):      :rtype: U1DBConnectionPool      """      if openfun is None and driver == "pysqlcipher": -        openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts) +        openfun = partial(set_init_pragmas, opts=opts)      return U1DBConnectionPool(          "%s.dbapi2" % driver, database=opts.path,          check_same_thread=False, cp_openfun=openfun, diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 107bf7f1..dd40b198 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -25,11 +25,15 @@ import json  import logging  import multiprocessing  import threading +import time  from pycryptopp.cipher.aes import AES  from pycryptopp.cipher.xsalsa20 import XSalsa20  from zope.proxy import sameProxiedObjects +from twisted.internet import defer +from twisted.internet.threads import deferToThread +  from leap.soledad.common import soledad_assert  from leap.soledad.common import soledad_assert_type  from leap.soledad.common import crypto @@ -227,7 +231,7 @@ class SoledadCrypto(object):  #  def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv, -        mac_method, secret): +            mac_method, secret):      """      Calculate a MAC for C{doc} using C{ciphertext}. @@ -378,7 +382,7 @@ def decrypt_doc(crypto, doc):  def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, -        enc_iv, mac_method, secret, doc_mac): +                    enc_iv, mac_method, secret, doc_mac):      """      Verify that C{doc_mac} is a correct MAC for the given document. @@ -523,7 +527,7 @@ class SyncEncryptDecryptPool(object):      """      WORKERS = multiprocessing.cpu_count() -    def __init__(self, crypto, sync_db, write_lock): +    def __init__(self, crypto, sync_db):          """          Initialize the pool of encryption-workers. @@ -540,7 +544,6 @@ class SyncEncryptDecryptPool(object):          self._pool = multiprocessing.Pool(self.WORKERS)          self._crypto = crypto          self._sync_db = sync_db -        self._sync_db_write_lock = write_lock      def close(self):          """ @@ -592,7 +595,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):      # TODO implement throttling to reduce cpu usage??      
WORKERS = multiprocessing.cpu_count()      TABLE_NAME = "docs_tosync" -    FIELD_NAMES = "doc_id, rev, content" +    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"      def encrypt_doc(self, doc, workers=True):          """ @@ -633,8 +636,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          :type result: tuple(str, str, str)          """          doc_id, doc_rev, content = result -        self.insert_encrypted_local_doc(doc_id, doc_rev, content) +        return self.insert_encrypted_local_doc(doc_id, doc_rev, content) +    @defer.inlineCallbacks      def insert_encrypted_local_doc(self, doc_id, doc_rev, content):          """          Insert the contents of the encrypted doc into the local sync @@ -652,13 +656,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          # FIXME --- callback should complete immediately since otherwise the          # thread which handles the results will get blocked          # Right now we're blocking the dispatcher with the writes to sqlite. -        sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) -        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - -        con = self._sync_db -        with self._sync_db_write_lock: -            con.execute(sql_del, (doc_id, )) -            con.execute(sql_ins, (doc_id, doc_rev, content)) +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ +                % (self.TABLE_NAME,) +        yield self._sync_db.runQuery(query, (doc_id, doc_rev, content))  def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): @@ -704,9 +704,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):      """      # TODO implement throttling to reduce cpu usage??      TABLE_NAME = "docs_received" -    FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted" +    FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted" -    write_encrypted_lock = threading.Lock() +    """ +    Period of recurrence of the periodic decrypting task, in seconds. +    """ +    DECRYPT_LOOP_PERIOD = 0.5      def __init__(self, *args, **kwargs):          """ @@ -723,19 +726,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :type last_known_generation: int          """          self._insert_doc_cb = kwargs.pop("insert_doc_cb") +        self.source_replica_uid = kwargs.pop("source_replica_uid")          SyncEncryptDecryptPool.__init__(self, *args, **kwargs) -        self.source_replica_uid = None          self._async_results = [] -    def set_source_replica_uid(self, source_replica_uid): -        """ -        Set the source replica uid for this decrypter pool instance. - -        :param source_replica_uid: The uid of the source replica. -        :type source_replica_uid: str -        """ -        self.source_replica_uid = source_replica_uid +        self._stopped = threading.Event() +        self._deferred_loop = deferToThread(self._decrypt_and_process_docs) +        self._deferred_loop.addCallback( +            lambda _: logger.debug("Finished decryptor thread.")) +    @defer.inlineCallbacks      def insert_encrypted_received_doc(self, doc_id, doc_rev, content,                                        gen, trans_id):          """ @@ -754,17 +754,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :type trans_id: str          """          docstr = json.dumps(content) -        sql_del = "DELETE FROM '%s' WHERE doc_id=?" 
% (self.TABLE_NAME,) -        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (              self.TABLE_NAME,) +        yield self._sync_db.runQuery( +            query, +            (doc_id, doc_rev, docstr, gen, trans_id, 1)) -        con = self._sync_db -        with self._sync_db_write_lock: -            con.execute(sql_del, (doc_id, )) -            con.execute( -                sql_ins, -                (doc_id, doc_rev, docstr, gen, trans_id, 1)) - +    @defer.inlineCallbacks      def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):          """          Insert a document that is not symmetrically encrypted. @@ -784,17 +780,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          """          if not isinstance(content, str):              content = json.dumps(content) -        sql_del = "DELETE FROM '%s' WHERE doc_id=?" % ( -            self.TABLE_NAME,) -        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( +        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (              self.TABLE_NAME,) -        con = self._sync_db -        with self._sync_db_write_lock: -            con.execute(sql_del, (doc_id,)) -            con.execute( -                sql_ins, -                (doc_id, doc_rev, content, gen, trans_id, 0)) +        yield self._sync_db.runQuery( +            query, +            (doc_id, doc_rev, content, gen, trans_id, 0)) +    @defer.inlineCallbacks      def delete_received_doc(self, doc_id, doc_rev):          """          Delete a received doc after it was inserted into the local db. @@ -806,12 +798,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          """          sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (              self.TABLE_NAME,) -        con = self._sync_db -        with self._sync_db_write_lock: -            con.execute(sql_del, (doc_id, doc_rev)) +        yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev)) -    def decrypt_doc(self, doc_id, rev, content, gen, trans_id, -                    source_replica_uid, workers=True): +    def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, +                     source_replica_uid, workers=True):          """          Symmetrically decrypt a document. @@ -860,16 +850,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              # save the async result object so we can inspect it for failures              self._async_results.append(self._pool.apply_async(                  decrypt_doc_task, args, -                callback=self.decrypt_doc_cb)) +                callback=self._decrypt_doc_cb))          else:              # decrypt inline              res = decrypt_doc_task(*args) -            self.decrypt_doc_cb(res) +            self._decrypt_doc_cb(res) -    def decrypt_doc_cb(self, result): +    def _decrypt_doc_cb(self, result):          """          Store the decryption result in the sync db from where it will later be -        picked by process_decrypted. +        picked by _process_decrypted.          :param result: A tuple containing the doc id, revision and encrypted          content. 
@@ -878,7 +868,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          doc_id, rev, content, gen, trans_id = result          logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"                       % (doc_id, rev, gen, trans_id)) -        self.insert_received_doc(doc_id, rev, content, gen, trans_id) +        return self.insert_received_doc(doc_id, rev, content, gen, trans_id)      def get_docs_by_generation(self, encrypted=None):          """ @@ -899,6 +889,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          sql += " ORDER BY gen ASC"          return self._fetchall(sql) +    @defer.inlineCallbacks      def get_insertable_docs_by_gen(self):          """          Return a list of non-encrypted documents ready to be inserted. @@ -910,8 +901,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          # docs, then some document might have been decrypted between these two          # calls, and if it is just the right doc then it might not be caught          # by the next loop. -        all_docs = self.get_docs_by_generation() -        decrypted_docs = self.get_docs_by_generation(encrypted=False) +        all_docs = yield self.get_docs_by_generation() +        decrypted_docs = yield self.get_docs_by_generation(encrypted=False)          insertable = []          for doc_id, rev, _, gen, trans_id, encrypted in all_docs:              for next_doc_id, _, next_content, _, _, _ in decrypted_docs: @@ -920,9 +911,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                      insertable.append((doc_id, rev, content, gen, trans_id))                  else:                      break -        return insertable +        defer.returnValue(insertable) -    def count_docs_in_sync_db(self, encrypted=None): +    @defer.inlineCallbacks +    def _count_docs_in_sync_db(self, encrypted=None):          """          Count how many documents we have in the table for received docs. @@ -933,31 +925,30 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :return: The count of documents.          :rtype: int          """ -        if self._sync_db is None: -            logger.warning("cannot return count with null sync_db") -            return -        sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) +        query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)          if encrypted is not None: -            sql += " WHERE encrypted = %d" % int(encrypted) -        res = self._fetchall(sql) +            query += " WHERE encrypted = %d" % int(encrypted) +        res = yield self._sync_db.runQuery(query)          if res:              val = res.pop() -            return val[0] +            defer.returnValue(val[0])          else: -            return 0 +            defer.returnValue(0) -    def decrypt_received_docs(self): +    @defer.inlineCallbacks +    def _decrypt_received_docs(self):          """          Get all the encrypted documents from the sync database and dispatch a          decrypt worker to decrypt each one of them.          
""" -        docs_by_generation = self.get_docs_by_generation(encrypted=True) -        for doc_id, rev, content, gen, trans_id, _ \ -                in filter(None, docs_by_generation): -            self.decrypt_doc( +        self._raise_in_case_of_failed_async_calls() +        docs_by_generation = yield self.get_docs_by_generation(encrypted=True) +        for doc_id, rev, content, gen, trans_id, _ in docs_by_generation: +            self._decrypt_doc(                  doc_id, rev, content, gen, trans_id, self.source_replica_uid) -    def process_decrypted(self): +    @defer.inlineCallbacks +    def _process_decrypted(self):          """          Process the already decrypted documents, and insert as many documents          as can be taken from the expected order without finding a gap. @@ -968,12 +959,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          # Acquire the lock to avoid processing while we're still          # getting data from the syncing stream, to avoid InvalidGeneration          # problems. -        with self.write_encrypted_lock: -            for doc_fields in self.get_insertable_docs_by_gen(): -                self.insert_decrypted_local_doc(*doc_fields) -        remaining = self.count_docs_in_sync_db() -        return remaining == 0 +        insertable = yield self.get_insertable_docs_by_gen() +        for doc_fields in insertable: +            yield self.insert_decrypted_local_doc(*doc_fields) +    @defer.inlineCallbacks      def insert_decrypted_local_doc(self, doc_id, doc_rev, content,                                     gen, trans_id):          """ @@ -1007,22 +997,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          insert_fun(doc, gen, trans_id)          # If no errors found, remove it from the received database. -        self.delete_received_doc(doc_id, doc_rev) +        yield self.delete_received_doc(doc_id, doc_rev) +    @defer.inlineCallbacks      def empty(self):          """          Empty the received docs table of the sync database.          """          sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) -        self._sync_db.execute(sql) +        yield self._sync_db.runQuery(sql) +    @defer.inlineCallbacks      def _fetchall(self, *args, **kwargs): -        with self._sync_db: -            c = self._sync_db.cursor() -            c.execute(*args, **kwargs) -            return c.fetchall() +        results = yield self._sync_db.runQuery(*args, **kwargs) +        defer.returnValue(results) -    def raise_in_case_of_failed_async_calls(self): +    def _raise_in_case_of_failed_async_calls(self):          """          Re-raise any exception raised by an async call. @@ -1033,3 +1023,39 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                  if not res.successful():                      # re-raise the exception raised by the remote call                      res.get() + +    def _stop_decr_loop(self): +        """ +        """ +        self._stopped.set() + +    def close(self): +        """ +        """ +        self._stop_decr_loop() +        SyncEncryptDecryptPool.close(self) + +    def _decrypt_and_process_docs(self): +        """ +        Decrypt the documents received from remote replica and insert them +        into the local one. + +        Called periodically from LoopingCall self._sync_loop. 
+        """ +        while not self._stopped.is_set(): +            if sameProxiedObjects( +                    self._insert_doc_cb.get(self.source_replica_uid), +                    None): +                continue +            self._decrypt_received_docs() +            self._process_decrypted() +            time.sleep(self.DECRYPT_LOOP_PERIOD) + +    def wait(self): +        while not self.clear_to_sync(): +            time.sleep(self.DECRYPT_LOOP_PERIOD) + +    @defer.inlineCallbacks +    def clear_to_sync(self): +        count = yield self._count_docs_in_sync_db() +        defer.returnValue(count == 0) diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py index 2e9c53a3..55397d10 100644 --- a/client/src/leap/soledad/client/pragmas.py +++ b/client/src/leap/soledad/client/pragmas.py @@ -19,10 +19,53 @@ Different pragmas used in the initialization of the SQLCipher database.  """  import logging  import string +import threading +import os + +from leap.soledad.common import soledad_assert +  logger = logging.getLogger(__name__) +_db_init_lock = threading.Lock() + + +def set_init_pragmas(conn, opts=None, extra_queries=None): +    """ +    Set the initialization pragmas. + +    This includes the crypto pragmas, and any other options that must +    be passed early to sqlcipher db. +    """ +    soledad_assert(opts is not None) +    extra_queries = [] if extra_queries is None else extra_queries +    with _db_init_lock: +        # only one execution path should initialize the db +        _set_init_pragmas(conn, opts, extra_queries) + + +def _set_init_pragmas(conn, opts, extra_queries): + +    sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') +    memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') +    nowal = os.environ.get('LEAP_SQLITE_NOWAL') + +    set_crypto_pragmas(conn, opts) + +    if not nowal: +        set_write_ahead_logging(conn) +    if sync_off: +        set_synchronous_off(conn) +    else: +        set_synchronous_normal(conn) +    if memstore: +        set_mem_temp_store(conn) + +    for query in extra_queries: +        conn.cursor().execute(query) + +  def set_crypto_pragmas(db_handle, sqlcipher_opts):      """      Set cryptographic params (key, cipher, KDF number of iterations and diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index ec7946b7..4f7ecd1b 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -55,6 +55,7 @@ from hashlib import sha256  from contextlib import contextmanager  from collections import defaultdict  from httplib import CannotSendRequest +from functools import partial  from pysqlcipher import dbapi2 as sqlcipher_dbapi2 @@ -63,6 +64,7 @@ from twisted.internet.task import LoopingCall  from twisted.internet.threads import deferToThreadPool  from twisted.python.threadpool import ThreadPool  from twisted.python import log +from twisted.enterprise import adbapi  from leap.soledad.client import crypto  from leap.soledad.client.target import SoledadSyncTarget @@ -102,46 +104,14 @@ def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):      conn = sqlcipher_dbapi2.connect(          opts.path, check_same_thread=check_same_thread) -    set_init_pragmas(conn, opts, extra_queries=on_init) +    pragmas.set_init_pragmas(conn, opts, extra_queries=on_init)      return conn -_db_init_lock = threading.Lock() - - -def set_init_pragmas(conn, opts=None, extra_queries=None): -    """ -    Set the 
initialization pragmas. - -    This includes the crypto pragmas, and any other options that must -    be passed early to sqlcipher db. -    """ -    soledad_assert(opts is not None) -    extra_queries = [] if extra_queries is None else extra_queries -    with _db_init_lock: -        # only one execution path should initialize the db -        _set_init_pragmas(conn, opts, extra_queries) - - -def _set_init_pragmas(conn, opts, extra_queries): - -    sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') -    memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') -    nowal = os.environ.get('LEAP_SQLITE_NOWAL') - -    pragmas.set_crypto_pragmas(conn, opts) - -    if not nowal: -        pragmas.set_write_ahead_logging(conn) -    if sync_off: -        pragmas.set_synchronous_off(conn) -    else: -        pragmas.set_synchronous_normal(conn) -    if memstore: -        pragmas.set_mem_temp_store(conn) - -    for query in extra_queries: -        conn.cursor().execute(query) +def initialize_sqlcipher_adbapi_db(opts, extra_queries=None): +    from leap.soledad.client import sqlcipher_adbapi +    return sqlcipher_adbapi.getConnectionPool( +        opts, extra_queries=extra_queries)  class SQLCipherOptions(object): @@ -151,22 +121,32 @@ class SQLCipherOptions(object):      @classmethod      def copy(cls, source, path=None, key=None, create=None, -            is_raw_key=None, cipher=None, kdf_iter=None, cipher_page_size=None, -            defer_encryption=None, sync_db_key=None): +             is_raw_key=None, cipher=None, kdf_iter=None, +             cipher_page_size=None, defer_encryption=None, sync_db_key=None):          """          Return a copy of C{source} with parameters different than None          replaced by new values.          """ -        return SQLCipherOptions( -            path if path else source.path, -            key if key else source.key, -            create=create if create else source.create, -            is_raw_key=is_raw_key if is_raw_key else source.is_raw_key, -            cipher=cipher if cipher else source.cipher, -            kdf_iter=kdf_iter if kdf_iter else source.kdf_iter, -            cipher_page_size=cipher_page_size if cipher_page_size else source.cipher_page_size, -            defer_encryption=defer_encryption if defer_encryption else source.defer_encryption, -            sync_db_key=sync_db_key if sync_db_key else source.sync_db_key) +        local_vars = locals() +        args = [] +        kwargs = {} + +        for name in ["path", "key"]: +            val = local_vars[name] +            if val is not None: +                args.append(val) +            else: +                args.append(getattr(source, name)) + +        for name in ["create", "is_raw_key", "cipher", "kdf_iter", +                     "cipher_page_size", "defer_encryption", "sync_db_key"]: +            val = local_vars[name] +            if val is not None: +                kwargs[name] = val +            else: +                kwargs[name] = getattr(source, name) + +        return SQLCipherOptions(*args, **kwargs)      def __init__(self, path, key, create=True, is_raw_key=False,                   cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, @@ -478,7 +458,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          self._sync_db_key = opts.sync_db_key          self._sync_db = None -        self._sync_db_write_lock = None          self._sync_enc_pool = None          self.sync_queue = None @@ -490,7 +469,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          #  self._syncers = {'<url>': 
('<auth_hash>', syncer), ...}          self._syncers = {} -        self._sync_db_write_lock = threading.Lock()          self.sync_queue = multiprocessing.Queue()          self.running = False @@ -512,7 +490,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):              # initialize syncing queue encryption pool              self._sync_enc_pool = crypto.SyncEncrypterPool( -                self._crypto, self._sync_db, self._sync_db_write_lock) +                self._crypto, self._sync_db)              # -----------------------------------------------------------------              # From the documentation: If f returns a deferred, rescheduling @@ -588,11 +566,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          # somewhere else          sync_opts = SQLCipherOptions.copy(              opts, path=sync_db_path, create=True) -        self._sync_db = initialize_sqlcipher_db( -            sync_opts, on_init=self._sync_db_extra_init, -            check_same_thread=False) -        pragmas.set_crypto_pragmas(self._sync_db, opts) -        # --------------------------------------------------------- +        self._sync_db = getConnectionPool( +            sync_opts, extra_queries=self._sync_db_extra_init)      @property      def _sync_db_extra_init(self): @@ -727,7 +702,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          h = sha256(json.dumps([url, creds])).hexdigest()          cur_h, syncer = self._syncers.get(url, (None, None))          if syncer is None or h != cur_h: -            wlock = self._sync_db_write_lock              syncer = SoledadSynchronizer(                  self,                  SoledadSyncTarget(url, @@ -735,8 +709,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):                                    self._replica_uid,                                    creds=creds,                                    crypto=self._crypto, -                                  sync_db=self._sync_db, -                                  sync_db_write_lock=wlock)) +                                  sync_db=self._sync_db))              self._syncers[url] = (h, syncer)          # in order to reuse the same synchronizer multiple times we have to          # reset its state (i.e. 
the number of documents received from target @@ -907,3 +880,40 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,                             has_conflicts=has_conflicts, syncable=syncable)  sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) + + +# +# twisted.enterprise.adbapi SQLCipher implementation +# + +SQLCIPHER_CONNECTION_TIMEOUT = 10 + + +def getConnectionPool(opts, extra_queries=None): +    openfun = partial( +        pragmas.set_init_pragmas, +        opts=opts, +        extra_queries=extra_queries) +    return SQLCipherConnectionPool( +        database=opts.path, +        check_same_thread=False, +        cp_openfun=openfun, +        timeout=SQLCIPHER_CONNECTION_TIMEOUT) + + +class SQLCipherConnection(adbapi.Connection): +    pass + + +class SQLCipherTransaction(adbapi.Transaction): +    pass + + +class SQLCipherConnectionPool(adbapi.ConnectionPool): + +    connectionFactory = SQLCipherConnection +    transactionFactory = SQLCipherTransaction + +    def __init__(self, *args, **kwargs): +        adbapi.ConnectionPool.__init__( +            self, "pysqlcipher.dbapi2", *args, **kwargs) diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index d59923b2..06cef1ee 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -36,9 +36,8 @@ from u1db.remote import utils, http_errors  from u1db.remote.http_target import HTTPSyncTarget  from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase  from zope.proxy import ProxyBase -from zope.proxy import sameProxiedObjects, setProxiedObject +from zope.proxy import setProxiedObject -from twisted.internet.task import LoopingCall  from leap.soledad.common.document import SoledadDocument  from leap.soledad.client.auth import TokenBasedAuth @@ -755,17 +754,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):      # passed to sync_exchange      _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) -    """ -    Period of recurrence of the periodic decrypting task, in seconds. -    """ -    DECRYPT_LOOP_PERIOD = 0.5 -      #      # Modified HTTPSyncTarget methods.      #      def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, -                 sync_db=None, sync_db_write_lock=None): +                 sync_db=None):          """          Initialize the SoledadSyncTarget. @@ -786,9 +780,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                          instead of retreiving it from the dedicated                          database.          
:type sync_db: Sqlite handler -        :param sync_db_write_lock: a write lock for controlling concurrent -                                   access to the sync_db -        :type sync_db_write_lock: threading.Lock          """          HTTPSyncTarget.__init__(self, url, creds)          self._raw_url = url @@ -802,14 +793,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          self._syncer_pool = None          # deferred decryption attributes -        self._sync_db = None -        self._sync_db_write_lock = None +        self._sync_db = sync_db          self._decryption_callback = None          self._sync_decr_pool = None -        self._sync_loop = None -        if sync_db and sync_db_write_lock is not None: -            self._sync_db = sync_db -            self._sync_db_write_lock = sync_db_write_lock      def _setup_sync_decr_pool(self):          """ @@ -818,11 +804,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          if self._sync_decr_pool is None:              # initialize syncing queue decryption pool              self._sync_decr_pool = SyncDecrypterPool( -                self._crypto, self._sync_db, -                self._sync_db_write_lock, -                insert_doc_cb=self._insert_doc_cb) -            self._sync_decr_pool.set_source_replica_uid( -                self.source_replica_uid) +                self._crypto, +                self._sync_db, +                insert_doc_cb=self._insert_doc_cb, +                source_replica_uid=self.source_replica_uid)      def _teardown_sync_decr_pool(self):          """ @@ -832,23 +817,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):              self._sync_decr_pool.close()              self._sync_decr_pool = None -    def _setup_sync_loop(self): -        """ -        Set up the sync loop for deferred decryption. -        """ -        if self._sync_loop is None: -            self._sync_loop = LoopingCall( -                self._decrypt_syncing_received_docs) -            self._sync_loop.start(self.DECRYPT_LOOP_PERIOD) - -    def _teardown_sync_loop(self): -        """ -        Tear down the sync loop. -        """ -        if self._sync_loop is not None: -            self._sync_loop.stop() -            self._sync_loop = None -      def _get_replica_uid(self, url):          """          Return replica uid from the url, or None. 
@@ -1138,7 +1106,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          if defer_decryption and self._sync_db is not None:              self._sync_exchange_lock.acquire()              self._setup_sync_decr_pool() -            self._setup_sync_loop()              self._defer_decryption = True          else:              # fall back @@ -1301,9 +1268,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          # decrypt docs in case of deferred decryption          if defer_decryption: -            while not self.clear_to_sync(): -                sleep(self.DECRYPT_LOOP_PERIOD) -            self._teardown_sync_loop() +            self._sync_decr_pool.wait()              self._teardown_sync_decr_pool()              self._sync_exchange_lock.release() @@ -1324,7 +1289,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          with self._stop_lock:              self._stopped = False -      def stop_syncer(self):          with self._stop_lock:              self._stopped = True @@ -1449,7 +1413,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          :rtype: bool          """          if self._sync_decr_pool: -            return self._sync_decr_pool.count_docs_in_sync_db() == 0 +            return self._sync_decr_pool.clear_to_sync()          return True      def set_decryption_callback(self, cb): @@ -1474,23 +1438,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          """          return self._sync_db is not None -    def _decrypt_syncing_received_docs(self): -        """ -        Decrypt the documents received from remote replica and insert them -        into the local one. - -        Called periodically from LoopingCall self._sync_loop. -        """ -        if sameProxiedObjects( -                self._insert_doc_cb.get(self.source_replica_uid), -                None): -            return - -        decrypter = self._sync_decr_pool -        decrypter.raise_in_case_of_failed_async_calls() -        decrypter.decrypt_received_docs() -        decrypter.process_decrypted() -      def _sign_request(self, method, url_query, params):          """          Return an authorization header to be included in the HTTP request. | 
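
The sqlcipher.py hunks above replace the hand-rolled sync-database handling with a twisted.enterprise.adbapi connection pool, created by a `getConnectionPool` helper whose `cp_openfun` applies the SQLCipher pragmas to every fresh connection. The following is a minimal sketch of that pattern; `set_key_pragma` and `get_connection_pool` are illustrative stand-ins for `pragmas.set_init_pragmas` and the pool factory added in the patch, and it assumes pysqlcipher and Twisted are installed.

```python
from functools import partial

from twisted.enterprise import adbapi

SQLCIPHER_CONNECTION_TIMEOUT = 10  # seconds, as in the patch


def set_key_pragma(conn, key=None):
    # Stand-in for pragmas.set_init_pragmas: whatever runs here executes on
    # every new connection before the pool hands it out for queries.
    conn.cursor().execute("PRAGMA key = '%s'" % (key,))


def get_connection_pool(path, key):
    openfun = partial(set_key_pragma, key=key)
    return adbapi.ConnectionPool(
        "pysqlcipher.dbapi2",       # DB-API module imported by the pool
        database=path,              # passed through to dbapi2.connect()
        check_same_thread=False,    # connections are used from pool threads
        cp_openfun=openfun,         # runs once per newly opened connection
        timeout=SQLCIPHER_CONNECTION_TIMEOUT)
```

Queries then go through the pool, e.g. `pool.runQuery("SELECT 1")`, which returns a Deferred fired from one of the pool's worker threads; this is how `self._sync_db` is used throughout the patch.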
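
In crypto.py, the DELETE-then-INSERT pairs guarded by a write lock are collapsed into single `INSERT OR REPLACE` statements issued through the pool. A hedged sketch of that idiom, assuming a `docs_tosync` table whose `doc_id` column is the primary key (matching the amended `FIELD_NAMES` above):

```python
from twisted.internet import defer

TABLE_NAME = "docs_tosync"


@defer.inlineCallbacks
def insert_encrypted_local_doc(sync_db, doc_id, doc_rev, content):
    # doc_id is the table's PRIMARY KEY, so INSERT OR REPLACE atomically
    # overwrites any previous row for the same document.  This removes the
    # need for the old DELETE + INSERT pair and its explicit write lock.
    query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" % (TABLE_NAME,)
    yield sync_db.runQuery(query, (doc_id, doc_rev, content))
```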
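
Methods such as `_count_docs_in_sync_db` and `clear_to_sync` now run as `@defer.inlineCallbacks` generators; since Python 2 generators cannot return a value, results are handed back with `defer.returnValue`, and callers must yield (or attach callbacks to) the returned Deferred. A small standalone sketch under those assumptions:

```python
from twisted.internet import defer


@defer.inlineCallbacks
def count_docs_in_sync_db(sync_db, table_name="docs_received"):
    # runQuery fires its Deferred with a list of row tuples.
    res = yield sync_db.runQuery("SELECT COUNT(*) FROM %s" % (table_name,))
    if res:
        defer.returnValue(res[0][0])
    defer.returnValue(0)


@defer.inlineCallbacks
def clear_to_sync(sync_db):
    # True once the received-docs table has been fully drained.
    count = yield count_docs_in_sync_db(sync_db)
    defer.returnValue(count == 0)
```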
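
The LoopingCall that target.py used to drive deferred decryption is removed; instead, SyncDecrypterPool starts its own polling loop in a reactor thread via `deferToThread` and stops it with a `threading.Event`. The sketch below mirrors that structure with the actual decryption steps stubbed out; it assumes a running reactor, since `deferToThread` uses the reactor's thread pool.

```python
import threading
import time

from twisted.internet.threads import deferToThread

DECRYPT_LOOP_PERIOD = 0.5  # seconds, same constant as in the patch


class DecryptLoop(object):
    """Illustrative stand-in for the loop added to SyncDecrypterPool."""

    def __init__(self):
        self._stopped = threading.Event()
        # The patch starts the loop from the pool's constructor; the
        # Deferred fires once the loop exits.
        self._deferred_loop = deferToThread(self._decrypt_and_process_docs)

    def _decrypt_and_process_docs(self):
        # Poll until close() is called.
        while not self._stopped.is_set():
            self._decrypt_received_docs()
            self._process_decrypted()
            time.sleep(DECRYPT_LOOP_PERIOD)

    def _decrypt_received_docs(self):
        pass  # stub: dispatch decryption workers for encrypted rows

    def _process_decrypted(self):
        pass  # stub: insert ready documents into the local database

    def close(self):
        # Equivalent of _stop_decr_loop() + close() in the patch.
        self._stopped.set()
```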
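
`SQLCipherOptions.copy` is rewritten to collect its arguments from `locals()` instead of a long chain of ternaries, so a parameter overrides the source value whenever it is not None (an explicit `False` now takes effect, which the old `x if x else source.x` form would have discarded). An illustrative reduction with a hypothetical `Options` class:

```python
class Options(object):
    """Illustrative stand-in for SQLCipherOptions."""

    def __init__(self, path, key, create=True, is_raw_key=False):
        self.path = path
        self.key = key
        self.create = create
        self.is_raw_key = is_raw_key

    @classmethod
    def copy(cls, source, path=None, key=None, create=None, is_raw_key=None):
        # Snapshot the parameters before binding any other local names.
        local_vars = locals()
        args = []
        kwargs = {}
        for name in ["path", "key"]:
            # Positional parameters fall back to the source value on None.
            val = local_vars[name]
            args.append(val if val is not None else getattr(source, name))
        for name in ["create", "is_raw_key"]:
            # Keyword parameters behave the same way.
            val = local_vars[name]
            kwargs[name] = val if val is not None else getattr(source, name)
        return cls(*args, **kwargs)


# Example: keep path and key from the source, override only `create`.
copied = Options.copy(Options("/tmp/db", "secret"), create=False)
assert copied.create is False and copied.key == "secret"
```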
