diff options
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 117 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 9 |
2 files changed, 14 insertions, 112 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 34667a1e..576b8b2c 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -22,8 +22,6 @@ during synchronization. """ -import multiprocessing -import Queue import json import logging @@ -51,9 +49,6 @@ class SyncEncryptDecryptPool(object): Base class for encrypter/decrypter pools. """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = multiprocessing.cpu_count() - def __init__(self, crypto, sync_db): """ Initialize the pool of encryption-workers. @@ -66,21 +61,18 @@ class SyncEncryptDecryptPool(object): """ self._crypto = crypto self._sync_db = sync_db - self._pool = None self._delayed_call = None self._started = False def start(self): if self.running: return - self._create_pool() self._started = True def stop(self): if not self.running: return self._started = False - self._destroy_pool() # maybe cancel the next delayed call if self._delayed_call \ and not self._delayed_call.called: @@ -90,27 +82,6 @@ class SyncEncryptDecryptPool(object): def running(self): return self._started - def _create_pool(self): - self._pool = multiprocessing.Pool(self.WORKERS) - - def _destroy_pool(self): - """ - Cleanly close the pool of workers. - """ - logger.debug("Closing %s" % (self.__class__.__name__,)) - self._pool.close() - try: - self._pool.join() - except Exception: - pass - - def terminate(self): - """ - Terminate the pool of workers. - """ - logger.debug("Terminating %s" % (self.__class__.__name__,)) - self._pool.terminate() - def _runOperation(self, query, *args): """ Run an operation on the sync db. @@ -180,7 +151,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): Initialize the sync encrypter pool. """ SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._encr_queue = defer.DeferredQueue() # TODO delete already synced files from database def start(self): @@ -189,19 +159,11 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): """ SyncEncryptDecryptPool.start(self) logger.debug("Starting the encryption loop...") - reactor.callWhenRunning(self._maybe_encrypt_and_recurse) def stop(self): """ Stop the encrypter pool. """ - # close the sync queue - if self._encr_queue: - q = self._encr_queue - for d in q.pending: - d.cancel() - del q - self._encr_queue = None SyncEncryptDecryptPool.stop(self) @@ -212,29 +174,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :param doc: The document to be encrypted. :type doc: SoledadDocument """ - try: - self._encr_queue.put(doc) - except Queue.Full: - # do not asynchronously encrypt this file if the queue is full - pass - - @defer.inlineCallbacks - def _maybe_encrypt_and_recurse(self): - """ - Process one document from the encryption queue. - - Asynchronously encrypt a document that will then be stored in the sync - db. Processed documents will be read by the SoledadSyncTarget during - the sync_exchange. - """ - try: - while self.running: - doc = yield self._encr_queue.get() - self._encrypt_doc(doc) - except defer.QueueUnderflow: - self._delayed_call = reactor.callLater( - self.ENCRYPT_LOOP_PERIOD, - self._maybe_encrypt_and_recurse) + self._encrypt_doc(doc) def _encrypt_doc(self, doc): """ @@ -253,9 +193,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): secret = self._crypto.secret args = doc.doc_id, doc.rev, docstr, key, secret # encrypt asynchronously - self._pool.apply_async( - encrypt_doc_task, args, - callback=self._encrypt_doc_cb) + d = threads.deferToThread( + encrypt_doc_task, *args) + d.addCallback(self._encrypt_doc_cb) def _encrypt_doc_cb(self, result): """ @@ -354,6 +294,7 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, :return: A tuple containing the doc id, revision and encrypted content. :rtype: tuple(str, str, str) """ + content = json.loads(content) if type(content) == str else content decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) return doc_id, doc_rev, decrypted_content, gen, trans_id, idx @@ -414,7 +355,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._docs_to_process = None self._processed_docs = 0 self._last_inserted_idx = 0 - self._decrypting_docs = [] # a list that holds the asynchronous decryption results so they can be # collected when they are ready @@ -511,11 +451,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): has finished. :rtype: twisted.internet.defer.Deferred """ - docstr = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ - % self.TABLE_NAME - return self._runOperation( - query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) + return self._async_decrypt_doc(doc_id, doc_rev, content, gen, trans_id, idx) def insert_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -585,14 +521,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ soledad_assert(self._crypto is not None, "need a crypto object") - content = json.loads(content) key = self._crypto.doc_passphrase(doc_id) secret = self._crypto.secret args = doc_id, rev, content, gen, trans_id, key, secret, idx # decrypt asynchronously - self._async_results.append( - self._pool.apply_async( - decrypt_doc_task, args)) + d = threads.deferToThread(decrypt_doc_task, *args) + d.addCallback(self._decrypt_doc_cb) def _decrypt_doc_cb(self, result): """ @@ -610,7 +544,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): doc_id, rev, content, gen, trans_id, idx = result logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" % (doc_id, rev, gen, trans_id)) - self._decrypting_docs.remove((doc_id, rev)) return self.insert_received_doc( doc_id, rev, content, gen, trans_id, idx) @@ -660,23 +593,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): defer.returnValue(insertable) @defer.inlineCallbacks - def _async_decrypt_received_docs(self): - """ - Get all the encrypted documents from the sync database and dispatch a - decrypt worker to decrypt each one of them. - - :return: A deferred that will fire after all documents have been - decrypted and inserted back in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - docs = yield self._get_docs(encrypted=True) - for doc_id, rev, content, gen, trans_id, _, idx in docs: - if (doc_id, rev) not in self._decrypting_docs: - self._decrypting_docs.append((doc_id, rev)) - self._async_decrypt_doc( - doc_id, rev, content, gen, trans_id, idx) - - @defer.inlineCallbacks def _process_decrypted_docs(self): """ Fetch as many decrypted documents as can be taken from the expected @@ -763,21 +679,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): return self._runOperation(query) @defer.inlineCallbacks - def _collect_async_decryption_results(self): - """ - Collect the results of the asynchronous doc decryptions and re-raise - any exception raised by a multiprocessing async decryption call. - - :raise Exception: Raised if an async call has raised an exception. - """ - async_results = self._async_results[:] - for res in async_results: - if res.ready(): - # XXX: might raise an exception! - yield self._decrypt_doc_cb(res.get()) - self._async_results.remove(res) - - @defer.inlineCallbacks def _decrypt_and_recurse(self): """ Decrypt the documents received from remote replica and insert them @@ -796,8 +697,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): pending = self._docs_to_process if processed < pending: - yield self._async_decrypt_received_docs() - yield self._collect_async_decryption_results() docs = yield self._process_decrypted_docs() yield self._delete_processed_docs(docs) # recurse diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 9f7a4193..fda90909 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -19,6 +19,7 @@ import json from u1db import errors from u1db.remote import utils from twisted.internet import defer +from twisted.internet import threads from leap.soledad.common.document import SoledadDocument from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async @@ -75,7 +76,7 @@ class HTTPDocFetcher(object): last_known_generation, last_known_trans_id, sync_id, 0) self._received_docs = 0 - number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + number_of_changes, ngen, ntrans = yield self._insert_received_doc(doc, 1, 1) if ngen: new_generation = ngen @@ -137,6 +138,7 @@ class HTTPDocFetcher(object): body=str(body), content_type='application/x-soledad-sync-get') + @defer.inlineCallbacks def _insert_received_doc(self, response, idx, total): """ Insert a received document into the local replica. @@ -150,7 +152,8 @@ class HTTPDocFetcher(object): """ new_generation, new_transaction_id, number_of_changes, doc_id, \ rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) + (yield threads.deferToThread(self._parse_received_doc_response, + response)) if doc_id is not None: # decrypt incoming document and insert into local database # ------------------------------------------------------------- @@ -185,7 +188,7 @@ class HTTPDocFetcher(object): self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total) - return number_of_changes, new_generation, new_transaction_id + defer.returnValue((number_of_changes, new_generation, new_transaction_id)) def _parse_received_doc_response(self, response): """ |