diff options
| -rw-r--r-- | client/changes/next-changelog.rst | 1 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 183 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 9 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 2 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/tests/test_encdecpool.py | 47 | ||||
| -rw-r--r-- | common/src/leap/soledad/common/tests/test_server.py | 4 | 
6 files changed, 122 insertions, 124 deletions
diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst index a696fe10..c676625f 100644 --- a/client/changes/next-changelog.rst +++ b/client/changes/next-changelog.rst @@ -26,6 +26,7 @@ Misc  - Refactor bootstrap to remove shared db lock.  - `#1236 <https://leap.se/code/issues/1236>`_: Description of the new feature corresponding with issue #1236.  - Some change without issue number. +- Removed multiprocessing from encdecpool with some extra refactoring.  Known Issues  ~~~~~~~~~~~~ diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 576b8b2c..218ebfa9 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -25,7 +25,7 @@ during synchronization.  import json  import logging -from twisted.internet import reactor +from twisted.internet.task import LoopingCall  from twisted.internet import threads  from twisted.internet import defer  from twisted.python import log @@ -167,26 +167,14 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          SyncEncryptDecryptPool.stop(self) -    def enqueue_doc_for_encryption(self, doc): +    def encrypt_doc(self, doc):          """ -        Enqueue a document for encryption. +        Encrypt document asynchronously then insert it on +        local staging database.          :param doc: The document to be encrypted.          :type doc: SoledadDocument          """ -        self._encrypt_doc(doc) - -    def _encrypt_doc(self, doc): -        """ -        Symmetrically encrypt a document. - -        :param doc: The document with contents to be encrypted. -        :type doc: SoledadDocument - -        :param workers: Whether to defer the decryption to the multiprocess -                        pool of workers. Useful for debugging purposes. -        :type workers: bool -        """          soledad_assert(self._crypto is not None, "need a crypto object")          docstr = doc.get_json()          key = self._crypto.doc_passphrase(doc.doc_id) @@ -276,8 +264,8 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,      :type doc_id: str      :param doc_rev: The document revision.      :type doc_rev: str -    :param content: The encrypted content of the document. -    :type content: str +    :param content: The encrypted content of the document as JSON dict. +    :type content: dict      :param gen: The generation corresponding to the modification of that                  document.      :type gen: int @@ -294,7 +282,6 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,      :return: A tuple containing the doc id, revision and encrypted content.      :rtype: tuple(str, str, str)      """ -    content = json.loads(content) if type(content) == str else content      decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)      return doc_id, doc_rev, decrypted_content, gen, trans_id, idx @@ -356,14 +343,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          self._processed_docs = 0          self._last_inserted_idx = 0 -        # a list that holds the asynchronous decryption results so they can be -        # collected when they are ready -        self._async_results = [] -          # initialize db and make sure any database operation happens after          # db initialization          self._deferred_init = self._init_db()          self._wait_init_db('_runOperation', '_runQuery') +        self._loop = LoopingCall(self._decrypt_and_recurse) +        self._decrypted_docs_indexes = set()      def _wait_init_db(self, *methods):          """ @@ -408,11 +393,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          SyncEncryptDecryptPool.start(self)          self._docs_to_process = docs_to_process          self._deferred = defer.Deferred() -        reactor.callWhenRunning(self._launch_decrypt_and_recurse) +        self._loop.start(self.DECRYPT_LOOP_PERIOD) -    def _launch_decrypt_and_recurse(self): -        d = self._decrypt_and_recurse() -        d.addErrback(self._errback) +    def stop(self): +        if self._loop.running: +            self._loop.stop() +        self._finish() +        SyncEncryptDecryptPool.stop(self)      def _errback(self, failure):          log.err(failure) @@ -431,8 +418,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):      def insert_encrypted_received_doc(              self, doc_id, doc_rev, content, gen, trans_id, idx):          """ -        Insert a received message with encrypted content, to be decrypted later -        on. +        Decrypt and insert a received document into local staging area to be +        processed later on.          :param doc_id: The document ID.          :type doc_id: str @@ -447,11 +434,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :param idx: The index of this document in the current sync process.          :type idx: int -        :return: A deferred that will fire when the operation in the database -                 has finished. +        :return: A deferred that will fire after the decrypted document has +                 been inserted in the sync db.          :rtype: twisted.internet.defer.Deferred          """ -        return self._async_decrypt_doc(doc_id, doc_rev, content, gen, trans_id, idx) +        soledad_assert(self._crypto is not None, "need a crypto object") + +        key = self._crypto.doc_passphrase(doc_id) +        secret = self._crypto.secret +        args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx +        # decrypt asynchronously +        doc = decrypt_doc_task(*args) +        # callback will insert it for later processing +        return self._decrypt_doc_cb(doc)      def insert_received_doc(              self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -481,52 +476,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              content = json.dumps(content)          query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \                  % self.TABLE_NAME -        return self._runOperation( +        d = self._runOperation(              query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) +        d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) +        return d -    def _delete_received_doc(self, doc_id): +    def _delete_received_docs(self, doc_ids):          """ -        Delete a received doc after it was inserted into the local db. +        Delete a list of received docs after get them inserted into the db. -        :param doc_id: Document ID. -        :type doc_id: str +        :param doc_id: Document ID list. +        :type doc_id: list          :return: A deferred that will fire when the operation in the database                   has finished.          :rtype: twisted.internet.defer.Deferred          """ -        query = "DELETE FROM '%s' WHERE doc_id=?" \ -                % self.TABLE_NAME -        return self._runOperation(query, (doc_id,)) - -    def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx): -        """ -        Dispatch an asynchronous document decrypting routine and save the -        result object. - -        :param doc_id: The ID for the document with contents to be encrypted. -        :type doc: str -        :param rev: The revision of the document. -        :type rev: str -        :param content: The serialized content of the document. -        :type content: str -        :param gen: The generation corresponding to the modification of that -                    document. -        :type gen: int -        :param trans_id: The transaction id corresponding to the modification -                         of that document. -        :type trans_id: str -        :param idx: The index of this document in the current sync process. -        :type idx: int -        """ -        soledad_assert(self._crypto is not None, "need a crypto object") - -        key = self._crypto.doc_passphrase(doc_id) -        secret = self._crypto.secret -        args = doc_id, rev, content, gen, trans_id, key, secret, idx -        # decrypt asynchronously -        d = threads.deferToThread(decrypt_doc_task, *args) -        d.addCallback(self._decrypt_doc_cb) +        placeholders = ', '.join('?' for _ in doc_ids) +        query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ +                % (self.TABLE_NAME, placeholders) +        return self._runOperation(query, (doc_ids))      def _decrypt_doc_cb(self, result):          """ @@ -547,7 +516,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          return self.insert_received_doc(              doc_id, rev, content, gen, trans_id, idx) -    def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): +    def _get_docs(self, encrypted=None, order_by='idx', order='ASC', +                  sequence=None):          """          Get documents from the received docs table in the sync db. @@ -565,8 +535,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          """          query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \                  "idx FROM %s" % self.TABLE_NAME -        if encrypted is not None: -            query += " WHERE encrypted = %d" % int(encrypted) +        if encrypted or sequence: +            query += " WHERE" +        if encrypted: +            query += " encrypted = %d" % int(encrypted) +        if sequence: +            query += " idx in (" + ', '.join(sequence) + ")" +          query += " ORDER BY %s %s" % (order_by, order)          return self._runQuery(query) @@ -579,18 +554,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                   documents.          :rtype: twisted.internet.defer.Deferred          """ -        # here, we fetch the list of decrypted documents and compare with the -        # index of the last succesfully processed document. -        decrypted_docs = yield self._get_docs(encrypted=False) -        insertable = [] -        last_idx = self._last_inserted_idx -        for doc_id, rev, content, gen, trans_id, encrypted, idx in \ -                decrypted_docs: -            if (idx != last_idx + 1): -                break -            insertable.append((doc_id, rev, content, gen, trans_id, idx)) -            last_idx += 1 -        defer.returnValue(insertable) +        # Here, check in memory what are the insertable indexes that can +        # form a sequence starting from the last inserted index +        sequence = [] +        insertable_docs = [] +        next_index = self._last_inserted_idx + 1 +        while next_index in self._decrypted_docs_indexes: +            sequence.append(str(next_index)) +            next_index += 1 +        # Then fetch all the ones ready for insertion. +        if sequence: +            insertable_docs = yield self._get_docs(encrypted=False, +                                                   sequence=sequence) +        defer.returnValue(insertable_docs)      @defer.inlineCallbacks      def _process_decrypted_docs(self): @@ -603,36 +579,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :rtype: twisted.internet.defer.Deferred          """          insertable = yield self._get_insertable_docs() +        processed_docs_ids = []          for doc_fields in insertable:              method = self._insert_decrypted_local_doc              # FIXME: This is used only because SQLCipherU1DBSync is synchronous              # When adbapi is used there is no need for an external thread              # Without this the reactor can freeze and fail docs download              yield threads.deferToThread(method, *doc_fields) -        defer.returnValue(insertable) - -    def _delete_processed_docs(self, inserted): -        """ -        Delete from the sync db documents that have been processed. - -        :param inserted: List of documents inserted in the previous process -                         step. -        :type inserted: list - -        :return: A list of deferreds that will fire when each operation in the -                 database has finished. -        :rtype: twisted.internet.defer.DeferredList -        """ -        deferreds = [] -        for doc_id, doc_rev, _, _, _, _ in inserted: -            deferreds.append( -                self._delete_received_doc(doc_id)) -        if not deferreds: -            return defer.succeed(None) -        return defer.gatherResults(deferreds) +            processed_docs_ids.append(doc_fields[0]) +        yield self._delete_received_docs(processed_docs_ids)      def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, -                                    gen, trans_id, idx): +                                    gen, trans_id, encrypted, idx):          """          Insert the decrypted document into the local replica. @@ -693,20 +651,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                   delete operations have been executed.          :rtype: twisted.internet.defer.Deferred          """ +        if not self.running: +            defer.returnValue(None)          processed = self._processed_docs          pending = self._docs_to_process          if processed < pending: -            docs = yield self._process_decrypted_docs() -            yield self._delete_processed_docs(docs) -            # recurse -            self._delayed_call = reactor.callLater( -                self.DECRYPT_LOOP_PERIOD, -                self._launch_decrypt_and_recurse) +            yield self._process_decrypted_docs()          else:              self._finish()      def _finish(self):          self._processed_docs = 0          self._last_inserted_idx = 0 -        self._deferred.callback(None) +        self._decrypted_docs_indexes = set() +        if not self._deferred.called: +            self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index fda90909..9f7a4193 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -19,7 +19,6 @@ import json  from u1db import errors  from u1db.remote import utils  from twisted.internet import defer -from twisted.internet import threads  from leap.soledad.common.document import SoledadDocument  from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS  from leap.soledad.client.events import emit_async @@ -76,7 +75,7 @@ class HTTPDocFetcher(object):              last_known_generation, last_known_trans_id,              sync_id, 0)          self._received_docs = 0 -        number_of_changes, ngen, ntrans = yield self._insert_received_doc(doc, 1, 1) +        number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)          if ngen:              new_generation = ngen @@ -138,7 +137,6 @@ class HTTPDocFetcher(object):              body=str(body),              content_type='application/x-soledad-sync-get') -    @defer.inlineCallbacks      def _insert_received_doc(self, response, idx, total):          """          Insert a received document into the local replica. @@ -152,8 +150,7 @@ class HTTPDocFetcher(object):          """          new_generation, new_transaction_id, number_of_changes, doc_id, \              rev, content, gen, trans_id = \ -            (yield threads.deferToThread(self._parse_received_doc_response, -                                         response)) +            self._parse_received_doc_response(response)          if doc_id is not None:              # decrypt incoming document and insert into local database              # ------------------------------------------------------------- @@ -188,7 +185,7 @@ class HTTPDocFetcher(object):          self._received_docs += 1          user_data = {'uuid': self.uuid, 'userid': self.userid}          _emit_receive_status(user_data, self._received_docs, total) -        defer.returnValue((number_of_changes, new_generation, new_transaction_id)) +        return number_of_changes, new_generation, new_transaction_id      def _parse_received_doc_response(self, response):          """ diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 22ddc87d..cdc7255c 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -278,7 +278,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)          if self.defer_encryption:              # TODO move to api? -            self._sync_enc_pool.enqueue_doc_for_encryption(doc) +            self._sync_enc_pool.encrypt_doc(doc)          return doc_rev      # diff --git a/common/src/leap/soledad/common/tests/test_encdecpool.py b/common/src/leap/soledad/common/tests/test_encdecpool.py index 694eb7ad..6676c298 100644 --- a/common/src/leap/soledad/common/tests/test_encdecpool.py +++ b/common/src/leap/soledad/common/tests/test_encdecpool.py @@ -57,13 +57,13 @@ class TestSyncEncrypterPool(BaseSoledadTest):          self.assertIsNone(doc)      @inlineCallbacks -    def test_enqueue_doc_for_encryption_and_get_encrypted_doc(self): +    def test_encrypt_doc_and_get_it_back(self):          """          Test that the pool actually encrypts a document added to the queue.          """          doc = SoledadDocument(              doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT)) -        self._pool.enqueue_doc_for_encryption(doc) +        self._pool.encrypt_doc(doc)          # exhaustivelly attempt to get the encrypted document          encrypted = None @@ -117,6 +117,16 @@ class TestSyncDecrypterPool(BaseSoledadTest):          self._pool.deferred.addCallback(_assert_doc_was_inserted)          return self._pool.deferred +    def test_looping_control(self): +        """ +        Start and stop cleanly. +        """ +        self._pool.start(10) +        self.assertTrue(self._pool.running) +        self._pool.stop() +        self.assertFalse(self._pool.running) +        self.assertTrue(self._pool.deferred.called) +      def test_insert_received_doc_many(self):          """          Test that many documents added to the pool are inserted using the @@ -179,6 +189,38 @@ class TestSyncDecrypterPool(BaseSoledadTest):              _assert_doc_was_decrypted_and_inserted)          return self._pool.deferred +    @inlineCallbacks +    def test_processing_order(self): +        """ +        This test ensures that processing of documents only occur if there is +        a sequence in place. +        """ +        crypto = self._soledad._crypto +        docs = [] +        for i in xrange(1, 10): +            i = str(i) +            doc = SoledadDocument( +                doc_id=DOC_ID + i, rev=DOC_REV + i, +                json=json.dumps(DOC_CONTENT)) +            encrypted_content = json.loads(crypto.encrypt_doc(doc)) +            docs.append((doc, encrypted_content)) + +        # insert the encrypted document in the pool +        self._pool.start(10)  # pool is expecting to process 10 docs +        # first three arrives, forming a sequence +        for i, (doc, encrypted_content) in enumerate(docs[:3]): +            gen = idx = i + 1 +            yield self._pool.insert_encrypted_received_doc( +                doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx) +        # last one arrives alone, so it can't be processed +        doc, encrypted_content = docs[-1] +        yield self._pool.insert_encrypted_received_doc( +            doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10) + +        yield self._pool._decrypt_and_recurse() + +        self.assertEqual(3, self._pool._processed_docs) +      def test_insert_encrypted_received_doc_many(self, many=100):          """          Test that many encrypted documents added to the pool are decrypted and @@ -241,3 +283,4 @@ class TestSyncDecrypterPool(BaseSoledadTest):              decrypted_docs = yield self._pool._get_docs(encrypted=False)              # check that decrypted docs staging is clean              self.assertEquals([], decrypted_docs) +            self._pool.stop() diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index bf6c1515..ba7edfe3 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -481,8 +481,8 @@ class EncryptedSyncTestCase(          Test if Soledad can sync very large files.          """          self.skipTest( -            "Work in progress. For reference, see: " -            "https://leap.se/code/issues/7370") +                "Work in progress. For reference, see: " +                "https://leap.se/code/issues/7370")          length = 100 * (10 ** 6)  # 100 MB          return self._test_encrypted_sym_sync(doc_size=length, number_of_docs=1)  | 
