diff options
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 220 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target.py | 5 |
2 files changed, 84 insertions, 141 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 2f58d06c..c0a05d38 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -23,14 +23,12 @@ during synchronization. import multiprocessing -import threading -import time import json import logging +from twisted.internet import reactor from twisted.internet import defer from twisted.internet.threads import deferToThread -from twisted.python.failure import Failure from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert @@ -50,6 +48,8 @@ class SyncEncryptDecryptPool(object): """ Base class for encrypter/decrypter pools. """ + + # TODO implement throttling to reduce cpu usage?? WORKERS = multiprocessing.cpu_count() def __init__(self, crypto, sync_db): @@ -62,9 +62,9 @@ class SyncEncryptDecryptPool(object): :param sync_db: A database connection handle :type sync_db: pysqlcipher.dbapi2.Connection """ - self._pool = multiprocessing.Pool(self.WORKERS) self._crypto = crypto self._sync_db = sync_db + self._pool = multiprocessing.Pool(self.WORKERS) def close(self): """ @@ -143,8 +143,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): Pool of workers that spawn subprocesses to execute the symmetric encryption of documents to be synced. """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = multiprocessing.cpu_count() TABLE_NAME = "docs_tosync" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" @@ -191,7 +189,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): except multiprocessing.Queue.Empty: pass - def _encrypt_doc(self, doc, workers=True): + def _encrypt_doc(self, doc): """ Symmetrically encrypt a document. @@ -207,19 +205,10 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): key = self._crypto.doc_passphrase(doc.doc_id) secret = self._crypto.secret args = doc.doc_id, doc.rev, docstr, key, secret - - if workers: - # encrypt asynchronously - self._pool.apply_async( - encrypt_doc_task, args, - callback=self._encrypt_doc_cb) - else: - # encrypt inline - try: - res = encrypt_doc_task(*args) - self._encrypt_doc_cb(res) - except Exception as exc: - logger.exception(exc) + # encrypt asynchronously + self._pool.apply_async( + encrypt_doc_task, args, + callback=self._encrypt_doc_cb) def _encrypt_doc_cb(self, result): """ @@ -390,24 +379,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._async_results = [] self._failure = None - self._finished = threading.Event() + self._finished = False - # clear the database before starting the sync - self._empty_db = threading.Event() - d = self._empty() - d.addCallback(lambda _: self._empty_db.set()) + # XXX we want to empty the database before starting, but this is an + # asynchronous call, so we have to somehow make sure that it is + # executed before any other call to the database, without + # blocking. + self._empty() - # start the decryption loop - def _maybe_store_failure_and_finish(result): - if isinstance(result, Failure): - self._set_failure(result) - self._finished.set() - logger.debug("Finished decrypter thread.") + def _launch_decrypt_and_process(self): + d = self._decrypt_and_process_docs() + d.addErrback(lambda f: self._set_failure(f)) - self._deferred_loop = deferToThread( - self._decrypt_and_process_docs_loop) - self._deferred_loop.addBoth( - _maybe_store_failure_and_finish) + def _schedule_decrypt_and_process(self): + reactor.callLater( + self.DECRYPT_LOOP_PERIOD, + self._launch_decrypt_and_process) @property def failure(self): @@ -415,11 +402,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def _set_failure(self, failure): self._failure = failure + self._finished = True - def succeeded(self): - return self._failure is None + def failed(self): + return bool(self._failure) - def set_docs_to_process(self, docs_to_process): + def start(self, docs_to_process): """ Set the number of documents we expect to process. @@ -430,6 +418,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type docs_to_process: int """ self._docs_to_process = docs_to_process + self._schedule_decrypt_and_process() def insert_encrypted_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -506,10 +495,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): % self.TABLE_NAME return self._runOperation(query, (doc_id,)) - def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx, - workers=True): + def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx): """ - Symmetrically decrypt a document and store in the sync db. + Dispatch an asynchronous document decrypting routine and save the + result object. :param doc_id: The ID for the document with contents to be encrypted. :type doc: str @@ -525,9 +514,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type trans_id: str :param idx: The index of this document in the current sync process. :type idx: int - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool :return: A deferred that will fire after the document hasa been decrypted and inserted in the sync db. @@ -539,35 +525,15 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): key = self._crypto.doc_passphrase(doc_id) secret = self._crypto.secret args = doc_id, rev, content, gen, trans_id, key, secret, idx - - if workers: - # when using multiprocessing, we need to wait for all parallel - # processing to finish before continuing with the - # decrypt-and-process loop. We do this by using an extra deferred - # that will be fired by the multiprocessing callback when it has - # finished processing. - d1 = defer.Deferred() - - def _multiprocessing_callback(result): - d2 = self._decrypt_doc_cb(result) - d2.addCallback(lambda defres: d1.callback(defres)) - - # save the async result object so we can inspect it for failures - self._async_results.append( - self._pool.apply_async( - decrypt_doc_task, args, - callback=_multiprocessing_callback)) - - return d1 - else: - # decrypt inline - res = decrypt_doc_task(*args) - return self._decrypt_doc_cb(res) + # decrypt asynchronously + self._async_results.append( + self._pool.apply_async( + decrypt_doc_task, args)) def _decrypt_doc_cb(self, result): """ Store the decryption result in the sync db from where it will later be - picked by _process_decrypted. + picked by _process_decrypted_docs. :param result: A tuple containing the document's id, revision, content, generation, transaction id and sync index. @@ -636,7 +602,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): last_idx += 1 defer.returnValue(insertable) - def _decrypt_received_docs(self): + @defer.inlineCallbacks + def _async_decrypt_received_docs(self): """ Get all the encrypted documents from the sync database and dispatch a decrypt worker to decrypt each one of them. @@ -645,37 +612,25 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): decrypted and inserted back in the sync db. :rtype: twisted.internet.defer.Deferred """ + docs = yield self._get_docs(encrypted=True) + for doc_id, rev, content, gen, trans_id, _, idx in docs: + self._async_decrypt_doc( + doc_id, rev, content, gen, trans_id, idx) - def _callback(received_docs): - deferreds = [] - for doc_id, rev, content, gen, trans_id, _, idx in received_docs: - deferreds.append( - self._decrypt_doc( - doc_id, rev, content, gen, trans_id, idx)) - return defer.gatherResults(deferreds) - - d = self._get_docs(encrypted=True) - d.addCallback(_callback) - return d - - def _process_decrypted(self): + @defer.inlineCallbacks + def _process_decrypted_docs(self): """ Fetch as many decrypted documents as can be taken from the expected - order and insert them in the database. + order and insert them in the local replica. :return: A deferred that will fire with the list of inserted documents. :rtype: twisted.internet.defer.Deferred """ - - def _callback(insertable): - for doc_fields in insertable: - self._insert_decrypted_local_doc(*doc_fields) - return insertable - - d = self._get_insertable_docs() - d.addCallback(_callback) - return d + insertable = yield self._get_insertable_docs() + for doc_fields in insertable: + self._insert_decrypted_local_doc(*doc_fields) + defer.returnValue(insertable) def _delete_processed_docs(self, inserted): """ @@ -700,8 +655,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, gen, trans_id, idx): """ - Insert the decrypted document into the local sqlcipher database. - Makes use of the passed callback `insert_doc_cb` passed to the caller + Insert the decrypted document into the local replica. + + Make use of the passed callback `insert_doc_cb` passed to the caller by u1db sync. :param doc_id: The document id. @@ -743,59 +699,47 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) return self._runOperation(query) - def _raise_if_async_fails(self): + def _collect_async_decryption_results(self): """ - Raise any exception raised by a multiprocessing async decryption - call. + Collect the results of the asynchronous doc decryptions and re-raise + any exception raised by a multiprocessing async decryption call. :raise Exception: Raised if an async call has raised an exception. """ - for res in self._async_results: + async_results = self._async_results[:] + for res in async_results: if res.ready(): - if not res.successful(): - # re-raise the exception raised by the remote call - res.get() + self._decrypt_doc_cb(res.get()) # might raise an exception! + self._async_results.remove(res) - def _decrypt_and_process_docs_loop(self): + @defer.inlineCallbacks + def _decrypt_and_process_docs(self): """ Decrypt the documents received from remote replica and insert them into the local one. - This method runs in its own thread, so sleeping will not interfere - with the main thread. - """ - # wait for database to be emptied - self._empty_db.wait() - - # wait until we know how many documents we need to process - while self._docs_to_process is None: - time.sleep(self.DECRYPT_LOOP_PERIOD) - - # because all database operations are asynchronous, we use an - # event to make sure we don't start the next loop before the - # current one has finished. - event = threading.Event() - - # loop until we have processes as many docs as the number of - # changes - while self._processed_docs < self._docs_to_process: + This method implicitelly returns a defferred (see the decorator + above). It should only be called by _launch_decrypt_and_process(). + because this way any exceptions raised here will be stored by the + errback attached to the deferred returned. - event.clear() - - d = self._decrypt_received_docs() - d.addCallback(lambda _: self._raise_if_async_fails()) - d.addCallback(lambda _: self._process_decrypted()) - d.addCallback(lambda r: self._delete_processed_docs(r)) - d.addErrback(self._set_failure) # grab failure and continue - d.addCallback(lambda _: event.set()) - - event.wait() - - if not self.succeeded(): - break - - # sleep a bit to give time for some decryption work - time.sleep(self.DECRYPT_LOOP_PERIOD) + :return: A deferred which will fire after all decrypt, process and + delete operations have been executed. + :rtype: twisted.internet.defer.Deferred + """ + if not self.failed(): + if self._processed_docs < self._docs_to_process: + yield self._async_decrypt_received_docs() + yield self._collect_async_decryption_results() + docs = yield self._process_decrypted_docs() + yield self._delete_processed_docs(docs) + # recurse + self._schedule_decrypt_and_process() + else: + self._finished = True def has_finished(self): - return self._finished.is_set() + """ + Return whether the decrypter has finished its work. + """ + return self._finished diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index bf563b34..75af9cf7 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -402,8 +402,7 @@ class SoledadHTTPSyncTarget(SyncTarget): number_of_changes, ngen, ntrans = yield d if defer_decryption: - self._sync_decr_pool.set_docs_to_process( - number_of_changes) + self._sync_decr_pool.start(number_of_changes) #--------------------------------------------------------------------- # maybe receive the rest of the documents @@ -459,7 +458,7 @@ class SoledadHTTPSyncTarget(SyncTarget): SyncDecrypterPool.DECRYPT_LOOP_PERIOD, _wait_or_finish) else: - if self._sync_decr_pool.succeeded(): + if not self._sync_decr_pool.failed(): d.callback(None) else: d.errback(self._sync_decr_pool.failure) |