diff options
author | Victor Shyba <victor.shyba@gmail.com> | 2015-12-01 16:06:26 -0300 |
---|---|---|
committer | Victor Shyba <victor.shyba@gmail.com> | 2015-12-01 16:06:26 -0300 |
commit | 841e6712ff9ff1ce2b8a5fe92012d89c2aec7077 (patch) | |
tree | 109ce33ef527daa2ed1fd2f2969147e9d175bcb1 | |
parent | 202bdc553cc576bfbce1ba8a4c34569b1751c04d (diff) |
[bug] concurrency bug while querying and inserting
This line was missing an yield and without it we end up inserting a
document that is being retrieved and bad things happen.
This is the core fix from yesterday debugging session. During sequential
syncs the pool was inserting and querying at the same time and sometimes
repeating or failing to delete documents.
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 17 |
1 files changed, 6 insertions, 11 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 9333578b..a01d3b71 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -70,11 +70,13 @@ class SyncEncryptDecryptPool(object): self._started = False def start(self): + if self.running: + return self._create_pool() self._started = True def stop(self): - if not self._started: + if not self.running: return self._started = False self._destroy_pool() @@ -650,14 +652,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): last_idx = self._last_inserted_idx for doc_id, rev, content, gen, trans_id, encrypted, idx in \ decrypted_docs: - # XXX for some reason, a document might not have been deleted from - # the database. This is a bug. In this point, already - # processed documents should have been removed from the sync - # database and we should not have to skip them here. We need - # to find out why this is happening, fix, and remove the - # skipping below. - if (idx < last_idx + 1): - continue if (idx != last_idx + 1): break insertable.append((doc_id, rev, content, gen, trans_id, idx)) @@ -763,6 +757,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) return self._runOperation(query) + @defer.inlineCallbacks def _collect_async_decryption_results(self): """ Collect the results of the asynchronous doc decryptions and re-raise @@ -773,7 +768,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): async_results = self._async_results[:] for res in async_results: if res.ready(): - self._decrypt_doc_cb(res.get()) # might raise an exception! + yield self._decrypt_doc_cb(res.get()) # might raise an exception! self._async_results.remove(res) @defer.inlineCallbacks @@ -796,7 +791,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if processed < pending: yield self._async_decrypt_received_docs() - self._collect_async_decryption_results() + yield self._collect_async_decryption_results() docs = yield self._process_decrypted_docs() yield self._delete_processed_docs(docs) # recurse |