summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Shyba <victor.shyba@gmail.com>2015-12-01 16:06:26 -0300
committerVictor Shyba <victor.shyba@gmail.com>2015-12-01 16:06:26 -0300
commit841e6712ff9ff1ce2b8a5fe92012d89c2aec7077 (patch)
tree109ce33ef527daa2ed1fd2f2969147e9d175bcb1
parent202bdc553cc576bfbce1ba8a4c34569b1751c04d (diff)
[bug] concurrency bug while querying and inserting
This line was missing an yield and without it we end up inserting a document that is being retrieved and bad things happen. This is the core fix from yesterday debugging session. During sequential syncs the pool was inserting and querying at the same time and sometimes repeating or failing to delete documents.
-rw-r--r--client/src/leap/soledad/client/encdecpool.py17
1 files changed, 6 insertions, 11 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 9333578b..a01d3b71 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -70,11 +70,13 @@ class SyncEncryptDecryptPool(object):
self._started = False
def start(self):
+ if self.running:
+ return
self._create_pool()
self._started = True
def stop(self):
- if not self._started:
+ if not self.running:
return
self._started = False
self._destroy_pool()
@@ -650,14 +652,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
last_idx = self._last_inserted_idx
for doc_id, rev, content, gen, trans_id, encrypted, idx in \
decrypted_docs:
- # XXX for some reason, a document might not have been deleted from
- # the database. This is a bug. In this point, already
- # processed documents should have been removed from the sync
- # database and we should not have to skip them here. We need
- # to find out why this is happening, fix, and remove the
- # skipping below.
- if (idx < last_idx + 1):
- continue
if (idx != last_idx + 1):
break
insertable.append((doc_id, rev, content, gen, trans_id, idx))
@@ -763,6 +757,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
return self._runOperation(query)
+ @defer.inlineCallbacks
def _collect_async_decryption_results(self):
"""
Collect the results of the asynchronous doc decryptions and re-raise
@@ -773,7 +768,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
async_results = self._async_results[:]
for res in async_results:
if res.ready():
- self._decrypt_doc_cb(res.get()) # might raise an exception!
+ yield self._decrypt_doc_cb(res.get()) # might raise an exception!
self._async_results.remove(res)
@defer.inlineCallbacks
@@ -796,7 +791,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if processed < pending:
yield self._async_decrypt_received_docs()
- self._collect_async_decryption_results()
+ yield self._collect_async_decryption_results()
docs = yield self._process_decrypted_docs()
yield self._delete_processed_docs(docs)
# recurse