From 4f9ce171277f4048558484c3b528c7c53c35c713 Mon Sep 17 00:00:00 2001 From: drebs Date: Mon, 3 Aug 2015 15:06:59 -0300 Subject: [refactor] standardize start/stop of enc/dec pools * change close method name to stop * add start/stop methods to both enc/dec clases * remove any delayed calls on pool shutdown --- client/src/leap/soledad/client/api.py | 3 +- client/src/leap/soledad/client/encdecpool.py | 131 ++++++++++++++------------ client/src/leap/soledad/client/http_target.py | 2 +- 3 files changed, 76 insertions(+), 60 deletions(-) (limited to 'client/src/leap') diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index a3dfb1d4..3e9f6bd4 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -304,7 +304,7 @@ class Soledad(object): self._sync_db.close() self._sync_db = None if self._defer_encryption: - self._sync_enc_pool.close() + self._sync_enc_pool.stop() # # ILocalStorage @@ -769,6 +769,7 @@ class Soledad(object): # initialize syncing queue encryption pool self._sync_enc_pool = encdecpool.SyncEncrypterPool( self._crypto, self._sync_db) + self._sync_enc_pool.start() @property def _sync_db_extra_init(self): diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 010d4f78..a0154929 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -65,9 +65,23 @@ class SyncEncryptDecryptPool(object): """ self._crypto = crypto self._sync_db = sync_db + self._pool = None + self._delayed_call = None + + def start(self): + self._create_pool() + + def stop(self): + self._destroy_pool() + # maybe cancel the next delayed call + if self._delayed_call \ + and not self._delayed_call.called: + self._delayed_call.cancel() + + def _create_pool(self): self._pool = multiprocessing.Pool(self.WORKERS) - def close(self): + def _destroy_pool(self): """ Cleanly close the pool of workers. """ @@ -154,15 +168,28 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): Initialize the sync encrypter pool. """ SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + self._encr_queue = multiprocessing.Queue() + # TODO delete already synced files from database - self._stopped = False - self._sync_queue = multiprocessing.Queue() - - # start the encryption loop + def start(self): + """ + Start the encrypter pool. + """ + SyncEncryptDecryptPool.start(self) logger.debug("Starting the encryption loop...") reactor.callWhenRunning(self._maybe_encrypt_and_recurse) - # TODO delete already synced files from database + def stop(self): + """ + Stop the encrypter pool. + """ + # close the sync queue + self._encr_queue.close() + q = self._encr_queue + del q + self._encr_queue = None + + SyncEncryptDecryptPool.stop(self) def enqueue_doc_for_encryption(self, doc): """ @@ -172,28 +199,27 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type doc: SoledadDocument """ try: - self._sync_queue.put_nowait(doc) + self._encr_queue.put_nowait(doc) except Queue.Full: # do not asynchronously encrypt this file if the queue is full pass def _maybe_encrypt_and_recurse(self): """ - Process the syncing queue and send the documents there to be encrypted - in the sync db. They will be read by the SoledadSyncTarget during the - sync_exchange. - """ - if not self._stopped: - try: - doc = self._sync_queue.get(False) - self._encrypt_doc(doc) - except Queue.Empty: - pass - reactor.callLater( - self.ENCRYPT_LOOP_PERIOD, - self._maybe_encrypt_and_recurse) - else: - logger.debug("Finished encrypter thread.") + Process one document from the encryption queue. + + Asynchronously encrypt a document that will then be stored in the sync + db. Processed documents will be read by the SoledadSyncTarget during + the sync_exchange. + """ + try: + doc = self._encr_queue.get(False) + self._encrypt_doc(doc) + except Queue.Empty: + pass + self._delayed_call = reactor.callLater( + self.ENCRYPT_LOOP_PERIOD, + self._maybe_encrypt_and_recurse) def _encrypt_doc(self, doc): """ @@ -285,16 +311,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): % self.TABLE_NAME self._runOperation(query, (doc_id, doc_rev)) - def close(self): - """ - Close the encrypter pool. - """ - self._stopped = True - self._sync_queue.close() - q = self._sync_queue - del q - self._sync_queue = None - def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, idx): @@ -377,6 +393,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ self._insert_doc_cb = kwargs.pop("insert_doc_cb") self.source_replica_uid = kwargs.pop("source_replica_uid") + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) self._docs_to_process = None @@ -393,15 +410,25 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # XXX in mail and keymanager we have a pattern for that -- kali. self._empty() + def start(self, docs_to_process): + """ + Set the number of documents we expect to process. + + This should be called by the during the sync exchange process as soon + as we know how many documents are arriving from the server. + + :param docs_to_process: The number of documents to process. + :type docs_to_process: int + """ + SyncEncryptDecryptPool.start(self) + self._docs_to_process = docs_to_process + self._deferred = defer.Deferred() + reactor.callWhenRunning(self._launch_decrypt_and_process) + def _launch_decrypt_and_process(self): - d = self._decrypt_and_process_docs() + d = self._decrypt_and_recurse() d.addErrback(self._errback) - def _schedule_decrypt_and_process(self): - reactor.callLater( - self.DECRYPT_LOOP_PERIOD, - self._launch_decrypt_and_process) - def _errback(self, failure): log.err(failure) self._deferred.errback(failure) @@ -416,33 +443,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ return self._deferred - def start(self, docs_to_process): - """ - Set the number of documents we expect to process. - - This should be called by the during the sync exchange process as soon - as we know how many documents are arriving from the server. - - :param docs_to_process: The number of documents to process. - :type docs_to_process: int - """ - self._docs_to_process = docs_to_process - self._deferred = defer.Deferred() - self._schedule_decrypt_and_process() - def insert_encrypted_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): """ Insert a received message with encrypted content, to be decrypted later on. - :param doc_id: The Document ID. + :param doc_id: The document ID. :type doc_id: str - :param doc_rev: The Document Revision + :param doc_rev: The document Revision :param doc_rev: str - :param content: the Content of the document + :param content: The content of the document :type content: str - :param gen: the Document Generation + :param gen: The document Generation :type gen: int :param trans_id: Transaction ID :type trans_id: str @@ -722,7 +735,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._async_results.remove(res) @defer.inlineCallbacks - def _decrypt_and_process_docs(self): + def _decrypt_and_recurse(self): """ Decrypt the documents received from remote replica and insert them into the local one. @@ -745,7 +758,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): docs = yield self._process_decrypted_docs() yield self._delete_processed_docs(docs) # recurse - self._schedule_decrypt_and_process() + self._delayed_call = reactor.callLater( + self.DECRYPT_LOOP_PERIOD, + self._launch_decrypt_and_process) else: self._finish() diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index 3589952a..3dc45e20 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -435,7 +435,7 @@ class SoledadHTTPSyncTarget(SyncTarget): if defer_decryption: yield self._sync_decr_pool.deferred - self._sync_decr_pool.close() + self._sync_decr_pool.stop() defer.returnValue([new_generation, new_transaction_id]) -- cgit v1.2.3