summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/encdecpool.py117
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py9
2 files changed, 14 insertions, 112 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 34667a1e..576b8b2c 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -22,8 +22,6 @@ during synchronization.
"""
-import multiprocessing
-import Queue
import json
import logging
@@ -51,9 +49,6 @@ class SyncEncryptDecryptPool(object):
Base class for encrypter/decrypter pools.
"""
- # TODO implement throttling to reduce cpu usage??
- WORKERS = multiprocessing.cpu_count()
-
def __init__(self, crypto, sync_db):
"""
Initialize the pool of encryption-workers.
@@ -66,21 +61,18 @@ class SyncEncryptDecryptPool(object):
"""
self._crypto = crypto
self._sync_db = sync_db
- self._pool = None
self._delayed_call = None
self._started = False
def start(self):
if self.running:
return
- self._create_pool()
self._started = True
def stop(self):
if not self.running:
return
self._started = False
- self._destroy_pool()
# maybe cancel the next delayed call
if self._delayed_call \
and not self._delayed_call.called:
@@ -90,27 +82,6 @@ class SyncEncryptDecryptPool(object):
def running(self):
return self._started
- def _create_pool(self):
- self._pool = multiprocessing.Pool(self.WORKERS)
-
- def _destroy_pool(self):
- """
- Cleanly close the pool of workers.
- """
- logger.debug("Closing %s" % (self.__class__.__name__,))
- self._pool.close()
- try:
- self._pool.join()
- except Exception:
- pass
-
- def terminate(self):
- """
- Terminate the pool of workers.
- """
- logger.debug("Terminating %s" % (self.__class__.__name__,))
- self._pool.terminate()
-
def _runOperation(self, query, *args):
"""
Run an operation on the sync db.
@@ -180,7 +151,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
Initialize the sync encrypter pool.
"""
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self._encr_queue = defer.DeferredQueue()
# TODO delete already synced files from database
def start(self):
@@ -189,19 +159,11 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
"""
SyncEncryptDecryptPool.start(self)
logger.debug("Starting the encryption loop...")
- reactor.callWhenRunning(self._maybe_encrypt_and_recurse)
def stop(self):
"""
Stop the encrypter pool.
"""
- # close the sync queue
- if self._encr_queue:
- q = self._encr_queue
- for d in q.pending:
- d.cancel()
- del q
- self._encr_queue = None
SyncEncryptDecryptPool.stop(self)
@@ -212,29 +174,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:param doc: The document to be encrypted.
:type doc: SoledadDocument
"""
- try:
- self._encr_queue.put(doc)
- except Queue.Full:
- # do not asynchronously encrypt this file if the queue is full
- pass
-
- @defer.inlineCallbacks
- def _maybe_encrypt_and_recurse(self):
- """
- Process one document from the encryption queue.
-
- Asynchronously encrypt a document that will then be stored in the sync
- db. Processed documents will be read by the SoledadSyncTarget during
- the sync_exchange.
- """
- try:
- while self.running:
- doc = yield self._encr_queue.get()
- self._encrypt_doc(doc)
- except defer.QueueUnderflow:
- self._delayed_call = reactor.callLater(
- self.ENCRYPT_LOOP_PERIOD,
- self._maybe_encrypt_and_recurse)
+ self._encrypt_doc(doc)
def _encrypt_doc(self, doc):
"""
@@ -253,9 +193,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
secret = self._crypto.secret
args = doc.doc_id, doc.rev, docstr, key, secret
# encrypt asynchronously
- self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self._encrypt_doc_cb)
+ d = threads.deferToThread(
+ encrypt_doc_task, *args)
+ d.addCallback(self._encrypt_doc_cb)
def _encrypt_doc_cb(self, result):
"""
@@ -354,6 +294,7 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
:return: A tuple containing the doc id, revision and encrypted content.
:rtype: tuple(str, str, str)
"""
+ content = json.loads(content) if type(content) == str else content
decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)
return doc_id, doc_rev, decrypted_content, gen, trans_id, idx
@@ -414,7 +355,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._docs_to_process = None
self._processed_docs = 0
self._last_inserted_idx = 0
- self._decrypting_docs = []
# a list that holds the asynchronous decryption results so they can be
# collected when they are ready
@@ -511,11 +451,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
has finished.
:rtype: twisted.internet.defer.Deferred
"""
- docstr = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
- % self.TABLE_NAME
- return self._runOperation(
- query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))
+ return self._async_decrypt_doc(doc_id, doc_rev, content, gen, trans_id, idx)
def insert_received_doc(
self, doc_id, doc_rev, content, gen, trans_id, idx):
@@ -585,14 +521,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
soledad_assert(self._crypto is not None, "need a crypto object")
- content = json.loads(content)
key = self._crypto.doc_passphrase(doc_id)
secret = self._crypto.secret
args = doc_id, rev, content, gen, trans_id, key, secret, idx
# decrypt asynchronously
- self._async_results.append(
- self._pool.apply_async(
- decrypt_doc_task, args))
+ d = threads.deferToThread(decrypt_doc_task, *args)
+ d.addCallback(self._decrypt_doc_cb)
def _decrypt_doc_cb(self, result):
"""
@@ -610,7 +544,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
doc_id, rev, content, gen, trans_id, idx = result
logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
% (doc_id, rev, gen, trans_id))
- self._decrypting_docs.remove((doc_id, rev))
return self.insert_received_doc(
doc_id, rev, content, gen, trans_id, idx)
@@ -660,23 +593,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
defer.returnValue(insertable)
@defer.inlineCallbacks
- def _async_decrypt_received_docs(self):
- """
- Get all the encrypted documents from the sync database and dispatch a
- decrypt worker to decrypt each one of them.
-
- :return: A deferred that will fire after all documents have been
- decrypted and inserted back in the sync db.
- :rtype: twisted.internet.defer.Deferred
- """
- docs = yield self._get_docs(encrypted=True)
- for doc_id, rev, content, gen, trans_id, _, idx in docs:
- if (doc_id, rev) not in self._decrypting_docs:
- self._decrypting_docs.append((doc_id, rev))
- self._async_decrypt_doc(
- doc_id, rev, content, gen, trans_id, idx)
-
- @defer.inlineCallbacks
def _process_decrypted_docs(self):
"""
Fetch as many decrypted documents as can be taken from the expected
@@ -763,21 +679,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
return self._runOperation(query)
@defer.inlineCallbacks
- def _collect_async_decryption_results(self):
- """
- Collect the results of the asynchronous doc decryptions and re-raise
- any exception raised by a multiprocessing async decryption call.
-
- :raise Exception: Raised if an async call has raised an exception.
- """
- async_results = self._async_results[:]
- for res in async_results:
- if res.ready():
- # XXX: might raise an exception!
- yield self._decrypt_doc_cb(res.get())
- self._async_results.remove(res)
-
- @defer.inlineCallbacks
def _decrypt_and_recurse(self):
"""
Decrypt the documents received from remote replica and insert them
@@ -796,8 +697,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
pending = self._docs_to_process
if processed < pending:
- yield self._async_decrypt_received_docs()
- yield self._collect_async_decryption_results()
docs = yield self._process_decrypted_docs()
yield self._delete_processed_docs(docs)
# recurse
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 9f7a4193..fda90909 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -19,6 +19,7 @@ import json
from u1db import errors
from u1db.remote import utils
from twisted.internet import defer
+from twisted.internet import threads
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import emit_async
@@ -75,7 +76,7 @@ class HTTPDocFetcher(object):
last_known_generation, last_known_trans_id,
sync_id, 0)
self._received_docs = 0
- number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
+ number_of_changes, ngen, ntrans = yield self._insert_received_doc(doc, 1, 1)
if ngen:
new_generation = ngen
@@ -137,6 +138,7 @@ class HTTPDocFetcher(object):
body=str(body),
content_type='application/x-soledad-sync-get')
+ @defer.inlineCallbacks
def _insert_received_doc(self, response, idx, total):
"""
Insert a received document into the local replica.
@@ -150,7 +152,8 @@ class HTTPDocFetcher(object):
"""
new_generation, new_transaction_id, number_of_changes, doc_id, \
rev, content, gen, trans_id = \
- self._parse_received_doc_response(response)
+ (yield threads.deferToThread(self._parse_received_doc_response,
+ response))
if doc_id is not None:
# decrypt incoming document and insert into local database
# -------------------------------------------------------------
@@ -185,7 +188,7 @@ class HTTPDocFetcher(object):
self._received_docs += 1
user_data = {'uuid': self.uuid, 'userid': self.userid}
_emit_receive_status(user_data, self._received_docs, total)
- return number_of_changes, new_generation, new_transaction_id
+ defer.returnValue((number_of_changes, new_generation, new_transaction_id))
def _parse_received_doc_response(self, response):
"""