-rw-r--r--  client/src/leap/soledad/client/api.py               |  31
-rw-r--r--  client/src/leap/soledad/client/encdecpool.py         | 380
-rw-r--r--  client/src/leap/soledad/client/http_target/api.py    |  12
-rw-r--r--  client/src/leap/soledad/client/http_target/fetch.py  |  66
-rw-r--r--  client/src/leap/soledad/client/interfaces.py         |   7
-rw-r--r--  client/src/leap/soledad/client/sqlcipher.py          |  11
-rw-r--r--  client/src/leap/soledad/client/sync.py               |  18
-rw-r--r--  testing/tests/perf/test_encdecpool.py                |  41
-rw-r--r--  testing/tests/perf/test_sync.py                      |   3
-rw-r--r--  testing/tests/sync/test_encdecpool.py                | 258
-rw-r--r--  testing/tests/sync/test_sync.py                      |   2
-rw-r--r--  testing/tests/sync/test_sync_deferred.py             |  15
-rw-r--r--  testing/tests/sync/test_sync_mutex.py                |   4
-rw-r--r--  testing/tests/sync/test_sync_target.py               |  27

14 files changed, 36 insertions, 839 deletions
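
The client-facing effect of this change is that Soledad.sync() no longer takes a defer_decryption flag. A minimal caller-side sketch of the new signature, assuming an already-initialized Soledad instance (the sync_once helper name is only for illustration):

```python
from twisted.internet import defer


@defer.inlineCallbacks
def sync_once(soledad):
    # sync() now always decrypts incoming documents inline; the returned
    # Deferred still fires with the local generation recorded before the
    # synchronization was performed.
    local_gen_before_sync = yield soledad.sync()
    defer.returnValue(local_gen_before_sync)
```
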
| diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 6870d5ba..cbcae4f7 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -169,7 +169,7 @@ class Soledad(object):          :type auth_token: str          :param defer_encryption: -            Whether to defer encryption/decryption of documents, or do it +            Whether to defer encryption of documents, or do it              inline while syncing.          :type defer_encryption: bool @@ -299,9 +299,9 @@ class Soledad(object):          )          self._sqlcipher_opts = opts -        # the sync_db is used both for deferred encryption and decryption, so +        # the sync_db is used both for deferred encryption, so          # we want to initialize it anyway to allow for all combinations of -        # deferred encryption and decryption configurations. +        # deferred encryption configurations.          self._initialize_sync_db(opts)          self._dbpool = adbapi.getConnectionPool(              opts, sync_enc_pool=self._sync_enc_pool) @@ -700,37 +700,26 @@ class Soledad(object):          if syncable and not self._dbsyncer:              self._init_u1db_syncer() -    def sync(self, defer_decryption=True): +    def sync(self):          """          Synchronize documents with the server replica.          This method uses a lock to prevent multiple concurrent sync processes          over the same local db file. -        :param defer_decryption: -            Whether to defer decryption of documents, or do it inline while -            syncing. -        :type defer_decryption: bool -          :return: A deferred lock that will run the actual sync process when                   the lock is acquired, and which will fire with with the local                   generation before the synchronization was performed.          :rtype: twisted.internet.defer.Deferred          """          d = self.sync_lock.run( -            self._sync, -            defer_decryption) +            self._sync)          return d -    def _sync(self, defer_decryption): +    def _sync(self):          """          Synchronize documents with the server replica. -        :param defer_decryption: -            Whether to defer decryption of documents, or do it inline while -            syncing. -        :type defer_decryption: bool -          :return: A deferred whose callback will be invoked with the local              generation before the synchronization was performed.          
:rtype: twisted.internet.defer.Deferred @@ -740,8 +729,7 @@ class Soledad(object):              return          d = self._dbsyncer.sync(              sync_url, -            creds=self._creds, -            defer_decryption=defer_decryption) +            creds=self._creds)          def _sync_callback(local_gen):              self._last_received_docs = docs = self._dbsyncer.received_docs @@ -874,12 +862,9 @@ class Soledad(object):          """          maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"          encr = encdecpool.SyncEncrypterPool -        decr = encdecpool.SyncDecrypterPool          sql_encr_table_query = (maybe_create % (              encr.TABLE_NAME, encr.FIELD_NAMES)) -        sql_decr_table_query = (maybe_create % ( -            decr.TABLE_NAME, decr.FIELD_NAMES)) -        return (sql_encr_table_query, sql_decr_table_query) +        return (sql_encr_table_query,)      #      # ISecretsStorage diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 056b012f..8eaefa77 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -22,14 +22,9 @@ during synchronization.  """ -import json -from uuid import uuid4 - -from twisted.internet.task import LoopingCall  from twisted.internet import threads  from twisted.internet import defer -from leap.soledad.common.document import SoledadDocument  from leap.soledad.common import soledad_assert  from leap.soledad.common.log import getLogger @@ -41,7 +36,7 @@ logger = getLogger(__name__)  # -# Encrypt/decrypt pools of workers +# Encrypt pool of workers  #  class SyncEncryptDecryptPool(object): @@ -282,376 +277,3 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,      """      decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)      return doc_id, doc_rev, decrypted_content, gen, trans_id, idx - - -class SyncDecrypterPool(SyncEncryptDecryptPool): -    """ -    Pool of workers that spawn subprocesses to execute the symmetric decryption -    of documents that were received. - -    The decryption of the received documents is done in two steps: - -        1. Encrypted documents are stored in the sync db by the actual soledad -           sync loop. -        2. The soledad sync loop tells us how many documents we should expect -           to process. -        3. We start a decrypt-and-process loop: - -            a. Encrypted documents are fetched. -            b. Encrypted documents are decrypted. -            c. The longest possible list of decrypted documents are inserted -               in the soledad db (this depends on which documents have already -               arrived and which documents have already been decrypte, because -               the order of insertion in the local soledad db matters). -            d. Processed documents are deleted from the database. - -        4. When we have processed as many documents as we should, the loop -           finishes. -    """ -    TABLE_NAME = "docs_received" -    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ -                  "trans_id, encrypted, idx, sync_id" - -    """ -    Period of recurrence of the periodic decrypting task, in seconds. -    """ -    DECRYPT_LOOP_PERIOD = 0.5 - -    def __init__(self, *args, **kwargs): -        """ -        Initialize the decrypter pool, and setup a dict for putting the -        results of the decrypted docs until they are picked by the insert -        routine that gets them in order. 
- -        :param insert_doc_cb: A callback for inserting received documents from -                              target. If not overriden, this will call u1db -                              insert_doc_from_target in synchronizer, which -                              implements the TAKE OTHER semantics. -        :type insert_doc_cb: function -        :param source_replica_uid: The source replica uid, used to find the -                                   correct callback for inserting documents. -        :type source_replica_uid: str -        """ -        self._insert_doc_cb = kwargs.pop("insert_doc_cb") -        self.source_replica_uid = kwargs.pop("source_replica_uid") - -        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - -        self._docs_to_process = None -        self._processed_docs = 0 -        self._last_inserted_idx = 0 - -        self._loop = LoopingCall(self._decrypt_and_recurse) - -    def _start_pool(self, period): -        self._loop.start(period) - -    def start(self, docs_to_process): -        """ -        Set the number of documents we expect to process. - -        This should be called by the during the sync exchange process as soon -        as we know how many documents are arriving from the server. - -        :param docs_to_process: The number of documents to process. -        :type docs_to_process: int -        """ -        SyncEncryptDecryptPool.start(self) -        self._decrypted_docs_indexes = set() -        self._sync_id = uuid4().hex -        self._docs_to_process = docs_to_process -        self._deferred = defer.Deferred() -        d = self._init_db() -        d.addCallback(lambda _: self._start_pool(self.DECRYPT_LOOP_PERIOD)) -        return d - -    def stop(self): -        if self._loop.running: -            self._loop.stop() -        self._finish() -        SyncEncryptDecryptPool.stop(self) - -    def _init_db(self): -        """ -        Ensure sync_id column is present then -        Empty the received docs table of the sync database. - -        :return: A deferred that will fire when the operation in the database -                 has finished. -        :rtype: twisted.internet.defer.Deferred -        """ -        ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" % -                                 self.TABLE_NAME) -        d = self._runQuery(ensure_sync_id_column) - -        def empty_received_docs(_): -            query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) -            return self._runOperation(query, (self._sync_id,)) - -        d.addCallbacks(empty_received_docs, empty_received_docs) -        return d - -    def _errback(self, failure): -        logger.error(failure) -        self._deferred.errback(failure) -        self._processed_docs = 0 -        self._last_inserted_idx = 0 - -    @property -    def deferred(self): -        """ -        Deferred that will be fired when the decryption loop has finished -        processing all the documents. -        """ -        return self._deferred - -    def insert_encrypted_received_doc( -            self, doc_id, doc_rev, content, gen, trans_id, idx): -        """ -        Decrypt and insert a received document into local staging area to be -        processed later on. - -        :param doc_id: The document ID. 
-        :type doc_id: str -        :param doc_rev: The document Revision -        :param doc_rev: str -        :param content: The content of the document -        :type content: dict -        :param gen: The document Generation -        :type gen: int -        :param trans_id: Transaction ID -        :type trans_id: str -        :param idx: The index of this document in the current sync process. -        :type idx: int - -        :return: A deferred that will fire after the decrypted document has -                 been inserted in the sync db. -        :rtype: twisted.internet.defer.Deferred -        """ -        soledad_assert(self._crypto is not None, "need a crypto object") - -        key = self._crypto.doc_passphrase(doc_id) -        secret = self._crypto.secret -        args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx -        # decrypt asynchronously -        # TODO use dedicated threadpool / move to ampoule -        d = threads.deferToThread( -            decrypt_doc_task, *args) -        # callback will insert it for later processing -        d.addCallback(self._decrypt_doc_cb) -        return d - -    def insert_received_doc( -            self, doc_id, doc_rev, content, gen, trans_id, idx): -        """ -        Insert a document that is not symmetrically encrypted. -        We store it in the staging area (the decrypted_docs dictionary) to be -        picked up in order as the preceding documents are decrypted. - -        :param doc_id: The document id -        :type doc_id: str -        :param doc_rev: The document revision -        :param doc_rev: str or dict -        :param content: The content of the document -        :type content: dict -        :param gen: The document generation -        :type gen: int -        :param trans_id: The transaction id -        :type trans_id: str -        :param idx: The index of this document in the current sync process. -        :type idx: int - -        :return: A deferred that will fire when the operation in the database -                 has finished. -        :rtype: twisted.internet.defer.Deferred -        """ -        if not isinstance(content, str): -            content = json.dumps(content) -        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \ -                % self.TABLE_NAME -        d = self._runOperation( -            query, (doc_id, doc_rev, content, gen, trans_id, 0, -                    idx, self._sync_id)) -        d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) -        return d - -    def _delete_received_docs(self, doc_ids): -        """ -        Delete a list of received docs after get them inserted into the db. - -        :param doc_id: Document ID list. -        :type doc_id: list - -        :return: A deferred that will fire when the operation in the database -                 has finished. -        :rtype: twisted.internet.defer.Deferred -        """ -        placeholders = ', '.join('?' for _ in doc_ids) -        query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ -                % (self.TABLE_NAME, placeholders) -        return self._runOperation(query, (doc_ids)) - -    def _decrypt_doc_cb(self, result): -        """ -        Store the decryption result in the sync db from where it will later be -        picked by _process_decrypted_docs. - -        :param result: A tuple containing the document's id, revision, -                       content, generation, transaction id and sync index. 
-        :type result: tuple(str, str, str, int, str, int) - -        :return: A deferred that will fire after the document has been -                 inserted in the sync db. -        :rtype: twisted.internet.defer.Deferred -        """ -        doc_id, rev, content, gen, trans_id, idx = result -        logger.debug("sync decrypter pool: decrypted doc %s: %s %s %s" -                     % (doc_id, rev, gen, trans_id)) -        return self.insert_received_doc( -            doc_id, rev, content, gen, trans_id, idx) - -    def _get_docs(self, encrypted=None, sequence=None): -        """ -        Get documents from the received docs table in the sync db. - -        :param encrypted: If not None, only return documents with encrypted -                          field equal to given parameter. -        :type encrypted: bool or None -        :param order_by: The name of the field to order results. - -        :return: A deferred that will fire with the results of the database -                 query. -        :rtype: twisted.internet.defer.Deferred -        """ -        query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ -                "idx FROM %s" % self.TABLE_NAME -        parameters = [] -        if encrypted or sequence: -            query += " WHERE sync_id = ? and" -            parameters += [self._sync_id] -        if encrypted: -            query += " encrypted = ?" -            parameters += [int(encrypted)] -        if sequence: -            query += " idx in (" + ', '.join('?' * len(sequence)) + ")" -            parameters += [int(i) for i in sequence] -        query += " ORDER BY idx ASC" -        return self._runQuery(query, parameters) - -    @defer.inlineCallbacks -    def _get_insertable_docs(self): -        """ -        Return a list of non-encrypted documents ready to be inserted. - -        :return: A deferred that will fire with the list of insertable -                 documents. -        :rtype: twisted.internet.defer.Deferred -        """ -        # Here, check in memory what are the insertable indexes that can -        # form a sequence starting from the last inserted index -        sequence = [] -        insertable_docs = [] -        next_index = self._last_inserted_idx + 1 -        while next_index in self._decrypted_docs_indexes: -            sequence.append(str(next_index)) -            next_index += 1 -            if len(sequence) > 900: -                # 999 is the default value of SQLITE_MAX_VARIABLE_NUMBER -                # if we try to query more, SQLite will refuse -                # we need to find a way to improve this -                # being researched in #7669 -                break -        # Then fetch all the ones ready for insertion. -        if sequence: -            insertable_docs = yield self._get_docs(encrypted=False, -                                                   sequence=sequence) -        defer.returnValue(insertable_docs) - -    @defer.inlineCallbacks -    def _process_decrypted_docs(self): -        """ -        Fetch as many decrypted documents as can be taken from the expected -        order and insert them in the local replica. - -        :return: A deferred that will fire with the list of inserted -                 documents. 
-        :rtype: twisted.internet.defer.Deferred -        """ -        insertable = yield self._get_insertable_docs() -        processed_docs_ids = [] -        for doc_fields in insertable: -            method = self._insert_decrypted_local_doc -            # FIXME: This is used only because SQLCipherU1DBSync is synchronous -            # When adbapi is used there is no need for an external thread -            # Without this the reactor can freeze and fail docs download -            yield threads.deferToThread(method, *doc_fields) -            processed_docs_ids.append(doc_fields[0]) -        yield self._delete_received_docs(processed_docs_ids) - -    def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, -                                    gen, trans_id, encrypted, idx): -        """ -        Insert the decrypted document into the local replica. - -        Make use of the passed callback `insert_doc_cb` passed to the caller -        by u1db sync. - -        :param doc_id: The document id. -        :type doc_id: str -        :param doc_rev: The document revision. -        :type doc_rev: str -        :param content: The serialized content of the document. -        :type content: str -        :param gen: The generation corresponding to the modification of that -                    document. -        :type gen: int -        :param trans_id: The transaction id corresponding to the modification -                         of that document. -        :type trans_id: str -        """ -        # could pass source_replica in params for callback chain -        logger.debug("sync decrypter pool: inserting doc in local db: " -                     "%s:%s %s" % (doc_id, doc_rev, gen)) - -        # convert deleted documents to avoid error on document creation -        if content == 'null': -            content = None -        doc = SoledadDocument(doc_id, doc_rev, content) -        gen = int(gen) -        self._insert_doc_cb(doc, gen, trans_id) - -        # store info about processed docs -        self._last_inserted_idx = idx -        self._processed_docs += 1 - -    @defer.inlineCallbacks -    def _decrypt_and_recurse(self): -        """ -        Decrypt the documents received from remote replica and insert them -        into the local one. - -        This method implicitelly returns a defferred (see the decorator -        above). It should only be called by _launch_decrypt_and_process(). -        because this way any exceptions raised here will be stored by the -        errback attached to the deferred returned. - -        :return: A deferred which will fire after all decrypt, process and -                 delete operations have been executed. 
-        :rtype: twisted.internet.defer.Deferred -        """ -        if not self.running: -            defer.returnValue(None) -        processed = self._processed_docs -        pending = self._docs_to_process - -        if processed < pending: -            yield self._process_decrypted_docs() -        else: -            self._finish() - -    def _finish(self): -        self._processed_docs = 0 -        self._last_inserted_idx = 0 -        self._decrypted_docs_indexes = set() -        if not self._deferred.called: -            self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 3c8e3764..c9da939c 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -43,8 +43,6 @@ class SyncTargetAPI(SyncTarget):      def close(self):          if self._sync_enc_pool:              self._sync_enc_pool.stop() -        if self._sync_decr_pool: -            self._sync_decr_pool.stop()          yield self._http.close()      @property @@ -153,7 +151,7 @@ class SyncTargetAPI(SyncTarget):      def sync_exchange(self, docs_by_generation, source_replica_uid,                        last_known_generation, last_known_trans_id,                        insert_doc_cb, ensure_callback=None, -                      defer_decryption=True, sync_id=None): +                      sync_id=None):          """          Find out which documents the remote database does not know about,          encrypt and send them. After that, receive documents from the remote @@ -185,11 +183,6 @@ class SyncTargetAPI(SyncTarget):                                  created.          :type ensure_callback: function -        :param defer_decryption: Whether to defer the decryption process using -                                 the intermediate database. If False, -                                 decryption will be done inline. -        :type defer_decryption: bool -          :return: A deferred which fires with the new generation and                   transaction id of the target replica.          :rtype: twisted.internet.defer.Deferred @@ -221,8 +214,7 @@ class SyncTargetAPI(SyncTarget):          cur_target_gen, cur_target_trans_id = yield self._receive_docs(              last_known_generation, last_known_trans_id, -            ensure_callback, sync_id, -            defer_decryption=defer_decryption) +            ensure_callback, sync_id)          # update gen and trans id info in case we just sent and did not          # receive docs. 
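
For callers that drive the HTTP sync target directly, sync_exchange loses the defer_decryption keyword as well. A sketch of the updated call shape, mirroring how the tests further down in this diff invoke it (remote_target, docs_by_generation and receive_doc stand in for whatever the caller already has):

```python
from twisted.internet import defer


@defer.inlineCallbacks
def exchange(remote_target, docs_by_generation, source_replica_uid,
             receive_doc):
    # Same arguments as before, minus defer_decryption; the Deferred fires
    # with the new generation and transaction id of the target replica.
    new_gen, new_trans_id = yield remote_target.sync_exchange(
        docs_by_generation, source_replica_uid,
        last_known_generation=0, last_known_trans_id=None,
        insert_doc_cb=receive_doc)
    defer.returnValue((new_gen, new_trans_id))
```
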
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 26606e9b..1f1bc480 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -21,7 +21,6 @@ from twisted.internet import defer  from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS  from leap.soledad.client.events import emit_async  from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.encdecpool import SyncDecrypterPool  from leap.soledad.client.http_target.support import RequestBody  from leap.soledad.common.log import getLogger  from leap.soledad.common.document import SoledadDocument @@ -50,26 +49,11 @@ class HTTPDocFetcher(object):      @defer.inlineCallbacks      def _receive_docs(self, last_known_generation, last_known_trans_id, -                      ensure_callback, sync_id, defer_decryption): -        defer_decryption = False - -        self._queue_for_decrypt = defer_decryption \ -            and self._sync_db is not None +                      ensure_callback, sync_id):          new_generation = last_known_generation          new_transaction_id = last_known_trans_id -        if self._queue_for_decrypt: -            logger.debug( -                "Soledad sync: will queue received docs for decrypting.") - -        if defer_decryption: -            self._setup_sync_decr_pool() - -        # --------------------------------------------------------------------- -        # maybe receive the first document -        # --------------------------------------------------------------------- -          # we fetch the first document before fetching the rest because we need          # to know the total number of documents to be received, and this          # information comes as metadata to each request. @@ -85,14 +69,6 @@ class HTTPDocFetcher(object):              new_generation = ngen              new_transaction_id = ntrans -        # --------------------------------------------------------------------- -        # wait for async decryption to finish -        # --------------------------------------------------------------------- - -        if defer_decryption: -            yield self._sync_decr_pool.deferred -            self._sync_decr_pool.stop() -          defer.returnValue([new_generation, new_transaction_id])      def _fetch_all(self, last_known_generation, @@ -126,9 +102,6 @@ class HTTPDocFetcher(object):          new_generation, new_transaction_id, number_of_changes, entries =\              self._parse_received_doc_response(response) -        if self._sync_decr_pool and not self._sync_decr_pool.running: -            self._sync_decr_pool.start(number_of_changes) -          for doc_id, rev, content, gen, trans_id in entries:              if doc_id is not None:                  # decrypt incoming document and insert into local database @@ -136,31 +109,10 @@ class HTTPDocFetcher(object):                  # symmetric decryption of document's contents                  # ---------------------------------------------------------                  # If arriving content was symmetrically encrypted, we decrypt -                # it.  We do it inline if defer_decryption flag is False or no -                # sync_db was defined, otherwise we defer it writing it to the -                # received docs table.                  
doc = SoledadDocument(doc_id, rev, content)                  if is_symmetrically_encrypted(doc): -                    if self._queue_for_decrypt: -                        self._sync_decr_pool.insert_encrypted_received_doc( -                            doc.doc_id, doc.rev, doc.content, gen, trans_id, -                            idx) -                    else: -                        # defer_decryption is False or no-sync-db fallback -                        doc.set_json(self._crypto.decrypt_doc(doc)) -                        self._insert_doc_cb(doc, gen, trans_id) -                else: -                    # not symmetrically encrypted doc, insert it directly -                    # or save it in the decrypted stage. -                    if self._queue_for_decrypt: -                        self._sync_decr_pool.insert_received_doc( -                            doc.doc_id, doc.rev, doc.content, gen, trans_id, -                            idx) -                    else: -                        self._insert_doc_cb(doc, gen, trans_id) -                # ------------------------------------------------------------- -                # end of symmetric decryption -                # ------------------------------------------------------------- +                    doc.set_json(self._crypto.decrypt_doc(doc)) +                self._insert_doc_cb(doc, gen, trans_id)              self._received_docs += 1              user_data = {'uuid': self.uuid, 'userid': self.userid}              _emit_receive_status(user_data, self._received_docs, total) @@ -212,18 +164,6 @@ class HTTPDocFetcher(object):          return new_generation, new_transaction_id, number_of_changes, \              entries -    def _setup_sync_decr_pool(self): -        """ -        Set up the SyncDecrypterPool for deferred decryption. -        """ -        if self._sync_decr_pool is None and self._sync_db is not None: -            # initialize syncing queue decryption pool -            self._sync_decr_pool = SyncDecrypterPool( -                self._crypto, -                self._sync_db, -                insert_doc_cb=self._insert_doc_cb, -                source_replica_uid=self.source_replica_uid) -  def _emit_receive_status(user_data, received_docs, total):      content = {'received': received_docs, 'total': total} diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py index 14b34d24..82927ff4 100644 --- a/client/src/leap/soledad/client/interfaces.py +++ b/client/src/leap/soledad/client/interfaces.py @@ -321,7 +321,7 @@ class ISyncableStorage(Interface):          "Property, True if the syncer is syncing.")      token = Attribute("The authentication Token.") -    def sync(self, defer_decryption=True): +    def sync(self):          """          Synchronize the local encrypted replica with a remote replica. @@ -331,11 +331,6 @@ class ISyncableStorage(Interface):          :param url: the url of the target replica to sync with          :type url: str -        :param defer_decryption: -            Whether to defer the decryption process using the intermediate -            database. If False, decryption will be done inline. -        :type defer_decryption: bool -          :return:              A deferred that will fire with the local generation before the              synchronisation was performed. 
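
The receive path in fetch.py collapses to a single inline-decryption step. A condensed, standalone sketch of that per-document handling under the new code (the insert_received helper and its argument order are illustrative, not part of the diff):

```python
from leap.soledad.client.crypto import is_symmetrically_encrypted
from leap.soledad.common.document import SoledadDocument


def insert_received(crypto, insert_doc_cb, doc_id, rev, content, gen, trans_id):
    # Build the document, decrypt it inline if it arrived symmetrically
    # encrypted, then hand it straight to the u1db insert callback; there is
    # no longer a docs_received staging table in between.
    doc = SoledadDocument(doc_id, rev, content)
    if is_symmetrically_encrypted(doc):
        doc.set_json(crypto.decrypt_doc(doc))
    insert_doc_cb(doc, gen, trans_id)
    return doc
```
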
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index b198607d..ba341bbf 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -164,7 +164,7 @@ class SQLCipherOptions(object):          :param cipher_page_size: The page size.          :type cipher_page_size: int          :param defer_encryption: -            Whether to defer encryption/decryption of documents, or do it +            Whether to defer encryption of documents, or do it              inline while syncing.          :type defer_encryption: bool          """ @@ -480,7 +480,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):              raise DatabaseAccessError(str(e))      @defer.inlineCallbacks -    def sync(self, url, creds=None, defer_decryption=True): +    def sync(self, url, creds=None):          """          Synchronize documents with remote replica exposed at url. @@ -495,10 +495,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          :param creds: optional dictionary giving credentials to authorize the                        operation with the server.          :type creds: dict -        :param defer_decryption: -            Whether to defer the decryption process using the intermediate -            database. If False, decryption will be done inline. -        :type defer_decryption: bool          :return:              A Deferred, that will fire with the local generation (type `int`) @@ -510,8 +506,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):              self.sync_phase = syncer.sync_phase              self.syncer = syncer              self.sync_exchange_phase = syncer.sync_exchange_phase -        local_gen_before_sync = yield syncer.sync( -            defer_decryption=defer_decryption) +        local_gen_before_sync = yield syncer.sync()          self.received_docs = syncer.received_docs          defer.returnValue(local_gen_before_sync) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 4cbd9f2a..d3cfe029 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -56,23 +56,10 @@ class SoledadSynchronizer(Synchronizer):              self.sync_exchange_phase = None      @defer.inlineCallbacks -    def sync(self, defer_decryption=True): +    def sync(self):          """          Synchronize documents between source and target. -        Differently from u1db `Synchronizer.sync` method, this one allows to -        pass a `defer_decryption` flag that will postpone the last -        step in the synchronization dance, namely, the setting of the last -        known generation and transaction id for a given remote replica. - -        This is done to allow the ongoing parallel decryption of the incoming -        docs to proceed without `InvalidGeneration` conflicts. - -        :param defer_decryption: Whether to defer the decryption process using -                                 the intermediate database. If False, -                                 decryption will be done inline. -        :type defer_decryption: bool -          :return: A deferred which will fire after the sync has finished with                   the local generation before the synchronization was performed.          
:rtype: twisted.internet.defer.Deferred @@ -172,8 +159,7 @@ class SoledadSynchronizer(Synchronizer):          new_gen, new_trans_id = yield sync_target.sync_exchange(              docs_by_generation, self.source._replica_uid,              target_last_known_gen, target_last_known_trans_id, -            self._insert_doc_from_target, ensure_callback=ensure_callback, -            defer_decryption=defer_decryption) +            self._insert_doc_from_target, ensure_callback=ensure_callback)          logger.debug("target gen after sync: %d" % new_gen)          logger.debug("target trans_id after sync: %s" % new_trans_id)          if hasattr(self.source, 'commit'): diff --git a/testing/tests/perf/test_encdecpool.py b/testing/tests/perf/test_encdecpool.py index 77091a41..8e820b9c 100644 --- a/testing/tests/perf/test_encdecpool.py +++ b/testing/tests/perf/test_encdecpool.py @@ -3,7 +3,6 @@ import json  from uuid import uuid4  from twisted.internet.defer import gatherResults  from leap.soledad.client.encdecpool import SyncEncrypterPool -from leap.soledad.client.encdecpool import SyncDecrypterPool  from leap.soledad.common.document import SoledadDocument  # FIXME: test load is low due issue #7370, higher values will get out of memory @@ -36,43 +35,3 @@ def create_encrypt(amount, size):  test_encdecpool_encrypt_100_10k = create_encrypt(100, 10*1000)  test_encdecpool_encrypt_100_100k = create_encrypt(100, 100*1000)  test_encdecpool_encrypt_100_500k = create_encrypt(100, 500*1000) - - -def create_decrypt(amount, size): -    @pytest.mark.benchmark(group="test_pool_decrypt") -    @pytest.inlineCallbacks -    def test(soledad_client, txbenchmark_with_setup, request, payload): -        DOC_CONTENT = {'payload': payload(size)} -        client = soledad_client() - -        def setup(): -            pool = SyncDecrypterPool( -                client._crypto, -                client._sync_db, -                source_replica_uid=client._dbpool.replica_uid, -                insert_doc_cb=lambda x, y, z: False)  # ignored -            pool.start(amount) -            request.addfinalizer(pool.stop) -            crypto = client._crypto -            docs = [] -            for _ in xrange(amount): -                doc = SoledadDocument( -                    doc_id=uuid4().hex, rev='rev', -                    json=json.dumps(DOC_CONTENT)) -                encrypted_content = json.loads(crypto.encrypt_doc(doc)) -                docs.append((doc.doc_id, encrypted_content)) -            return pool, docs - -        def put_and_wait(pool, docs): -            deferreds = []  # fires on completion -            for idx, (doc_id, content) in enumerate(docs, 1): -                deferreds.append(pool.insert_encrypted_received_doc( -                    doc_id, 'rev', content, idx, "trans_id", idx)) -            return gatherResults(deferreds) - -        yield txbenchmark_with_setup(setup, put_and_wait) -    return test - -test_encdecpool_decrypt_100_10k = create_decrypt(100, 10*1000) -test_encdecpool_decrypt_100_100k = create_decrypt(100, 100*1000) -test_encdecpool_decrypt_100_500k = create_decrypt(100, 500*1000) diff --git a/testing/tests/perf/test_sync.py b/testing/tests/perf/test_sync.py index 4d42395b..0b48a0b9 100644 --- a/testing/tests/perf/test_sync.py +++ b/testing/tests/perf/test_sync.py @@ -23,8 +23,7 @@ def create_upload(uploads, size):          def setup():              return load_up(client, uploads, payload(size)) -        yield txbenchmark_with_setup(setup, client.sync, -                                     
defer_decryption=False) +        yield txbenchmark_with_setup(setup, client.sync)      return test diff --git a/testing/tests/sync/test_encdecpool.py b/testing/tests/sync/test_encdecpool.py index 4a32885e..7055a765 100644 --- a/testing/tests/sync/test_encdecpool.py +++ b/testing/tests/sync/test_encdecpool.py @@ -1,34 +1,11 @@  # -*- coding: utf-8 -*- -# test_encdecpool.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for encryption and decryption pool. -"""  import json -from random import shuffle - -from mock import MagicMock  from twisted.internet.defer import inlineCallbacks  from leap.soledad.client.encdecpool import SyncEncrypterPool -from leap.soledad.client.encdecpool import SyncDecrypterPool  from leap.soledad.common.document import SoledadDocument  from test_soledad.util import BaseSoledadTest -from twisted.internet import defer  DOC_ID = "mydoc"  DOC_REV = "rev" @@ -69,238 +46,3 @@ class TestSyncEncrypterPool(BaseSoledadTest):          encrypted = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)          self.assertIsNotNone(encrypted) - - -class TestSyncDecrypterPool(BaseSoledadTest): - -    def _insert_doc_cb(self, doc, gen, trans_id): -        """ -        Method used to mock the sync's return_doc_cb callback. -        """ -        self._inserted_docs.append((doc, gen, trans_id)) - -    def _setup_pool(self, sync_db=None): -        sync_db = sync_db or self._soledad._sync_db -        return SyncDecrypterPool( -            self._soledad._crypto, -            sync_db, -            source_replica_uid=self._soledad._dbpool.replica_uid, -            insert_doc_cb=self._insert_doc_cb) - -    def setUp(self): -        BaseSoledadTest.setUp(self) -        # setup the pool -        self._pool = self._setup_pool() -        # reset the inserted docs mock -        self._inserted_docs = [] - -    def tearDown(self): -        if self._pool.running: -            self._pool.stop() -        BaseSoledadTest.tearDown(self) - -    def test_insert_received_doc(self): -        """ -        Test that one document added to the pool is inserted using the -        callback. -        """ -        self._pool.start(1) -        self._pool.insert_received_doc( -            DOC_ID, DOC_REV, "{}", 1, "trans_id", 1) - -        def _assert_doc_was_inserted(_): -            self.assertEqual( -                self._inserted_docs, -                [(SoledadDocument(DOC_ID, DOC_REV, "{}"), 1, u"trans_id")]) - -        self._pool.deferred.addCallback(_assert_doc_was_inserted) -        return self._pool.deferred - -    def test_looping_control(self): -        """ -        Start and stop cleanly. 
-        """ -        self._pool.start(10) -        self.assertTrue(self._pool.running) -        self._pool.stop() -        self.assertFalse(self._pool.running) -        self.assertTrue(self._pool.deferred.called) - -    def test_sync_id_col_is_created_if_non_existing_in_docs_recvd_table(self): -        """ -        Test that docs_received table is migrated, and has the sync_id column -        """ -        mock_run_query = MagicMock(return_value=defer.succeed(None)) -        mock_sync_db = MagicMock() -        mock_sync_db.runQuery = mock_run_query -        pool = self._setup_pool(mock_sync_db) -        d = pool.start(10) -        pool.stop() - -        def assert_trial_to_create_sync_id_column(_): -            mock_run_query.assert_called_once_with( -                "ALTER TABLE docs_received ADD COLUMN sync_id") - -        d.addCallback(assert_trial_to_create_sync_id_column) -        return d - -    def test_insert_received_doc_many(self): -        """ -        Test that many documents added to the pool are inserted using the -        callback. -        """ -        many = 100 -        self._pool.start(many) - -        # insert many docs in the pool -        for i in xrange(many): -            gen = idx = i + 1 -            doc_id = "doc_id: %d" % idx -            rev = "rev: %d" % idx -            content = {'idx': idx} -            trans_id = "trans_id: %d" % idx -            self._pool.insert_received_doc( -                doc_id, rev, content, gen, trans_id, idx) - -        def _assert_doc_was_inserted(_): -            self.assertEqual(many, len(self._inserted_docs)) -            idx = 1 -            for doc, gen, trans_id in self._inserted_docs: -                expected_gen = idx -                expected_doc_id = "doc_id: %d" % idx -                expected_rev = "rev: %d" % idx -                expected_content = json.dumps({'idx': idx}) -                expected_trans_id = "trans_id: %d" % idx - -                self.assertEqual(expected_doc_id, doc.doc_id) -                self.assertEqual(expected_rev, doc.rev) -                self.assertEqual(expected_content, json.dumps(doc.content)) -                self.assertEqual(expected_gen, gen) -                self.assertEqual(expected_trans_id, trans_id) - -                idx += 1 - -        self._pool.deferred.addCallback(_assert_doc_was_inserted) -        return self._pool.deferred - -    def test_insert_encrypted_received_doc(self): -        """ -        Test that one encrypted document added to the pool is decrypted and -        inserted using the callback. -        """ -        crypto = self._soledad._crypto -        doc = SoledadDocument( -            doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT)) -        encrypted_content = json.loads(crypto.encrypt_doc(doc)) - -        # insert the encrypted document in the pool -        self._pool.start(1) -        self._pool.insert_encrypted_received_doc( -            DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1) - -        def _assert_doc_was_decrypted_and_inserted(_): -            self.assertEqual(1, len(self._inserted_docs)) -            self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")]) - -        self._pool.deferred.addCallback( -            _assert_doc_was_decrypted_and_inserted) -        return self._pool.deferred - -    @inlineCallbacks -    def test_processing_order(self): -        """ -        This test ensures that processing of documents only occur if there is -        a sequence in place. 
-        """ -        crypto = self._soledad._crypto - -        docs = [] -        for i in xrange(1, 10): -            i = str(i) -            doc = SoledadDocument( -                doc_id=DOC_ID + i, rev=DOC_REV + i, -                json=json.dumps(DOC_CONTENT)) -            encrypted_content = json.loads(crypto.encrypt_doc(doc)) -            docs.append((doc, encrypted_content)) - -        # insert the encrypted document in the pool -        yield self._pool.start(10)  # pool is expecting to process 10 docs -        self._pool._loop.stop()  # we are processing manually -        # first three arrives, forming a sequence -        for i, (doc, encrypted_content) in enumerate(docs[:3]): -            gen = idx = i + 1 -            yield self._pool.insert_encrypted_received_doc( -                doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx) - -        # last one arrives alone, so it can't be processed -        doc, encrypted_content = docs[-1] -        yield self._pool.insert_encrypted_received_doc( -            doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10) - -        yield self._pool._decrypt_and_recurse() - -        self.assertEqual(3, self._pool._processed_docs) - -    def test_insert_encrypted_received_doc_many(self, many=100): -        """ -        Test that many encrypted documents added to the pool are decrypted and -        inserted using the callback. -        """ -        crypto = self._soledad._crypto -        self._pool.start(many) -        docs = [] - -        # insert many encrypted docs in the pool -        for i in xrange(many): -            gen = idx = i + 1 -            doc_id = "doc_id: %d" % idx -            rev = "rev: %d" % idx -            content = {'idx': idx} -            trans_id = "trans_id: %d" % idx - -            doc = SoledadDocument( -                doc_id=doc_id, rev=rev, json=json.dumps(content)) - -            encrypted_content = json.loads(crypto.encrypt_doc(doc)) -            docs.append((doc_id, rev, encrypted_content, gen, -                         trans_id, idx)) -        shuffle(docs) - -        for doc in docs: -            self._pool.insert_encrypted_received_doc(*doc) - -        def _assert_docs_were_decrypted_and_inserted(_): -            self.assertEqual(many, len(self._inserted_docs)) -            idx = 1 -            for doc, gen, trans_id in self._inserted_docs: -                expected_gen = idx -                expected_doc_id = "doc_id: %d" % idx -                expected_rev = "rev: %d" % idx -                expected_content = json.dumps({'idx': idx}) -                expected_trans_id = "trans_id: %d" % idx - -                self.assertEqual(expected_doc_id, doc.doc_id) -                self.assertEqual(expected_rev, doc.rev) -                self.assertEqual(expected_content, json.dumps(doc.content)) -                self.assertEqual(expected_gen, gen) -                self.assertEqual(expected_trans_id, trans_id) - -                idx += 1 - -        self._pool.deferred.addCallback( -            _assert_docs_were_decrypted_and_inserted) -        return self._pool.deferred - -    @inlineCallbacks -    def test_pool_reuse(self): -        """ -        The pool is reused between syncs, this test verifies that -        reusing is fine. 
-        """ -        for i in xrange(3): -            yield self.test_insert_encrypted_received_doc_many(5) -            self._inserted_docs = [] -            decrypted_docs = yield self._pool._get_docs(encrypted=False) -            # check that decrypted docs staging is clean -            self.assertEquals([], decrypted_docs) -            self._pool.stop() diff --git a/testing/tests/sync/test_sync.py b/testing/tests/sync/test_sync.py index 5290003e..a7d0a92b 100644 --- a/testing/tests/sync/test_sync.py +++ b/testing/tests/sync/test_sync.py @@ -187,7 +187,7 @@ class TestSoledadDbSync(          self.addCleanup(target.close)          return sync.SoledadSynchronizer(              self.db, -            target).sync(defer_decryption=False) +            target).sync()      @defer.inlineCallbacks      def test_db_sync(self): diff --git a/testing/tests/sync/test_sync_deferred.py b/testing/tests/sync/test_sync_deferred.py index 4948aaf8..482b150c 100644 --- a/testing/tests/sync/test_sync_deferred.py +++ b/testing/tests/sync/test_sync_deferred.py @@ -14,7 +14,7 @@  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>.  """ -Test Leap backend bits: sync with deferred encryption/decryption. +Test Leap backend bits: sync with deferred encryption.  """  import time  import os @@ -22,12 +22,8 @@ import random  import string  import shutil -from urlparse import urljoin -  from twisted.internet import defer -from leap.soledad.common import couch -  from leap.soledad.client import sync  from leap.soledad.client.sqlcipher import SQLCipherOptions  from leap.soledad.client.sqlcipher import SQLCipherDatabase @@ -41,9 +37,6 @@ from test_soledad.util import make_soledad_app  from test_soledad.util import soledad_sync_target -# Just to make clear how this test is different... :) -DEFER_DECRYPTION = True -  WAIT_STEP = 1  MAX_WAIT = 10  DBPASS = "pass" @@ -52,7 +45,7 @@ DBPASS = "pass"  class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):      """ -    Another base class for testing the deferred encryption/decryption during +    Another base class for testing the deferred encryption during      the syncs, using the intermediate database.      """      defer_sync_encryption = True @@ -109,7 +102,7 @@ class TestSoledadDbSyncDeferredEncDecr(      """      Test db.sync remote sync shortcut. -    Case with deferred encryption and decryption: using the intermediate +    Case with deferred encryption: using the intermediate      syncdb.      
""" @@ -158,7 +151,7 @@ class TestSoledadDbSyncDeferredEncDecr(          self.addCleanup(target.close)          return sync.SoledadSynchronizer(              dbsyncer, -            target).sync(defer_decryption=True) +            target).sync()      def wait_for_sync(self):          """ diff --git a/testing/tests/sync/test_sync_mutex.py b/testing/tests/sync/test_sync_mutex.py index 2626ab2a..2bcb3aec 100644 --- a/testing/tests/sync/test_sync_mutex.py +++ b/testing/tests/sync/test_sync_mutex.py @@ -47,7 +47,7 @@ from test_soledad.util import soledad_sync_target  _old_sync = SoledadSynchronizer.sync -def _timed_sync(self, defer_decryption=True): +def _timed_sync(self):      t = time.time()      sync_id = uuid.uuid4() @@ -62,7 +62,7 @@ def _timed_sync(self, defer_decryption=True):          self.source.sync_times[sync_id]['end'] = t          return passthrough -    d = _old_sync(self, defer_decryption=defer_decryption) +    d = _old_sync(self)      d.addBoth(_store_finish_time)      return d diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py index 964468ce..a2935539 100644 --- a/testing/tests/sync/test_sync_target.py +++ b/testing/tests/sync/test_sync_target.py @@ -231,8 +231,7 @@ class TestSoledadSyncTarget(          doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')          new_gen, trans_id = yield remote_target.sync_exchange(              [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=receive_doc, -            defer_decryption=False) +            last_known_trans_id=None, insert_doc_cb=receive_doc)          self.assertEqual(1, new_gen)          self.assertGetEncryptedDoc(              db, 'doc-here', 'replica:1', '{"value": "here"}', False) @@ -285,8 +284,7 @@ class TestSoledadSyncTarget(                  'replica',                  last_known_generation=0,                  last_known_trans_id=None, -                insert_doc_cb=receive_doc, -                defer_decryption=False) +                insert_doc_cb=receive_doc)          self.assertGetEncryptedDoc(              db, 'doc-here', 'replica:1', '{"value": "here"}', @@ -298,8 +296,7 @@ class TestSoledadSyncTarget(          trigger_ids = []          new_gen, trans_id = yield remote_target.sync_exchange(              [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=receive_doc, -            defer_decryption=False) +            last_known_trans_id=None, insert_doc_cb=receive_doc)          self.assertGetEncryptedDoc(              db, 'doc-here2', 'replica:1', '{"value": "here2"}',              False) @@ -331,7 +328,7 @@ class TestSoledadSyncTarget(          new_gen, trans_id = yield remote_target.sync_exchange(              [(doc, 10, 'T-sid')], 'replica', last_known_generation=0,              last_known_trans_id=None, insert_doc_cb=receive_doc, -            ensure_callback=ensure_cb, defer_decryption=False) +            ensure_callback=ensure_cb)          self.assertEqual(1, new_gen)          db = self.db2          self.assertEqual(1, len(replica_uid_box)) @@ -446,8 +443,7 @@ class SoledadDatabaseSyncTargetTests(               'T-sid')]          new_gen, trans_id = yield self.st.sync_exchange(              docs_by_gen, 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc, -            defer_decryption=False) +            last_known_trans_id=None, insert_doc_cb=self.receive_doc)          
self.assertGetEncryptedDoc(              self.db, 'doc-id', 'replica:1', tests.simple_doc, False)          self.assertTransactionLog(['doc-id'], self.db) @@ -471,8 +467,7 @@ class SoledadDatabaseSyncTargetTests(                  'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')]          new_gen, trans_id = yield self.st.sync_exchange(              docs_by_gen, 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc, -            defer_decryption=False) +            last_known_trans_id=None, insert_doc_cb=self.receive_doc)          self.assertGetEncryptedDoc(              self.db, 'doc-id', 'replica:1', tests.simple_doc, False)          self.assertGetEncryptedDoc( @@ -498,8 +493,7 @@ class SoledadDatabaseSyncTargetTests(          self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)          new_gen, _ = yield self.st.sync_exchange(              [], 'other-replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc, -            defer_decryption=False) +            last_known_trans_id=None, insert_doc_cb=self.receive_doc)          self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)          self.assertEqual(2, new_gen)          self.assertEqual( @@ -779,10 +773,6 @@ class SoledadDatabaseSyncTargetTests(          yield self.st.record_sync_info('replica', 0, 'T-sid')          self.assertEqual(expected, called) - -# Just to make clear how this test is different... :) -DEFER_DECRYPTION = False -  WAIT_STEP = 1  MAX_WAIT = 10  DBPASS = "pass" @@ -890,8 +880,7 @@ class TestSoledadDbSync(                  defer_encryption=True)              self.dbsyncer = dbsyncer              return dbsyncer.sync(target_url, -                                 creds=creds, -                                 defer_decryption=DEFER_DECRYPTION) +                                 creds=creds)          else:              return self._do_sync(self, target_name) | 
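
With the decrypter pool gone, the client's sync database only needs the outgoing-encryption staging table. A sketch of the table-creation query the client now builds, using the SyncEncrypterPool constants that survive this diff (the wrapper function around it is illustrative):

```python
from leap.soledad.client import encdecpool


def sync_db_create_queries():
    # Only the table used by SyncEncrypterPool for documents awaiting
    # encryption is created; the docs_received table that backed the
    # removed SyncDecrypterPool is no longer set up.
    maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"
    encr = encdecpool.SyncEncrypterPool
    return (maybe_create % (encr.TABLE_NAME, encr.FIELD_NAMES),)
```
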
