From 841e6712ff9ff1ce2b8a5fe92012d89c2aec7077 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 1 Dec 2015 16:06:26 -0300 Subject: [bug] concurrency bug while querying and inserting This line was missing an yield and without it we end up inserting a document that is being retrieved and bad things happen. This is the core fix from yesterday debugging session. During sequential syncs the pool was inserting and querying at the same time and sometimes repeating or failing to delete documents. --- client/src/leap/soledad/client/encdecpool.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 9333578b..a01d3b71 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -70,11 +70,13 @@ class SyncEncryptDecryptPool(object): self._started = False def start(self): + if self.running: + return self._create_pool() self._started = True def stop(self): - if not self._started: + if not self.running: return self._started = False self._destroy_pool() @@ -650,14 +652,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): last_idx = self._last_inserted_idx for doc_id, rev, content, gen, trans_id, encrypted, idx in \ decrypted_docs: - # XXX for some reason, a document might not have been deleted from - # the database. This is a bug. In this point, already - # processed documents should have been removed from the sync - # database and we should not have to skip them here. We need - # to find out why this is happening, fix, and remove the - # skipping below. - if (idx < last_idx + 1): - continue if (idx != last_idx + 1): break insertable.append((doc_id, rev, content, gen, trans_id, idx)) @@ -763,6 +757,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) return self._runOperation(query) + @defer.inlineCallbacks def _collect_async_decryption_results(self): """ Collect the results of the asynchronous doc decryptions and re-raise @@ -773,7 +768,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): async_results = self._async_results[:] for res in async_results: if res.ready(): - self._decrypt_doc_cb(res.get()) # might raise an exception! + yield self._decrypt_doc_cb(res.get()) # might raise an exception! self._async_results.remove(res) @defer.inlineCallbacks @@ -796,7 +791,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if processed < pending: yield self._async_decrypt_received_docs() - self._collect_async_decryption_results() + yield self._collect_async_decryption_results() docs = yield self._process_decrypted_docs() yield self._delete_processed_docs(docs) # recurse -- cgit v1.2.3