diff options
| -rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 117 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 9 | 
2 files changed, 14 insertions, 112 deletions
| diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 34667a1e..576b8b2c 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -22,8 +22,6 @@ during synchronization.  """ -import multiprocessing -import Queue  import json  import logging @@ -51,9 +49,6 @@ class SyncEncryptDecryptPool(object):      Base class for encrypter/decrypter pools.      """ -    # TODO implement throttling to reduce cpu usage?? -    WORKERS = multiprocessing.cpu_count() -      def __init__(self, crypto, sync_db):          """          Initialize the pool of encryption-workers. @@ -66,21 +61,18 @@ class SyncEncryptDecryptPool(object):          """          self._crypto = crypto          self._sync_db = sync_db -        self._pool = None          self._delayed_call = None          self._started = False      def start(self):          if self.running:              return -        self._create_pool()          self._started = True      def stop(self):          if not self.running:              return          self._started = False -        self._destroy_pool()          # maybe cancel the next delayed call          if self._delayed_call \                  and not self._delayed_call.called: @@ -90,27 +82,6 @@ class SyncEncryptDecryptPool(object):      def running(self):          return self._started -    def _create_pool(self): -        self._pool = multiprocessing.Pool(self.WORKERS) - -    def _destroy_pool(self): -        """ -        Cleanly close the pool of workers. -        """ -        logger.debug("Closing %s" % (self.__class__.__name__,)) -        self._pool.close() -        try: -            self._pool.join() -        except Exception: -            pass - -    def terminate(self): -        """ -        Terminate the pool of workers. -        """ -        logger.debug("Terminating %s" % (self.__class__.__name__,)) -        self._pool.terminate() -      def _runOperation(self, query, *args):          """          Run an operation on the sync db. @@ -180,7 +151,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          Initialize the sync encrypter pool.          """          SyncEncryptDecryptPool.__init__(self, *args, **kwargs) -        self._encr_queue = defer.DeferredQueue()          # TODO delete already synced files from database      def start(self): @@ -189,19 +159,11 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          """          SyncEncryptDecryptPool.start(self)          logger.debug("Starting the encryption loop...") -        reactor.callWhenRunning(self._maybe_encrypt_and_recurse)      def stop(self):          """          Stop the encrypter pool.          """ -        # close the sync queue -        if self._encr_queue: -            q = self._encr_queue -            for d in q.pending: -                d.cancel() -            del q -            self._encr_queue = None          SyncEncryptDecryptPool.stop(self) @@ -212,29 +174,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          :param doc: The document to be encrypted.          :type doc: SoledadDocument          """ -        try: -            self._encr_queue.put(doc) -        except Queue.Full: -            # do not asynchronously encrypt this file if the queue is full -            pass - -    @defer.inlineCallbacks -    def _maybe_encrypt_and_recurse(self): -        """ -        Process one document from the encryption queue. - -        Asynchronously encrypt a document that will then be stored in the sync -        db. Processed documents will be read by the SoledadSyncTarget during -        the sync_exchange. -        """ -        try: -            while self.running: -                doc = yield self._encr_queue.get() -                self._encrypt_doc(doc) -        except defer.QueueUnderflow: -            self._delayed_call = reactor.callLater( -                self.ENCRYPT_LOOP_PERIOD, -                self._maybe_encrypt_and_recurse) +        self._encrypt_doc(doc)      def _encrypt_doc(self, doc):          """ @@ -253,9 +193,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          secret = self._crypto.secret          args = doc.doc_id, doc.rev, docstr, key, secret          # encrypt asynchronously -        self._pool.apply_async( -            encrypt_doc_task, args, -            callback=self._encrypt_doc_cb) +        d = threads.deferToThread( +            encrypt_doc_task, *args) +        d.addCallback(self._encrypt_doc_cb)      def _encrypt_doc_cb(self, result):          """ @@ -354,6 +294,7 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,      :return: A tuple containing the doc id, revision and encrypted content.      :rtype: tuple(str, str, str)      """ +    content = json.loads(content) if type(content) == str else content      decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)      return doc_id, doc_rev, decrypted_content, gen, trans_id, idx @@ -414,7 +355,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          self._docs_to_process = None          self._processed_docs = 0          self._last_inserted_idx = 0 -        self._decrypting_docs = []          # a list that holds the asynchronous decryption results so they can be          # collected when they are ready @@ -511,11 +451,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):                   has finished.          :rtype: twisted.internet.defer.Deferred          """ -        docstr = json.dumps(content) -        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ -                % self.TABLE_NAME -        return self._runOperation( -            query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) +        return self._async_decrypt_doc(doc_id, doc_rev, content, gen, trans_id, idx)      def insert_received_doc(              self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -585,14 +521,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          """          soledad_assert(self._crypto is not None, "need a crypto object") -        content = json.loads(content)          key = self._crypto.doc_passphrase(doc_id)          secret = self._crypto.secret          args = doc_id, rev, content, gen, trans_id, key, secret, idx          # decrypt asynchronously -        self._async_results.append( -            self._pool.apply_async( -                decrypt_doc_task, args)) +        d = threads.deferToThread(decrypt_doc_task, *args) +        d.addCallback(self._decrypt_doc_cb)      def _decrypt_doc_cb(self, result):          """ @@ -610,7 +544,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          doc_id, rev, content, gen, trans_id, idx = result          logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"                       % (doc_id, rev, gen, trans_id)) -        self._decrypting_docs.remove((doc_id, rev))          return self.insert_received_doc(              doc_id, rev, content, gen, trans_id, idx) @@ -660,23 +593,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          defer.returnValue(insertable)      @defer.inlineCallbacks -    def _async_decrypt_received_docs(self): -        """ -        Get all the encrypted documents from the sync database and dispatch a -        decrypt worker to decrypt each one of them. - -        :return: A deferred that will fire after all documents have been -                 decrypted and inserted back in the sync db. -        :rtype: twisted.internet.defer.Deferred -        """ -        docs = yield self._get_docs(encrypted=True) -        for doc_id, rev, content, gen, trans_id, _, idx in docs: -            if (doc_id, rev) not in self._decrypting_docs: -                self._decrypting_docs.append((doc_id, rev)) -                self._async_decrypt_doc( -                    doc_id, rev, content, gen, trans_id, idx) - -    @defer.inlineCallbacks      def _process_decrypted_docs(self):          """          Fetch as many decrypted documents as can be taken from the expected @@ -763,21 +679,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          return self._runOperation(query)      @defer.inlineCallbacks -    def _collect_async_decryption_results(self): -        """ -        Collect the results of the asynchronous doc decryptions and re-raise -        any exception raised by a multiprocessing async decryption call. - -        :raise Exception: Raised if an async call has raised an exception. -        """ -        async_results = self._async_results[:] -        for res in async_results: -            if res.ready(): -                # XXX: might raise an exception! -                yield self._decrypt_doc_cb(res.get()) -                self._async_results.remove(res) - -    @defer.inlineCallbacks      def _decrypt_and_recurse(self):          """          Decrypt the documents received from remote replica and insert them @@ -796,8 +697,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          pending = self._docs_to_process          if processed < pending: -            yield self._async_decrypt_received_docs() -            yield self._collect_async_decryption_results()              docs = yield self._process_decrypted_docs()              yield self._delete_processed_docs(docs)              # recurse diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 9f7a4193..fda90909 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -19,6 +19,7 @@ import json  from u1db import errors  from u1db.remote import utils  from twisted.internet import defer +from twisted.internet import threads  from leap.soledad.common.document import SoledadDocument  from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS  from leap.soledad.client.events import emit_async @@ -75,7 +76,7 @@ class HTTPDocFetcher(object):              last_known_generation, last_known_trans_id,              sync_id, 0)          self._received_docs = 0 -        number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) +        number_of_changes, ngen, ntrans = yield self._insert_received_doc(doc, 1, 1)          if ngen:              new_generation = ngen @@ -137,6 +138,7 @@ class HTTPDocFetcher(object):              body=str(body),              content_type='application/x-soledad-sync-get') +    @defer.inlineCallbacks      def _insert_received_doc(self, response, idx, total):          """          Insert a received document into the local replica. @@ -150,7 +152,8 @@ class HTTPDocFetcher(object):          """          new_generation, new_transaction_id, number_of_changes, doc_id, \              rev, content, gen, trans_id = \ -            self._parse_received_doc_response(response) +            (yield threads.deferToThread(self._parse_received_doc_response, +                                         response))          if doc_id is not None:              # decrypt incoming document and insert into local database              # ------------------------------------------------------------- @@ -185,7 +188,7 @@ class HTTPDocFetcher(object):          self._received_docs += 1          user_data = {'uuid': self.uuid, 'userid': self.userid}          _emit_receive_status(user_data, self._received_docs, total) -        return number_of_changes, new_generation, new_transaction_id +        defer.returnValue((number_of_changes, new_generation, new_transaction_id))      def _parse_received_doc_response(self, response):          """ | 
