summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/encdecpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py27
1 files changed, 14 insertions, 13 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 6d3c11b9..34667a1e 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -28,6 +28,7 @@ import json
import logging
from twisted.internet import reactor
+from twisted.internet import threads
from twisted.internet import defer
from twisted.python import log
@@ -70,11 +71,13 @@ class SyncEncryptDecryptPool(object):
self._started = False
def start(self):
+ if self.running:
+ return
self._create_pool()
self._started = True
def stop(self):
- if not self._started:
+ if not self.running:
return
self._started = False
self._destroy_pool()
@@ -650,14 +653,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
last_idx = self._last_inserted_idx
for doc_id, rev, content, gen, trans_id, encrypted, idx in \
decrypted_docs:
- # XXX for some reason, a document might not have been deleted from
- # the database. This is a bug. In this point, already
- # processed documents should have been removed from the sync
- # database and we should not have to skip them here. We need
- # to find out why this is happening, fix, and remove the
- # skipping below.
- if (idx < last_idx + 1):
- continue
if (idx != last_idx + 1):
break
insertable.append((doc_id, rev, content, gen, trans_id, idx))
@@ -693,7 +688,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
insertable = yield self._get_insertable_docs()
for doc_fields in insertable:
- self._insert_decrypted_local_doc(*doc_fields)
+ method = self._insert_decrypted_local_doc
+ # FIXME: This is used only because SQLCipherU1DBSync is synchronous
+ # When adbapi is used there is no need for an external thread
+ # Without this the reactor can freeze and fail docs download
+ yield threads.deferToThread(method, *doc_fields)
defer.returnValue(insertable)
def _delete_processed_docs(self, inserted):
@@ -763,6 +762,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
return self._runOperation(query)
+ @defer.inlineCallbacks
def _collect_async_decryption_results(self):
"""
Collect the results of the asynchronous doc decryptions and re-raise
@@ -773,7 +773,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
async_results = self._async_results[:]
for res in async_results:
if res.ready():
- self._decrypt_doc_cb(res.get()) # might raise an exception!
+ # XXX: might raise an exception!
+ yield self._decrypt_doc_cb(res.get())
self._async_results.remove(res)
@defer.inlineCallbacks
@@ -796,7 +797,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if processed < pending:
yield self._async_decrypt_received_docs()
- self._collect_async_decryption_results()
+ yield self._collect_async_decryption_results()
docs = yield self._process_decrypted_docs()
yield self._delete_processed_docs(docs)
# recurse
@@ -807,6 +808,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._finish()
def _finish(self):
- self._deferred.callback(None)
self._processed_docs = 0
self._last_inserted_idx = 0
+ self._deferred.callback(None)