summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/encdecpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py52
1 files changed, 41 insertions, 11 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index df32d74f..69b0556b 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -136,7 +136,6 @@ class SyncEncryptDecryptPool(object):
"""
return self._sync_db.runQuery(query, *args)
-
def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
"""
Encrypt the content of the given document.
@@ -411,14 +410,45 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._last_inserted_idx = 0
self._decrypting_docs = []
+ # a list that holds the asynchronous decryption results so they can be
+ # collected when they are ready
self._async_results = []
- # XXX we want to empty the database before starting, but this is an
- # asynchronous call, so we have to somehow make sure that it is
- # executed before any other call to the database, without
- # blocking.
- # XXX in mail and keymanager we have a pattern for that -- kali.
- self._empty()
+ # initialize db and make sure any database operation happens after
+ # db initialization
+ self._deferred_init = self._init_db()
+ self._wait_init_db('_runOperation', '_runQuery')
+
+
+ def _wait_init_db(self, *methods):
+ """
+ Methods that need to wait for db initialization.
+
+ :param methods: methods that need to wait for initialization
+ :type methods: tuple(str)
+ """
+ self._waiting = []
+ self._stored = {}
+
+ def _restore(_):
+ for method in self._stored:
+ setattr(self, method, self._stored[method])
+ for d in self._waiting:
+ d.callback(None)
+
+ def _makeWrapper(method):
+ def wrapper(*args, **kw):
+ d = defer.Deferred()
+ d.addCallback(lambda _: self._stored[method](*args, **kw))
+ self._waiting.append(d)
+ return d
+ return wrapper
+
+ for method in methods:
+ self._stored[method] = getattr(self, method)
+ setattr(self, method, _makeWrapper(method))
+
+ self._deferred_init.addCallback(_restore)
def start(self, docs_to_process):
"""
@@ -433,9 +463,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
SyncEncryptDecryptPool.start(self)
self._docs_to_process = docs_to_process
self._deferred = defer.Deferred()
- reactor.callWhenRunning(self._launch_decrypt_and_process)
+ reactor.callWhenRunning(self._launch_decrypt_and_recurse)
- def _launch_decrypt_and_process(self):
+ def _launch_decrypt_and_recurse(self):
d = self._decrypt_and_recurse()
d.addErrback(self._errback)
@@ -720,7 +750,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._last_inserted_idx = idx
self._processed_docs += 1
- def _empty(self):
+ def _init_db(self):
"""
Empty the received docs table of the sync database.
@@ -770,7 +800,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# recurse
self._delayed_call = reactor.callLater(
self.DECRYPT_LOOP_PERIOD,
- self._launch_decrypt_and_process)
+ self._launch_decrypt_and_recurse)
else:
self._finish()