From edf54f4a2c59990c91544614d6014a900a8e3af3 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 19 Aug 2015 12:36:59 -0300 Subject: [bug] wait for db init on sync decrypter pool Previous to this modification, the initialization of the sync decrypter pool could happen concurrently with other database operations. That could cause the pool to hang because it could be waiting for something that was mistakenly deleted because of the wrong order of database operations. This commit implements a standard which we already use in leap.keymanager and leap.mail which makes some methods wait for the initialization operation before they are actually called. Closes: #7386 --- client/src/leap/soledad/client/encdecpool.py | 52 ++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 11 deletions(-) (limited to 'client/src/leap/soledad') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index df32d74f..69b0556b 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -136,7 +136,6 @@ class SyncEncryptDecryptPool(object): """ return self._sync_db.runQuery(query, *args) - def encrypt_doc_task(doc_id, doc_rev, content, key, secret): """ Encrypt the content of the given document. @@ -411,14 +410,45 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._last_inserted_idx = 0 self._decrypting_docs = [] + # a list that holds the asynchronous decryption results so they can be + # collected when they are ready self._async_results = [] - # XXX we want to empty the database before starting, but this is an - # asynchronous call, so we have to somehow make sure that it is - # executed before any other call to the database, without - # blocking. - # XXX in mail and keymanager we have a pattern for that -- kali. - self._empty() + # initialize db and make sure any database operation happens after + # db initialization + self._deferred_init = self._init_db() + self._wait_init_db('_runOperation', '_runQuery') + + + def _wait_init_db(self, *methods): + """ + Methods that need to wait for db initialization. + + :param methods: methods that need to wait for initialization + :type methods: tuple(str) + """ + self._waiting = [] + self._stored = {} + + def _restore(_): + for method in self._stored: + setattr(self, method, self._stored[method]) + for d in self._waiting: + d.callback(None) + + def _makeWrapper(method): + def wrapper(*args, **kw): + d = defer.Deferred() + d.addCallback(lambda _: self._stored[method](*args, **kw)) + self._waiting.append(d) + return d + return wrapper + + for method in methods: + self._stored[method] = getattr(self, method) + setattr(self, method, _makeWrapper(method)) + + self._deferred_init.addCallback(_restore) def start(self, docs_to_process): """ @@ -433,9 +463,9 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): SyncEncryptDecryptPool.start(self) self._docs_to_process = docs_to_process self._deferred = defer.Deferred() - reactor.callWhenRunning(self._launch_decrypt_and_process) + reactor.callWhenRunning(self._launch_decrypt_and_recurse) - def _launch_decrypt_and_process(self): + def _launch_decrypt_and_recurse(self): d = self._decrypt_and_recurse() d.addErrback(self._errback) @@ -720,7 +750,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._last_inserted_idx = idx self._processed_docs += 1 - def _empty(self): + def _init_db(self): """ Empty the received docs table of the sync database. @@ -770,7 +800,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # recurse self._delayed_call = reactor.callLater( self.DECRYPT_LOOP_PERIOD, - self._launch_decrypt_and_process) + self._launch_decrypt_and_recurse) else: self._finish() -- cgit v1.2.3