summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/changes/next-changelog.rst1
-rw-r--r--client/src/leap/soledad/client/encdecpool.py183
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py9
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py2
-rw-r--r--common/src/leap/soledad/common/tests/test_encdecpool.py47
-rw-r--r--common/src/leap/soledad/common/tests/test_server.py4
6 files changed, 122 insertions, 124 deletions
diff --git a/client/changes/next-changelog.rst b/client/changes/next-changelog.rst
index a696fe10..c676625f 100644
--- a/client/changes/next-changelog.rst
+++ b/client/changes/next-changelog.rst
@@ -26,6 +26,7 @@ Misc
- Refactor bootstrap to remove shared db lock.
- `#1236 <https://leap.se/code/issues/1236>`_: Description of the new feature corresponding with issue #1236.
- Some change without issue number.
+- Removed multiprocessing from encdecpool with some extra refactoring.
Known Issues
~~~~~~~~~~~~
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 576b8b2c..218ebfa9 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -25,7 +25,7 @@ during synchronization.
import json
import logging
-from twisted.internet import reactor
+from twisted.internet.task import LoopingCall
from twisted.internet import threads
from twisted.internet import defer
from twisted.python import log
@@ -167,26 +167,14 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
SyncEncryptDecryptPool.stop(self)
- def enqueue_doc_for_encryption(self, doc):
+ def encrypt_doc(self, doc):
"""
- Enqueue a document for encryption.
+ Encrypt document asynchronously then insert it on
+ local staging database.
:param doc: The document to be encrypted.
:type doc: SoledadDocument
"""
- self._encrypt_doc(doc)
-
- def _encrypt_doc(self, doc):
- """
- Symmetrically encrypt a document.
-
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
soledad_assert(self._crypto is not None, "need a crypto object")
docstr = doc.get_json()
key = self._crypto.doc_passphrase(doc.doc_id)
@@ -276,8 +264,8 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
:type doc_id: str
:param doc_rev: The document revision.
:type doc_rev: str
- :param content: The encrypted content of the document.
- :type content: str
+ :param content: The encrypted content of the document as JSON dict.
+ :type content: dict
:param gen: The generation corresponding to the modification of that
document.
:type gen: int
@@ -294,7 +282,6 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
:return: A tuple containing the doc id, revision and encrypted content.
:rtype: tuple(str, str, str)
"""
- content = json.loads(content) if type(content) == str else content
decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)
return doc_id, doc_rev, decrypted_content, gen, trans_id, idx
@@ -356,14 +343,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._processed_docs = 0
self._last_inserted_idx = 0
- # a list that holds the asynchronous decryption results so they can be
- # collected when they are ready
- self._async_results = []
-
# initialize db and make sure any database operation happens after
# db initialization
self._deferred_init = self._init_db()
self._wait_init_db('_runOperation', '_runQuery')
+ self._loop = LoopingCall(self._decrypt_and_recurse)
+ self._decrypted_docs_indexes = set()
def _wait_init_db(self, *methods):
"""
@@ -408,11 +393,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
SyncEncryptDecryptPool.start(self)
self._docs_to_process = docs_to_process
self._deferred = defer.Deferred()
- reactor.callWhenRunning(self._launch_decrypt_and_recurse)
+ self._loop.start(self.DECRYPT_LOOP_PERIOD)
- def _launch_decrypt_and_recurse(self):
- d = self._decrypt_and_recurse()
- d.addErrback(self._errback)
+ def stop(self):
+ if self._loop.running:
+ self._loop.stop()
+ self._finish()
+ SyncEncryptDecryptPool.stop(self)
def _errback(self, failure):
log.err(failure)
@@ -431,8 +418,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
def insert_encrypted_received_doc(
self, doc_id, doc_rev, content, gen, trans_id, idx):
"""
- Insert a received message with encrypted content, to be decrypted later
- on.
+ Decrypt and insert a received document into local staging area to be
+ processed later on.
:param doc_id: The document ID.
:type doc_id: str
@@ -447,11 +434,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:param idx: The index of this document in the current sync process.
:type idx: int
- :return: A deferred that will fire when the operation in the database
- has finished.
+ :return: A deferred that will fire after the decrypted document has
+ been inserted in the sync db.
:rtype: twisted.internet.defer.Deferred
"""
- return self._async_decrypt_doc(doc_id, doc_rev, content, gen, trans_id, idx)
+ soledad_assert(self._crypto is not None, "need a crypto object")
+
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx
+ # decrypt asynchronously
+ doc = decrypt_doc_task(*args)
+ # callback will insert it for later processing
+ return self._decrypt_doc_cb(doc)
def insert_received_doc(
self, doc_id, doc_rev, content, gen, trans_id, idx):
@@ -481,52 +476,26 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
content = json.dumps(content)
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
% self.TABLE_NAME
- return self._runOperation(
+ d = self._runOperation(
query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
+ d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx))
+ return d
- def _delete_received_doc(self, doc_id):
+ def _delete_received_docs(self, doc_ids):
"""
- Delete a received doc after it was inserted into the local db.
+ Delete a list of received docs after get them inserted into the db.
- :param doc_id: Document ID.
- :type doc_id: str
+ :param doc_id: Document ID list.
+ :type doc_id: list
:return: A deferred that will fire when the operation in the database
has finished.
:rtype: twisted.internet.defer.Deferred
"""
- query = "DELETE FROM '%s' WHERE doc_id=?" \
- % self.TABLE_NAME
- return self._runOperation(query, (doc_id,))
-
- def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx):
- """
- Dispatch an asynchronous document decrypting routine and save the
- result object.
-
- :param doc_id: The ID for the document with contents to be encrypted.
- :type doc: str
- :param rev: The revision of the document.
- :type rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- :param idx: The index of this document in the current sync process.
- :type idx: int
- """
- soledad_assert(self._crypto is not None, "need a crypto object")
-
- key = self._crypto.doc_passphrase(doc_id)
- secret = self._crypto.secret
- args = doc_id, rev, content, gen, trans_id, key, secret, idx
- # decrypt asynchronously
- d = threads.deferToThread(decrypt_doc_task, *args)
- d.addCallback(self._decrypt_doc_cb)
+ placeholders = ', '.join('?' for _ in doc_ids)
+ query = "DELETE FROM '%s' WHERE doc_id in (%s)" \
+ % (self.TABLE_NAME, placeholders)
+ return self._runOperation(query, (doc_ids))
def _decrypt_doc_cb(self, result):
"""
@@ -547,7 +516,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
return self.insert_received_doc(
doc_id, rev, content, gen, trans_id, idx)
- def _get_docs(self, encrypted=None, order_by='idx', order='ASC'):
+ def _get_docs(self, encrypted=None, order_by='idx', order='ASC',
+ sequence=None):
"""
Get documents from the received docs table in the sync db.
@@ -565,8 +535,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \
"idx FROM %s" % self.TABLE_NAME
- if encrypted is not None:
- query += " WHERE encrypted = %d" % int(encrypted)
+ if encrypted or sequence:
+ query += " WHERE"
+ if encrypted:
+ query += " encrypted = %d" % int(encrypted)
+ if sequence:
+ query += " idx in (" + ', '.join(sequence) + ")"
+
query += " ORDER BY %s %s" % (order_by, order)
return self._runQuery(query)
@@ -579,18 +554,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
documents.
:rtype: twisted.internet.defer.Deferred
"""
- # here, we fetch the list of decrypted documents and compare with the
- # index of the last succesfully processed document.
- decrypted_docs = yield self._get_docs(encrypted=False)
- insertable = []
- last_idx = self._last_inserted_idx
- for doc_id, rev, content, gen, trans_id, encrypted, idx in \
- decrypted_docs:
- if (idx != last_idx + 1):
- break
- insertable.append((doc_id, rev, content, gen, trans_id, idx))
- last_idx += 1
- defer.returnValue(insertable)
+ # Here, check in memory what are the insertable indexes that can
+ # form a sequence starting from the last inserted index
+ sequence = []
+ insertable_docs = []
+ next_index = self._last_inserted_idx + 1
+ while next_index in self._decrypted_docs_indexes:
+ sequence.append(str(next_index))
+ next_index += 1
+ # Then fetch all the ones ready for insertion.
+ if sequence:
+ insertable_docs = yield self._get_docs(encrypted=False,
+ sequence=sequence)
+ defer.returnValue(insertable_docs)
@defer.inlineCallbacks
def _process_decrypted_docs(self):
@@ -603,36 +579,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:rtype: twisted.internet.defer.Deferred
"""
insertable = yield self._get_insertable_docs()
+ processed_docs_ids = []
for doc_fields in insertable:
method = self._insert_decrypted_local_doc
# FIXME: This is used only because SQLCipherU1DBSync is synchronous
# When adbapi is used there is no need for an external thread
# Without this the reactor can freeze and fail docs download
yield threads.deferToThread(method, *doc_fields)
- defer.returnValue(insertable)
-
- def _delete_processed_docs(self, inserted):
- """
- Delete from the sync db documents that have been processed.
-
- :param inserted: List of documents inserted in the previous process
- step.
- :type inserted: list
-
- :return: A list of deferreds that will fire when each operation in the
- database has finished.
- :rtype: twisted.internet.defer.DeferredList
- """
- deferreds = []
- for doc_id, doc_rev, _, _, _, _ in inserted:
- deferreds.append(
- self._delete_received_doc(doc_id))
- if not deferreds:
- return defer.succeed(None)
- return defer.gatherResults(deferreds)
+ processed_docs_ids.append(doc_fields[0])
+ yield self._delete_received_docs(processed_docs_ids)
def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
- gen, trans_id, idx):
+ gen, trans_id, encrypted, idx):
"""
Insert the decrypted document into the local replica.
@@ -693,20 +651,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
delete operations have been executed.
:rtype: twisted.internet.defer.Deferred
"""
+ if not self.running:
+ defer.returnValue(None)
processed = self._processed_docs
pending = self._docs_to_process
if processed < pending:
- docs = yield self._process_decrypted_docs()
- yield self._delete_processed_docs(docs)
- # recurse
- self._delayed_call = reactor.callLater(
- self.DECRYPT_LOOP_PERIOD,
- self._launch_decrypt_and_recurse)
+ yield self._process_decrypted_docs()
else:
self._finish()
def _finish(self):
self._processed_docs = 0
self._last_inserted_idx = 0
- self._deferred.callback(None)
+ self._decrypted_docs_indexes = set()
+ if not self._deferred.called:
+ self._deferred.callback(None)
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index fda90909..9f7a4193 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -19,7 +19,6 @@ import json
from u1db import errors
from u1db.remote import utils
from twisted.internet import defer
-from twisted.internet import threads
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import emit_async
@@ -76,7 +75,7 @@ class HTTPDocFetcher(object):
last_known_generation, last_known_trans_id,
sync_id, 0)
self._received_docs = 0
- number_of_changes, ngen, ntrans = yield self._insert_received_doc(doc, 1, 1)
+ number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
if ngen:
new_generation = ngen
@@ -138,7 +137,6 @@ class HTTPDocFetcher(object):
body=str(body),
content_type='application/x-soledad-sync-get')
- @defer.inlineCallbacks
def _insert_received_doc(self, response, idx, total):
"""
Insert a received document into the local replica.
@@ -152,8 +150,7 @@ class HTTPDocFetcher(object):
"""
new_generation, new_transaction_id, number_of_changes, doc_id, \
rev, content, gen, trans_id = \
- (yield threads.deferToThread(self._parse_received_doc_response,
- response))
+ self._parse_received_doc_response(response)
if doc_id is not None:
# decrypt incoming document and insert into local database
# -------------------------------------------------------------
@@ -188,7 +185,7 @@ class HTTPDocFetcher(object):
self._received_docs += 1
user_data = {'uuid': self.uuid, 'userid': self.userid}
_emit_receive_status(user_data, self._received_docs, total)
- defer.returnValue((number_of_changes, new_generation, new_transaction_id))
+ return number_of_changes, new_generation, new_transaction_id
def _parse_received_doc_response(self, response):
"""
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 22ddc87d..cdc7255c 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -278,7 +278,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
if self.defer_encryption:
# TODO move to api?
- self._sync_enc_pool.enqueue_doc_for_encryption(doc)
+ self._sync_enc_pool.encrypt_doc(doc)
return doc_rev
#
diff --git a/common/src/leap/soledad/common/tests/test_encdecpool.py b/common/src/leap/soledad/common/tests/test_encdecpool.py
index 694eb7ad..6676c298 100644
--- a/common/src/leap/soledad/common/tests/test_encdecpool.py
+++ b/common/src/leap/soledad/common/tests/test_encdecpool.py
@@ -57,13 +57,13 @@ class TestSyncEncrypterPool(BaseSoledadTest):
self.assertIsNone(doc)
@inlineCallbacks
- def test_enqueue_doc_for_encryption_and_get_encrypted_doc(self):
+ def test_encrypt_doc_and_get_it_back(self):
"""
Test that the pool actually encrypts a document added to the queue.
"""
doc = SoledadDocument(
doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT))
- self._pool.enqueue_doc_for_encryption(doc)
+ self._pool.encrypt_doc(doc)
# exhaustivelly attempt to get the encrypted document
encrypted = None
@@ -117,6 +117,16 @@ class TestSyncDecrypterPool(BaseSoledadTest):
self._pool.deferred.addCallback(_assert_doc_was_inserted)
return self._pool.deferred
+ def test_looping_control(self):
+ """
+ Start and stop cleanly.
+ """
+ self._pool.start(10)
+ self.assertTrue(self._pool.running)
+ self._pool.stop()
+ self.assertFalse(self._pool.running)
+ self.assertTrue(self._pool.deferred.called)
+
def test_insert_received_doc_many(self):
"""
Test that many documents added to the pool are inserted using the
@@ -179,6 +189,38 @@ class TestSyncDecrypterPool(BaseSoledadTest):
_assert_doc_was_decrypted_and_inserted)
return self._pool.deferred
+ @inlineCallbacks
+ def test_processing_order(self):
+ """
+ This test ensures that processing of documents only occur if there is
+ a sequence in place.
+ """
+ crypto = self._soledad._crypto
+ docs = []
+ for i in xrange(1, 10):
+ i = str(i)
+ doc = SoledadDocument(
+ doc_id=DOC_ID + i, rev=DOC_REV + i,
+ json=json.dumps(DOC_CONTENT))
+ encrypted_content = json.loads(crypto.encrypt_doc(doc))
+ docs.append((doc, encrypted_content))
+
+ # insert the encrypted document in the pool
+ self._pool.start(10) # pool is expecting to process 10 docs
+ # first three arrives, forming a sequence
+ for i, (doc, encrypted_content) in enumerate(docs[:3]):
+ gen = idx = i + 1
+ yield self._pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx)
+ # last one arrives alone, so it can't be processed
+ doc, encrypted_content = docs[-1]
+ yield self._pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10)
+
+ yield self._pool._decrypt_and_recurse()
+
+ self.assertEqual(3, self._pool._processed_docs)
+
def test_insert_encrypted_received_doc_many(self, many=100):
"""
Test that many encrypted documents added to the pool are decrypted and
@@ -241,3 +283,4 @@ class TestSyncDecrypterPool(BaseSoledadTest):
decrypted_docs = yield self._pool._get_docs(encrypted=False)
# check that decrypted docs staging is clean
self.assertEquals([], decrypted_docs)
+ self._pool.stop()
diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py
index bf6c1515..ba7edfe3 100644
--- a/common/src/leap/soledad/common/tests/test_server.py
+++ b/common/src/leap/soledad/common/tests/test_server.py
@@ -481,8 +481,8 @@ class EncryptedSyncTestCase(
Test if Soledad can sync very large files.
"""
self.skipTest(
- "Work in progress. For reference, see: "
- "https://leap.se/code/issues/7370")
+ "Work in progress. For reference, see: "
+ "https://leap.se/code/issues/7370")
length = 100 * (10 ** 6) # 100 MB
return self._test_encrypted_sym_sync(doc_size=length, number_of_docs=1)