summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/encdecpool.py220
-rw-r--r--client/src/leap/soledad/client/http_target.py5
2 files changed, 84 insertions, 141 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 2f58d06c..c0a05d38 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -23,14 +23,12 @@ during synchronization.
import multiprocessing
-import threading
-import time
import json
import logging
+from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet.threads import deferToThread
-from twisted.python.failure import Failure
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common import soledad_assert
@@ -50,6 +48,8 @@ class SyncEncryptDecryptPool(object):
"""
Base class for encrypter/decrypter pools.
"""
+
+ # TODO implement throttling to reduce cpu usage??
WORKERS = multiprocessing.cpu_count()
def __init__(self, crypto, sync_db):
@@ -62,9 +62,9 @@ class SyncEncryptDecryptPool(object):
:param sync_db: A database connection handle
:type sync_db: pysqlcipher.dbapi2.Connection
"""
- self._pool = multiprocessing.Pool(self.WORKERS)
self._crypto = crypto
self._sync_db = sync_db
+ self._pool = multiprocessing.Pool(self.WORKERS)
def close(self):
"""
@@ -143,8 +143,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
Pool of workers that spawn subprocesses to execute the symmetric encryption
of documents to be synced.
"""
- # TODO implement throttling to reduce cpu usage??
- WORKERS = multiprocessing.cpu_count()
TABLE_NAME = "docs_tosync"
FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
@@ -191,7 +189,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
except multiprocessing.Queue.Empty:
pass
- def _encrypt_doc(self, doc, workers=True):
+ def _encrypt_doc(self, doc):
"""
Symmetrically encrypt a document.
@@ -207,19 +205,10 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
key = self._crypto.doc_passphrase(doc.doc_id)
secret = self._crypto.secret
args = doc.doc_id, doc.rev, docstr, key, secret
-
- if workers:
- # encrypt asynchronously
- self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self._encrypt_doc_cb)
- else:
- # encrypt inline
- try:
- res = encrypt_doc_task(*args)
- self._encrypt_doc_cb(res)
- except Exception as exc:
- logger.exception(exc)
+ # encrypt asynchronously
+ self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self._encrypt_doc_cb)
def _encrypt_doc_cb(self, result):
"""
@@ -390,24 +379,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._async_results = []
self._failure = None
- self._finished = threading.Event()
+ self._finished = False
- # clear the database before starting the sync
- self._empty_db = threading.Event()
- d = self._empty()
- d.addCallback(lambda _: self._empty_db.set())
+ # XXX we want to empty the database before starting, but this is an
+ # asynchronous call, so we have to somehow make sure that it is
+ # executed before any other call to the database, without
+ # blocking.
+ self._empty()
- # start the decryption loop
- def _maybe_store_failure_and_finish(result):
- if isinstance(result, Failure):
- self._set_failure(result)
- self._finished.set()
- logger.debug("Finished decrypter thread.")
+ def _launch_decrypt_and_process(self):
+ d = self._decrypt_and_process_docs()
+ d.addErrback(lambda f: self._set_failure(f))
- self._deferred_loop = deferToThread(
- self._decrypt_and_process_docs_loop)
- self._deferred_loop.addBoth(
- _maybe_store_failure_and_finish)
+ def _schedule_decrypt_and_process(self):
+ reactor.callLater(
+ self.DECRYPT_LOOP_PERIOD,
+ self._launch_decrypt_and_process)
@property
def failure(self):
@@ -415,11 +402,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
def _set_failure(self, failure):
self._failure = failure
+ self._finished = True
- def succeeded(self):
- return self._failure is None
+ def failed(self):
+ return bool(self._failure)
- def set_docs_to_process(self, docs_to_process):
+ def start(self, docs_to_process):
"""
Set the number of documents we expect to process.
@@ -430,6 +418,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type docs_to_process: int
"""
self._docs_to_process = docs_to_process
+ self._schedule_decrypt_and_process()
def insert_encrypted_received_doc(
self, doc_id, doc_rev, content, gen, trans_id, idx):
@@ -506,10 +495,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
% self.TABLE_NAME
return self._runOperation(query, (doc_id,))
- def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx,
- workers=True):
+ def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx):
"""
- Symmetrically decrypt a document and store in the sync db.
+ Dispatch an asynchronous document decrypting routine and save the
+ result object.
:param doc_id: The ID for the document with contents to be encrypted.
:type doc: str
@@ -525,9 +514,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type trans_id: str
:param idx: The index of this document in the current sync process.
:type idx: int
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
:return: A deferred that will fire after the document hasa been
decrypted and inserted in the sync db.
@@ -539,35 +525,15 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
key = self._crypto.doc_passphrase(doc_id)
secret = self._crypto.secret
args = doc_id, rev, content, gen, trans_id, key, secret, idx
-
- if workers:
- # when using multiprocessing, we need to wait for all parallel
- # processing to finish before continuing with the
- # decrypt-and-process loop. We do this by using an extra deferred
- # that will be fired by the multiprocessing callback when it has
- # finished processing.
- d1 = defer.Deferred()
-
- def _multiprocessing_callback(result):
- d2 = self._decrypt_doc_cb(result)
- d2.addCallback(lambda defres: d1.callback(defres))
-
- # save the async result object so we can inspect it for failures
- self._async_results.append(
- self._pool.apply_async(
- decrypt_doc_task, args,
- callback=_multiprocessing_callback))
-
- return d1
- else:
- # decrypt inline
- res = decrypt_doc_task(*args)
- return self._decrypt_doc_cb(res)
+ # decrypt asynchronously
+ self._async_results.append(
+ self._pool.apply_async(
+ decrypt_doc_task, args))
def _decrypt_doc_cb(self, result):
"""
Store the decryption result in the sync db from where it will later be
- picked by _process_decrypted.
+ picked by _process_decrypted_docs.
:param result: A tuple containing the document's id, revision,
content, generation, transaction id and sync index.
@@ -636,7 +602,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
last_idx += 1
defer.returnValue(insertable)
- def _decrypt_received_docs(self):
+ @defer.inlineCallbacks
+ def _async_decrypt_received_docs(self):
"""
Get all the encrypted documents from the sync database and dispatch a
decrypt worker to decrypt each one of them.
@@ -645,37 +612,25 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
decrypted and inserted back in the sync db.
:rtype: twisted.internet.defer.Deferred
"""
+ docs = yield self._get_docs(encrypted=True)
+ for doc_id, rev, content, gen, trans_id, _, idx in docs:
+ self._async_decrypt_doc(
+ doc_id, rev, content, gen, trans_id, idx)
- def _callback(received_docs):
- deferreds = []
- for doc_id, rev, content, gen, trans_id, _, idx in received_docs:
- deferreds.append(
- self._decrypt_doc(
- doc_id, rev, content, gen, trans_id, idx))
- return defer.gatherResults(deferreds)
-
- d = self._get_docs(encrypted=True)
- d.addCallback(_callback)
- return d
-
- def _process_decrypted(self):
+ @defer.inlineCallbacks
+ def _process_decrypted_docs(self):
"""
Fetch as many decrypted documents as can be taken from the expected
- order and insert them in the database.
+ order and insert them in the local replica.
:return: A deferred that will fire with the list of inserted
documents.
:rtype: twisted.internet.defer.Deferred
"""
-
- def _callback(insertable):
- for doc_fields in insertable:
- self._insert_decrypted_local_doc(*doc_fields)
- return insertable
-
- d = self._get_insertable_docs()
- d.addCallback(_callback)
- return d
+ insertable = yield self._get_insertable_docs()
+ for doc_fields in insertable:
+ self._insert_decrypted_local_doc(*doc_fields)
+ defer.returnValue(insertable)
def _delete_processed_docs(self, inserted):
"""
@@ -700,8 +655,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
gen, trans_id, idx):
"""
- Insert the decrypted document into the local sqlcipher database.
- Makes use of the passed callback `insert_doc_cb` passed to the caller
+ Insert the decrypted document into the local replica.
+
+ Make use of the passed callback `insert_doc_cb` passed to the caller
by u1db sync.
:param doc_id: The document id.
@@ -743,59 +699,47 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
return self._runOperation(query)
- def _raise_if_async_fails(self):
+ def _collect_async_decryption_results(self):
"""
- Raise any exception raised by a multiprocessing async decryption
- call.
+ Collect the results of the asynchronous doc decryptions and re-raise
+ any exception raised by a multiprocessing async decryption call.
:raise Exception: Raised if an async call has raised an exception.
"""
- for res in self._async_results:
+ async_results = self._async_results[:]
+ for res in async_results:
if res.ready():
- if not res.successful():
- # re-raise the exception raised by the remote call
- res.get()
+ self._decrypt_doc_cb(res.get()) # might raise an exception!
+ self._async_results.remove(res)
- def _decrypt_and_process_docs_loop(self):
+ @defer.inlineCallbacks
+ def _decrypt_and_process_docs(self):
"""
Decrypt the documents received from remote replica and insert them
into the local one.
- This method runs in its own thread, so sleeping will not interfere
- with the main thread.
- """
- # wait for database to be emptied
- self._empty_db.wait()
-
- # wait until we know how many documents we need to process
- while self._docs_to_process is None:
- time.sleep(self.DECRYPT_LOOP_PERIOD)
-
- # because all database operations are asynchronous, we use an
- # event to make sure we don't start the next loop before the
- # current one has finished.
- event = threading.Event()
-
- # loop until we have processes as many docs as the number of
- # changes
- while self._processed_docs < self._docs_to_process:
+ This method implicitelly returns a defferred (see the decorator
+ above). It should only be called by _launch_decrypt_and_process().
+ because this way any exceptions raised here will be stored by the
+ errback attached to the deferred returned.
- event.clear()
-
- d = self._decrypt_received_docs()
- d.addCallback(lambda _: self._raise_if_async_fails())
- d.addCallback(lambda _: self._process_decrypted())
- d.addCallback(lambda r: self._delete_processed_docs(r))
- d.addErrback(self._set_failure) # grab failure and continue
- d.addCallback(lambda _: event.set())
-
- event.wait()
-
- if not self.succeeded():
- break
-
- # sleep a bit to give time for some decryption work
- time.sleep(self.DECRYPT_LOOP_PERIOD)
+ :return: A deferred which will fire after all decrypt, process and
+ delete operations have been executed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ if not self.failed():
+ if self._processed_docs < self._docs_to_process:
+ yield self._async_decrypt_received_docs()
+ yield self._collect_async_decryption_results()
+ docs = yield self._process_decrypted_docs()
+ yield self._delete_processed_docs(docs)
+ # recurse
+ self._schedule_decrypt_and_process()
+ else:
+ self._finished = True
def has_finished(self):
- return self._finished.is_set()
+ """
+ Return whether the decrypter has finished its work.
+ """
+ return self._finished
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
index bf563b34..75af9cf7 100644
--- a/client/src/leap/soledad/client/http_target.py
+++ b/client/src/leap/soledad/client/http_target.py
@@ -402,8 +402,7 @@ class SoledadHTTPSyncTarget(SyncTarget):
number_of_changes, ngen, ntrans = yield d
if defer_decryption:
- self._sync_decr_pool.set_docs_to_process(
- number_of_changes)
+ self._sync_decr_pool.start(number_of_changes)
#---------------------------------------------------------------------
# maybe receive the rest of the documents
@@ -459,7 +458,7 @@ class SoledadHTTPSyncTarget(SyncTarget):
SyncDecrypterPool.DECRYPT_LOOP_PERIOD,
_wait_or_finish)
else:
- if self._sync_decr_pool.succeeded():
+ if not self._sync_decr_pool.failed():
d.callback(None)
else:
d.errback(self._sync_decr_pool.failure)