diff options
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
| -rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 220 | 
1 files changed, 82 insertions, 138 deletions
| diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 2f58d06c..c0a05d38 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -23,14 +23,12 @@ during synchronization.  import multiprocessing -import threading -import time  import json  import logging +from twisted.internet import reactor  from twisted.internet import defer  from twisted.internet.threads import deferToThread -from twisted.python.failure import Failure  from leap.soledad.common.document import SoledadDocument  from leap.soledad.common import soledad_assert @@ -50,6 +48,8 @@ class SyncEncryptDecryptPool(object):      """      Base class for encrypter/decrypter pools.      """ + +    # TODO implement throttling to reduce cpu usage??      WORKERS = multiprocessing.cpu_count()      def __init__(self, crypto, sync_db): @@ -62,9 +62,9 @@ class SyncEncryptDecryptPool(object):          :param sync_db: A database connection handle          :type sync_db: pysqlcipher.dbapi2.Connection          """ -        self._pool = multiprocessing.Pool(self.WORKERS)          self._crypto = crypto          self._sync_db = sync_db +        self._pool = multiprocessing.Pool(self.WORKERS)      def close(self):          """ @@ -143,8 +143,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):      Pool of workers that spawn subprocesses to execute the symmetric encryption      of documents to be synced.      """ -    # TODO implement throttling to reduce cpu usage?? -    WORKERS = multiprocessing.cpu_count()      TABLE_NAME = "docs_tosync"      FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" @@ -191,7 +189,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):              except multiprocessing.Queue.Empty:                  pass -    def _encrypt_doc(self, doc, workers=True): +    def _encrypt_doc(self, doc):          """          Symmetrically encrypt a document. @@ -207,19 +205,10 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          key = self._crypto.doc_passphrase(doc.doc_id)          secret = self._crypto.secret          args = doc.doc_id, doc.rev, docstr, key, secret - -        if workers: -            # encrypt asynchronously -            self._pool.apply_async( -                encrypt_doc_task, args, -                callback=self._encrypt_doc_cb) -        else: -            # encrypt inline -            try: -                res = encrypt_doc_task(*args) -                self._encrypt_doc_cb(res) -            except Exception as exc: -                logger.exception(exc) +        # encrypt asynchronously +        self._pool.apply_async( +            encrypt_doc_task, args, +            callback=self._encrypt_doc_cb)      def _encrypt_doc_cb(self, result):          """ @@ -390,24 +379,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          self._async_results = []          self._failure = None -        self._finished = threading.Event() +        self._finished = False -        # clear the database before starting the sync -        self._empty_db = threading.Event() -        d = self._empty() -        d.addCallback(lambda _: self._empty_db.set()) +        # XXX we want to empty the database before starting, but this is an +        #     asynchronous call, so we have to somehow make sure that it is +        #     executed before any other call to the database, without +        #     blocking. +        self._empty() -        # start the decryption loop -        def _maybe_store_failure_and_finish(result): -            if isinstance(result, Failure): -                self._set_failure(result) -            self._finished.set() -            logger.debug("Finished decrypter thread.") +    def _launch_decrypt_and_process(self): +        d = self._decrypt_and_process_docs() +        d.addErrback(lambda f: self._set_failure(f)) -        self._deferred_loop = deferToThread( -            self._decrypt_and_process_docs_loop) -        self._deferred_loop.addBoth( -            _maybe_store_failure_and_finish) +    def _schedule_decrypt_and_process(self): +        reactor.callLater( +            self.DECRYPT_LOOP_PERIOD, +            self._launch_decrypt_and_process)      @property      def failure(self): @@ -415,11 +402,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):      def _set_failure(self, failure):          self._failure = failure +        self._finished = True -    def succeeded(self): -        return self._failure is None +    def failed(self): +        return bool(self._failure) -    def set_docs_to_process(self, docs_to_process): +    def start(self, docs_to_process):          """          Set the number of documents we expect to process. @@ -430,6 +418,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :type docs_to_process: int          """          self._docs_to_process = docs_to_process +        self._schedule_decrypt_and_process()      def insert_encrypted_received_doc(              self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -506,10 +495,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                  % self.TABLE_NAME          return self._runOperation(query, (doc_id,)) -    def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx, -                     workers=True): +    def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx):          """ -        Symmetrically decrypt a document and store in the sync db. +        Dispatch an asynchronous document decrypting routine and save the +        result object.          :param doc_id: The ID for the document with contents to be encrypted.          :type doc: str @@ -525,9 +514,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          :type trans_id: str          :param idx: The index of this document in the current sync process.          :type idx: int -        :param workers: Whether to defer the decryption to the multiprocess -                        pool of workers. Useful for debugging purposes. -        :type workers: bool          :return: A deferred that will fire after the document hasa been                   decrypted and inserted in the sync db. @@ -539,35 +525,15 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          key = self._crypto.doc_passphrase(doc_id)          secret = self._crypto.secret          args = doc_id, rev, content, gen, trans_id, key, secret, idx - -        if workers: -            # when using multiprocessing, we need to wait for all parallel -            # processing to finish before continuing with the -            # decrypt-and-process loop. We do this by using an extra deferred -            # that will be fired by the multiprocessing callback when it has -            # finished processing. -            d1 = defer.Deferred() - -            def _multiprocessing_callback(result): -                d2 = self._decrypt_doc_cb(result) -                d2.addCallback(lambda defres: d1.callback(defres)) - -            # save the async result object so we can inspect it for failures -            self._async_results.append( -                self._pool.apply_async( -                    decrypt_doc_task, args, -                    callback=_multiprocessing_callback)) - -            return d1 -        else: -            # decrypt inline -            res = decrypt_doc_task(*args) -            return self._decrypt_doc_cb(res) +        # decrypt asynchronously +        self._async_results.append( +            self._pool.apply_async( +                decrypt_doc_task, args))      def _decrypt_doc_cb(self, result):          """          Store the decryption result in the sync db from where it will later be -        picked by _process_decrypted. +        picked by _process_decrypted_docs.          :param result: A tuple containing the document's id, revision,                         content, generation, transaction id and sync index. @@ -636,7 +602,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              last_idx += 1          defer.returnValue(insertable) -    def _decrypt_received_docs(self): +    @defer.inlineCallbacks +    def _async_decrypt_received_docs(self):          """          Get all the encrypted documents from the sync database and dispatch a          decrypt worker to decrypt each one of them. @@ -645,37 +612,25 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                   decrypted and inserted back in the sync db.          :rtype: twisted.internet.defer.Deferred          """ +        docs = yield self._get_docs(encrypted=True) +        for doc_id, rev, content, gen, trans_id, _, idx in docs: +            self._async_decrypt_doc( +                doc_id, rev, content, gen, trans_id, idx) -        def _callback(received_docs): -            deferreds = [] -            for doc_id, rev, content, gen, trans_id, _, idx in received_docs: -                deferreds.append( -                    self._decrypt_doc( -                        doc_id, rev, content, gen, trans_id, idx)) -            return defer.gatherResults(deferreds) - -        d = self._get_docs(encrypted=True) -        d.addCallback(_callback) -        return d - -    def _process_decrypted(self): +    @defer.inlineCallbacks +    def _process_decrypted_docs(self):          """          Fetch as many decrypted documents as can be taken from the expected -        order and insert them in the database. +        order and insert them in the local replica.          :return: A deferred that will fire with the list of inserted                   documents.          :rtype: twisted.internet.defer.Deferred          """ - -        def _callback(insertable): -            for doc_fields in insertable: -                self._insert_decrypted_local_doc(*doc_fields) -            return insertable - -        d = self._get_insertable_docs() -        d.addCallback(_callback) -        return d +        insertable = yield self._get_insertable_docs() +        for doc_fields in insertable: +            self._insert_decrypted_local_doc(*doc_fields) +        defer.returnValue(insertable)      def _delete_processed_docs(self, inserted):          """ @@ -700,8 +655,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):      def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,                                      gen, trans_id, idx):          """ -        Insert the decrypted document into the local sqlcipher database. -        Makes use of the passed callback `insert_doc_cb` passed to the caller +        Insert the decrypted document into the local replica. + +        Make use of the passed callback `insert_doc_cb` passed to the caller          by u1db sync.          :param doc_id: The document id. @@ -743,59 +699,47 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)          return self._runOperation(query) -    def _raise_if_async_fails(self): +    def _collect_async_decryption_results(self):          """ -        Raise any exception raised by a multiprocessing async decryption -        call. +        Collect the results of the asynchronous doc decryptions and re-raise +        any exception raised by a multiprocessing async decryption call.          :raise Exception: Raised if an async call has raised an exception.          """ -        for res in self._async_results: +        async_results = self._async_results[:] +        for res in async_results:              if res.ready(): -                if not res.successful(): -                    # re-raise the exception raised by the remote call -                    res.get() +                self._decrypt_doc_cb(res.get())  # might raise an exception! +                self._async_results.remove(res) -    def _decrypt_and_process_docs_loop(self): +    @defer.inlineCallbacks +    def _decrypt_and_process_docs(self):          """          Decrypt the documents received from remote replica and insert them          into the local one. -        This method runs in its own thread, so sleeping will not interfere -        with the main thread. -        """ -        # wait for database to be emptied -        self._empty_db.wait() - -        # wait until we know how many documents we need to process -        while self._docs_to_process is None: -            time.sleep(self.DECRYPT_LOOP_PERIOD) - -        # because all database operations are asynchronous, we use an -        # event to make sure we don't start the next loop before the -        # current one has finished. -        event = threading.Event() - -        # loop until we have processes as many docs as the number of -        # changes -        while self._processed_docs < self._docs_to_process: +        This method implicitelly returns a defferred (see the decorator +        above). It should only be called by _launch_decrypt_and_process(). +        because this way any exceptions raised here will be stored by the +        errback attached to the deferred returned. -            event.clear() - -            d = self._decrypt_received_docs() -            d.addCallback(lambda _: self._raise_if_async_fails()) -            d.addCallback(lambda _: self._process_decrypted()) -            d.addCallback(lambda r: self._delete_processed_docs(r)) -            d.addErrback(self._set_failure)  # grab failure and continue -            d.addCallback(lambda _: event.set()) - -            event.wait() - -            if not self.succeeded(): -                break - -            # sleep a bit to give time for some decryption work -            time.sleep(self.DECRYPT_LOOP_PERIOD) +        :return: A deferred which will fire after all decrypt, process and +                 delete operations have been executed. +        :rtype: twisted.internet.defer.Deferred +        """ +        if not self.failed(): +            if self._processed_docs < self._docs_to_process: +                yield self._async_decrypt_received_docs() +                yield self._collect_async_decryption_results() +                docs = yield self._process_decrypted_docs() +                yield self._delete_processed_docs(docs) +                # recurse +                self._schedule_decrypt_and_process() +            else: +                self._finished = True      def has_finished(self): -        return self._finished.is_set() +        """ +        Return whether the decrypter has finished its work. +        """ +        return self._finished | 
