From 20a76a36f8dce0ebd0fe63690633c0e77727f2c4 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 5 Jun 2015 11:21:01 -0400 Subject: [bug] allow reuse of decr pool --- client/src/leap/soledad/client/encdecpool.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index c2e2eda8..52b29936 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -376,9 +376,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self.source_replica_uid = kwargs.pop("source_replica_uid") SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._last_inserted_idx = 0 self._docs_to_process = None self._processed_docs = 0 + self._last_inserted_idx = 0 self._async_results = [] self._failure = None @@ -388,6 +388,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # asynchronous call, so we have to somehow make sure that it is # executed before any other call to the database, without # blocking. + # XXX in mail and keymanager we have a pattern for that -- kali. self._empty() def _launch_decrypt_and_process(self): @@ -404,6 +405,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): return self._failure def _set_failure(self, failure): + failure.printTraceback() self._failure = failure self._finished = True @@ -421,6 +423,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type docs_to_process: int """ self._docs_to_process = docs_to_process + self._finished = False self._schedule_decrypt_and_process() def insert_encrypted_received_doc( @@ -575,6 +578,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): query += " ORDER BY %s %s" % (order_by, order) return self._runQuery(query) + @defer.inlineCallbacks def _get_insertable_docs(self): """ @@ -731,7 +735,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :rtype: twisted.internet.defer.Deferred """ if not self.failed(): - if self._processed_docs < self._docs_to_process: + processed = self._processed_docs + pending = self._docs_to_process + + if not self.has_finished() and processed < pending: yield self._async_decrypt_received_docs() yield self._collect_async_decryption_results() docs = yield self._process_decrypted_docs() @@ -739,7 +746,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # recurse self._schedule_decrypt_and_process() else: - self._finished = True + self._mark_finished() + + def _mark_finished(self): + self._finished = True + self._processed_docs = 0 + self._last_inserted_idx = 0 def has_finished(self): """ -- cgit v1.2.3