diff options
-rw-r--r-- | client/changes/next-changelog.rst | 1 | ||||
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 183 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 9 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 2 | ||||
-rw-r--r-- | common/src/leap/soledad/common/tests/test_encdecpool.py | 47 | ||||
-rw-r--r-- | common/src/leap/soledad/common/tests/test_server.py | 4 |
6 files changed, 122 insertions, 124 deletions
diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst index a696fe10..c676625f 100644 --- a/client/changes/next-changelog.rst +++ b/client/changes/next-changelog.rst @@ -26,6 +26,7 @@ Misc - Refactor bootstrap to remove shared db lock. - `#1236 <https://leap.se/code/issues/1236>`_: Description of the new feature corresponding with issue #1236. - Some change without issue number. +- Removed multiprocessing from encdecpool with some extra refactoring. Known Issues ~~~~~~~~~~~~ diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 576b8b2c..218ebfa9 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -25,7 +25,7 @@ during synchronization. import json import logging -from twisted.internet import reactor +from twisted.internet.task import LoopingCall from twisted.internet import threads from twisted.internet import defer from twisted.python import log @@ -167,26 +167,14 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): SyncEncryptDecryptPool.stop(self) - def enqueue_doc_for_encryption(self, doc): + def encrypt_doc(self, doc): """ - Enqueue a document for encryption. + Encrypt document asynchronously then insert it on + local staging database. :param doc: The document to be encrypted. :type doc: SoledadDocument """ - self._encrypt_doc(doc) - - def _encrypt_doc(self, doc): - """ - Symmetrically encrypt a document. - - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ soledad_assert(self._crypto is not None, "need a crypto object") docstr = doc.get_json() key = self._crypto.doc_passphrase(doc.doc_id) @@ -276,8 +264,8 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, :type doc_id: str :param doc_rev: The document revision. :type doc_rev: str - :param content: The encrypted content of the document. - :type content: str + :param content: The encrypted content of the document as JSON dict. + :type content: dict :param gen: The generation corresponding to the modification of that document. :type gen: int @@ -294,7 +282,6 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, :return: A tuple containing the doc id, revision and encrypted content. :rtype: tuple(str, str, str) """ - content = json.loads(content) if type(content) == str else content decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) return doc_id, doc_rev, decrypted_content, gen, trans_id, idx @@ -356,14 +343,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._processed_docs = 0 self._last_inserted_idx = 0 - # a list that holds the asynchronous decryption results so they can be - # collected when they are ready - self._async_results = [] - # initialize db and make sure any database operation happens after # db initialization self._deferred_init = self._init_db() self._wait_init_db('_runOperation', '_runQuery') + self._loop = LoopingCall(self._decrypt_and_recurse) + self._decrypted_docs_indexes = set() def _wait_init_db(self, *methods): """ @@ -408,11 +393,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): SyncEncryptDecryptPool.start(self) self._docs_to_process = docs_to_process self._deferred = defer.Deferred() - reactor.callWhenRunning(self._launch_decrypt_and_recurse) + self._loop.start(self.DECRYPT_LOOP_PERIOD) - def _launch_decrypt_and_recurse(self): - d = self._decrypt_and_recurse() - d.addErrback(self._errback) + def stop(self): + if self._loop.running: + self._loop.stop() + self._finish() + SyncEncryptDecryptPool.stop(self) def _errback(self, failure): log.err(failure) @@ -431,8 +418,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def insert_encrypted_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): """ - Insert a received message with encrypted content, to be decrypted later - on. + Decrypt and insert a received document into local staging area to be + processed later on. :param doc_id: The document ID. :type doc_id: str @@ -447,11 +434,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :param idx: The index of this document in the current sync process. :type idx: int - :return: A deferred that will fire when the operation in the database - has finished. + :return: A deferred that will fire after the decrypted document has + been inserted in the sync db. :rtype: twisted.internet.defer.Deferred """ - return self._async_decrypt_doc(doc_id, doc_rev, content, gen, trans_id, idx) + soledad_assert(self._crypto is not None, "need a crypto object") + + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx + # decrypt asynchronously + doc = decrypt_doc_task(*args) + # callback will insert it for later processing + return self._decrypt_doc_cb(doc) def insert_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -481,52 +476,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): content = json.dumps(content) query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME - return self._runOperation( + d = self._runOperation( query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) + return d - def _delete_received_doc(self, doc_id): + def _delete_received_docs(self, doc_ids): """ - Delete a received doc after it was inserted into the local db. + Delete a list of received docs after get them inserted into the db. - :param doc_id: Document ID. - :type doc_id: str + :param doc_id: Document ID list. + :type doc_id: list :return: A deferred that will fire when the operation in the database has finished. :rtype: twisted.internet.defer.Deferred """ - query = "DELETE FROM '%s' WHERE doc_id=?" \ - % self.TABLE_NAME - return self._runOperation(query, (doc_id,)) - - def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx): - """ - Dispatch an asynchronous document decrypting routine and save the - result object. - - :param doc_id: The ID for the document with contents to be encrypted. - :type doc: str - :param rev: The revision of the document. - :type rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - """ - soledad_assert(self._crypto is not None, "need a crypto object") - - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, rev, content, gen, trans_id, key, secret, idx - # decrypt asynchronously - d = threads.deferToThread(decrypt_doc_task, *args) - d.addCallback(self._decrypt_doc_cb) + placeholders = ', '.join('?' for _ in doc_ids) + query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ + % (self.TABLE_NAME, placeholders) + return self._runOperation(query, (doc_ids)) def _decrypt_doc_cb(self, result): """ @@ -547,7 +516,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): return self.insert_received_doc( doc_id, rev, content, gen, trans_id, idx) - def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): + def _get_docs(self, encrypted=None, order_by='idx', order='ASC', + sequence=None): """ Get documents from the received docs table in the sync db. @@ -565,8 +535,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ "idx FROM %s" % self.TABLE_NAME - if encrypted is not None: - query += " WHERE encrypted = %d" % int(encrypted) + if encrypted or sequence: + query += " WHERE" + if encrypted: + query += " encrypted = %d" % int(encrypted) + if sequence: + query += " idx in (" + ', '.join(sequence) + ")" + query += " ORDER BY %s %s" % (order_by, order) return self._runQuery(query) @@ -579,18 +554,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): documents. :rtype: twisted.internet.defer.Deferred """ - # here, we fetch the list of decrypted documents and compare with the - # index of the last succesfully processed document. - decrypted_docs = yield self._get_docs(encrypted=False) - insertable = [] - last_idx = self._last_inserted_idx - for doc_id, rev, content, gen, trans_id, encrypted, idx in \ - decrypted_docs: - if (idx != last_idx + 1): - break - insertable.append((doc_id, rev, content, gen, trans_id, idx)) - last_idx += 1 - defer.returnValue(insertable) + # Here, check in memory what are the insertable indexes that can + # form a sequence starting from the last inserted index + sequence = [] + insertable_docs = [] + next_index = self._last_inserted_idx + 1 + while next_index in self._decrypted_docs_indexes: + sequence.append(str(next_index)) + next_index += 1 + # Then fetch all the ones ready for insertion. + if sequence: + insertable_docs = yield self._get_docs(encrypted=False, + sequence=sequence) + defer.returnValue(insertable_docs) @defer.inlineCallbacks def _process_decrypted_docs(self): @@ -603,36 +579,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :rtype: twisted.internet.defer.Deferred """ insertable = yield self._get_insertable_docs() + processed_docs_ids = [] for doc_fields in insertable: method = self._insert_decrypted_local_doc # FIXME: This is used only because SQLCipherU1DBSync is synchronous # When adbapi is used there is no need for an external thread # Without this the reactor can freeze and fail docs download yield threads.deferToThread(method, *doc_fields) - defer.returnValue(insertable) - - def _delete_processed_docs(self, inserted): - """ - Delete from the sync db documents that have been processed. - - :param inserted: List of documents inserted in the previous process - step. - :type inserted: list - - :return: A list of deferreds that will fire when each operation in the - database has finished. - :rtype: twisted.internet.defer.DeferredList - """ - deferreds = [] - for doc_id, doc_rev, _, _, _, _ in inserted: - deferreds.append( - self._delete_received_doc(doc_id)) - if not deferreds: - return defer.succeed(None) - return defer.gatherResults(deferreds) + processed_docs_ids.append(doc_fields[0]) + yield self._delete_received_docs(processed_docs_ids) def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id, idx): + gen, trans_id, encrypted, idx): """ Insert the decrypted document into the local replica. @@ -693,20 +651,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): delete operations have been executed. :rtype: twisted.internet.defer.Deferred """ + if not self.running: + defer.returnValue(None) processed = self._processed_docs pending = self._docs_to_process if processed < pending: - docs = yield self._process_decrypted_docs() - yield self._delete_processed_docs(docs) - # recurse - self._delayed_call = reactor.callLater( - self.DECRYPT_LOOP_PERIOD, - self._launch_decrypt_and_recurse) + yield self._process_decrypted_docs() else: self._finish() def _finish(self): self._processed_docs = 0 self._last_inserted_idx = 0 - self._deferred.callback(None) + self._decrypted_docs_indexes = set() + if not self._deferred.called: + self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index fda90909..9f7a4193 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -19,7 +19,6 @@ import json from u1db import errors from u1db.remote import utils from twisted.internet import defer -from twisted.internet import threads from leap.soledad.common.document import SoledadDocument from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async @@ -76,7 +75,7 @@ class HTTPDocFetcher(object): last_known_generation, last_known_trans_id, sync_id, 0) self._received_docs = 0 - number_of_changes, ngen, ntrans = yield self._insert_received_doc(doc, 1, 1) + number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) if ngen: new_generation = ngen @@ -138,7 +137,6 @@ class HTTPDocFetcher(object): body=str(body), content_type='application/x-soledad-sync-get') - @defer.inlineCallbacks def _insert_received_doc(self, response, idx, total): """ Insert a received document into the local replica. @@ -152,8 +150,7 @@ class HTTPDocFetcher(object): """ new_generation, new_transaction_id, number_of_changes, doc_id, \ rev, content, gen, trans_id = \ - (yield threads.deferToThread(self._parse_received_doc_response, - response)) + self._parse_received_doc_response(response) if doc_id is not None: # decrypt incoming document and insert into local database # ------------------------------------------------------------- @@ -188,7 +185,7 @@ class HTTPDocFetcher(object): self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total) - defer.returnValue((number_of_changes, new_generation, new_transaction_id)) + return number_of_changes, new_generation, new_transaction_id def _parse_received_doc_response(self, response): """ diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 22ddc87d..cdc7255c 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -278,7 +278,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) if self.defer_encryption: # TODO move to api? - self._sync_enc_pool.enqueue_doc_for_encryption(doc) + self._sync_enc_pool.encrypt_doc(doc) return doc_rev # diff --git a/common/src/leap/soledad/common/tests/test_encdecpool.py b/common/src/leap/soledad/common/tests/test_encdecpool.py index 694eb7ad..6676c298 100644 --- a/common/src/leap/soledad/common/tests/test_encdecpool.py +++ b/common/src/leap/soledad/common/tests/test_encdecpool.py @@ -57,13 +57,13 @@ class TestSyncEncrypterPool(BaseSoledadTest): self.assertIsNone(doc) @inlineCallbacks - def test_enqueue_doc_for_encryption_and_get_encrypted_doc(self): + def test_encrypt_doc_and_get_it_back(self): """ Test that the pool actually encrypts a document added to the queue. """ doc = SoledadDocument( doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT)) - self._pool.enqueue_doc_for_encryption(doc) + self._pool.encrypt_doc(doc) # exhaustivelly attempt to get the encrypted document encrypted = None @@ -117,6 +117,16 @@ class TestSyncDecrypterPool(BaseSoledadTest): self._pool.deferred.addCallback(_assert_doc_was_inserted) return self._pool.deferred + def test_looping_control(self): + """ + Start and stop cleanly. + """ + self._pool.start(10) + self.assertTrue(self._pool.running) + self._pool.stop() + self.assertFalse(self._pool.running) + self.assertTrue(self._pool.deferred.called) + def test_insert_received_doc_many(self): """ Test that many documents added to the pool are inserted using the @@ -179,6 +189,38 @@ class TestSyncDecrypterPool(BaseSoledadTest): _assert_doc_was_decrypted_and_inserted) return self._pool.deferred + @inlineCallbacks + def test_processing_order(self): + """ + This test ensures that processing of documents only occur if there is + a sequence in place. + """ + crypto = self._soledad._crypto + docs = [] + for i in xrange(1, 10): + i = str(i) + doc = SoledadDocument( + doc_id=DOC_ID + i, rev=DOC_REV + i, + json=json.dumps(DOC_CONTENT)) + encrypted_content = json.loads(crypto.encrypt_doc(doc)) + docs.append((doc, encrypted_content)) + + # insert the encrypted document in the pool + self._pool.start(10) # pool is expecting to process 10 docs + # first three arrives, forming a sequence + for i, (doc, encrypted_content) in enumerate(docs[:3]): + gen = idx = i + 1 + yield self._pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx) + # last one arrives alone, so it can't be processed + doc, encrypted_content = docs[-1] + yield self._pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10) + + yield self._pool._decrypt_and_recurse() + + self.assertEqual(3, self._pool._processed_docs) + def test_insert_encrypted_received_doc_many(self, many=100): """ Test that many encrypted documents added to the pool are decrypted and @@ -241,3 +283,4 @@ class TestSyncDecrypterPool(BaseSoledadTest): decrypted_docs = yield self._pool._get_docs(encrypted=False) # check that decrypted docs staging is clean self.assertEquals([], decrypted_docs) + self._pool.stop() diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index bf6c1515..ba7edfe3 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -481,8 +481,8 @@ class EncryptedSyncTestCase( Test if Soledad can sync very large files. """ self.skipTest( - "Work in progress. For reference, see: " - "https://leap.se/code/issues/7370") + "Work in progress. For reference, see: " + "https://leap.se/code/issues/7370") length = 100 * (10 ** 6) # 100 MB return self._test_encrypted_sym_sync(doc_size=length, number_of_docs=1) |