summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/api.py3
-rw-r--r--client/src/leap/soledad/client/encdecpool.py131
-rw-r--r--client/src/leap/soledad/client/http_target.py2
3 files changed, 76 insertions, 60 deletions
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index a3dfb1d4..3e9f6bd4 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -304,7 +304,7 @@ class Soledad(object):
self._sync_db.close()
self._sync_db = None
if self._defer_encryption:
- self._sync_enc_pool.close()
+ self._sync_enc_pool.stop()
#
# ILocalStorage
@@ -769,6 +769,7 @@ class Soledad(object):
# initialize syncing queue encryption pool
self._sync_enc_pool = encdecpool.SyncEncrypterPool(
self._crypto, self._sync_db)
+ self._sync_enc_pool.start()
@property
def _sync_db_extra_init(self):
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 010d4f78..a0154929 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -65,9 +65,23 @@ class SyncEncryptDecryptPool(object):
"""
self._crypto = crypto
self._sync_db = sync_db
+ self._pool = None
+ self._delayed_call = None
+
+ def start(self):
+ self._create_pool()
+
+ def stop(self):
+ self._destroy_pool()
+ # maybe cancel the next delayed call
+ if self._delayed_call \
+ and not self._delayed_call.called:
+ self._delayed_call.cancel()
+
+ def _create_pool(self):
self._pool = multiprocessing.Pool(self.WORKERS)
- def close(self):
+ def _destroy_pool(self):
"""
Cleanly close the pool of workers.
"""
@@ -154,15 +168,28 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
Initialize the sync encrypter pool.
"""
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+ self._encr_queue = multiprocessing.Queue()
+ # TODO delete already synced files from database
- self._stopped = False
- self._sync_queue = multiprocessing.Queue()
-
- # start the encryption loop
+ def start(self):
+ """
+ Start the encrypter pool.
+ """
+ SyncEncryptDecryptPool.start(self)
logger.debug("Starting the encryption loop...")
reactor.callWhenRunning(self._maybe_encrypt_and_recurse)
- # TODO delete already synced files from database
+ def stop(self):
+ """
+ Stop the encrypter pool.
+ """
+ # close the sync queue
+ self._encr_queue.close()
+ q = self._encr_queue
+ del q
+ self._encr_queue = None
+
+ SyncEncryptDecryptPool.stop(self)
def enqueue_doc_for_encryption(self, doc):
"""
@@ -172,28 +199,27 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type doc: SoledadDocument
"""
try:
- self._sync_queue.put_nowait(doc)
+ self._encr_queue.put_nowait(doc)
except Queue.Full:
# do not asynchronously encrypt this file if the queue is full
pass
def _maybe_encrypt_and_recurse(self):
"""
- Process the syncing queue and send the documents there to be encrypted
- in the sync db. They will be read by the SoledadSyncTarget during the
- sync_exchange.
- """
- if not self._stopped:
- try:
- doc = self._sync_queue.get(False)
- self._encrypt_doc(doc)
- except Queue.Empty:
- pass
- reactor.callLater(
- self.ENCRYPT_LOOP_PERIOD,
- self._maybe_encrypt_and_recurse)
- else:
- logger.debug("Finished encrypter thread.")
+ Process one document from the encryption queue.
+
+ Asynchronously encrypt a document that will then be stored in the sync
+ db. Processed documents will be read by the SoledadSyncTarget during
+ the sync_exchange.
+ """
+ try:
+ doc = self._encr_queue.get(False)
+ self._encrypt_doc(doc)
+ except Queue.Empty:
+ pass
+ self._delayed_call = reactor.callLater(
+ self.ENCRYPT_LOOP_PERIOD,
+ self._maybe_encrypt_and_recurse)
def _encrypt_doc(self, doc):
"""
@@ -285,16 +311,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
% self.TABLE_NAME
self._runOperation(query, (doc_id, doc_rev))
- def close(self):
- """
- Close the encrypter pool.
- """
- self._stopped = True
- self._sync_queue.close()
- q = self._sync_queue
- del q
- self._sync_queue = None
-
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
idx):
@@ -377,6 +393,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
self._insert_doc_cb = kwargs.pop("insert_doc_cb")
self.source_replica_uid = kwargs.pop("source_replica_uid")
+
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
self._docs_to_process = None
@@ -393,15 +410,25 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# XXX in mail and keymanager we have a pattern for that -- kali.
self._empty()
+ def start(self, docs_to_process):
+ """
+ Set the number of documents we expect to process.
+
+ This should be called by the during the sync exchange process as soon
+ as we know how many documents are arriving from the server.
+
+ :param docs_to_process: The number of documents to process.
+ :type docs_to_process: int
+ """
+ SyncEncryptDecryptPool.start(self)
+ self._docs_to_process = docs_to_process
+ self._deferred = defer.Deferred()
+ reactor.callWhenRunning(self._launch_decrypt_and_process)
+
def _launch_decrypt_and_process(self):
- d = self._decrypt_and_process_docs()
+ d = self._decrypt_and_recurse()
d.addErrback(self._errback)
- def _schedule_decrypt_and_process(self):
- reactor.callLater(
- self.DECRYPT_LOOP_PERIOD,
- self._launch_decrypt_and_process)
-
def _errback(self, failure):
log.err(failure)
self._deferred.errback(failure)
@@ -416,33 +443,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
return self._deferred
- def start(self, docs_to_process):
- """
- Set the number of documents we expect to process.
-
- This should be called by the during the sync exchange process as soon
- as we know how many documents are arriving from the server.
-
- :param docs_to_process: The number of documents to process.
- :type docs_to_process: int
- """
- self._docs_to_process = docs_to_process
- self._deferred = defer.Deferred()
- self._schedule_decrypt_and_process()
-
def insert_encrypted_received_doc(
self, doc_id, doc_rev, content, gen, trans_id, idx):
"""
Insert a received message with encrypted content, to be decrypted later
on.
- :param doc_id: The Document ID.
+ :param doc_id: The document ID.
:type doc_id: str
- :param doc_rev: The Document Revision
+ :param doc_rev: The document Revision
:param doc_rev: str
- :param content: the Content of the document
+ :param content: The content of the document
:type content: str
- :param gen: the Document Generation
+ :param gen: The document Generation
:type gen: int
:param trans_id: Transaction ID
:type trans_id: str
@@ -722,7 +735,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._async_results.remove(res)
@defer.inlineCallbacks
- def _decrypt_and_process_docs(self):
+ def _decrypt_and_recurse(self):
"""
Decrypt the documents received from remote replica and insert them
into the local one.
@@ -745,7 +758,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
docs = yield self._process_decrypted_docs()
yield self._delete_processed_docs(docs)
# recurse
- self._schedule_decrypt_and_process()
+ self._delayed_call = reactor.callLater(
+ self.DECRYPT_LOOP_PERIOD,
+ self._launch_decrypt_and_process)
else:
self._finish()
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
index 3589952a..3dc45e20 100644
--- a/client/src/leap/soledad/client/http_target.py
+++ b/client/src/leap/soledad/client/http_target.py
@@ -435,7 +435,7 @@ class SoledadHTTPSyncTarget(SyncTarget):
if defer_decryption:
yield self._sync_decr_pool.deferred
- self._sync_decr_pool.close()
+ self._sync_decr_pool.stop()
defer.returnValue([new_generation, new_transaction_id])