diff options
author | drebs <drebs@leap.se> | 2015-08-19 12:36:59 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2015-08-19 15:41:30 -0300 |
commit | edf54f4a2c59990c91544614d6014a900a8e3af3 (patch) | |
tree | d2fe983af9ac2ef3214dce72a8adb9f198d999ce /client/src/leap/soledad | |
parent | 469136cb6018b7408e06f11131d5108c7892020d (diff) |
[bug] wait for db init on sync decrypter pool
Previous to this modification, the initialization of the sync decrypter pool
could happen concurrently with other database operations. That could cause the
pool to hang because it could be waiting for something that was mistakenly
deleted because of the wrong order of database operations.
This commit implements a standard which we already use in leap.keymanager and
leap.mail which makes some methods wait for the initialization operation
before they are actually called.
Closes: #7386
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 52 |
1 files changed, 41 insertions, 11 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index df32d74f..69b0556b 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -136,7 +136,6 @@ class SyncEncryptDecryptPool(object): """ return self._sync_db.runQuery(query, *args) - def encrypt_doc_task(doc_id, doc_rev, content, key, secret): """ Encrypt the content of the given document. @@ -411,14 +410,45 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._last_inserted_idx = 0 self._decrypting_docs = [] + # a list that holds the asynchronous decryption results so they can be + # collected when they are ready self._async_results = [] - # XXX we want to empty the database before starting, but this is an - # asynchronous call, so we have to somehow make sure that it is - # executed before any other call to the database, without - # blocking. - # XXX in mail and keymanager we have a pattern for that -- kali. - self._empty() + # initialize db and make sure any database operation happens after + # db initialization + self._deferred_init = self._init_db() + self._wait_init_db('_runOperation', '_runQuery') + + + def _wait_init_db(self, *methods): + """ + Methods that need to wait for db initialization. + + :param methods: methods that need to wait for initialization + :type methods: tuple(str) + """ + self._waiting = [] + self._stored = {} + + def _restore(_): + for method in self._stored: + setattr(self, method, self._stored[method]) + for d in self._waiting: + d.callback(None) + + def _makeWrapper(method): + def wrapper(*args, **kw): + d = defer.Deferred() + d.addCallback(lambda _: self._stored[method](*args, **kw)) + self._waiting.append(d) + return d + return wrapper + + for method in methods: + self._stored[method] = getattr(self, method) + setattr(self, method, _makeWrapper(method)) + + self._deferred_init.addCallback(_restore) def start(self, docs_to_process): """ @@ -433,9 +463,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): SyncEncryptDecryptPool.start(self) self._docs_to_process = docs_to_process self._deferred = defer.Deferred() - reactor.callWhenRunning(self._launch_decrypt_and_process) + reactor.callWhenRunning(self._launch_decrypt_and_recurse) - def _launch_decrypt_and_process(self): + def _launch_decrypt_and_recurse(self): d = self._decrypt_and_recurse() d.addErrback(self._errback) @@ -720,7 +750,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._last_inserted_idx = idx self._processed_docs += 1 - def _empty(self): + def _init_db(self): """ Empty the received docs table of the sync database. @@ -770,7 +800,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # recurse self._delayed_call = reactor.callLater( self.DECRYPT_LOOP_PERIOD, - self._launch_decrypt_and_process) + self._launch_decrypt_and_recurse) else: self._finish() |