summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/encdecpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py746
1 files changed, 746 insertions, 0 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
new file mode 100644
index 00000000..d9a72b25
--- /dev/null
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -0,0 +1,746 @@
+# -*- coding: utf-8 -*-
+# encdecpool.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A pool of encryption/decryption concurrent and parallel workers for using
+during synchronization.
+"""
+
+
+import multiprocessing
+import Queue
+import json
+import logging
+
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common import soledad_assert
+
+from leap.soledad.client.crypto import encrypt_docstr
+from leap.soledad.client.crypto import decrypt_doc_dict
+
+
+logger = logging.getLogger(__name__)
+
+
+#
+# Encrypt/decrypt pools of workers
+#
+
+class SyncEncryptDecryptPool(object):
+ """
+ Base class for encrypter/decrypter pools.
+ """
+
+ # TODO implement throttling to reduce cpu usage??
+ WORKERS = multiprocessing.cpu_count()
+
+ def __init__(self, crypto, sync_db):
+ """
+ Initialize the pool of encryption-workers.
+
+ :param crypto: A SoledadCryto instance to perform the encryption.
+ :type crypto: leap.soledad.crypto.SoledadCrypto
+
+ :param sync_db: A database connection handle
+ :type sync_db: pysqlcipher.dbapi2.Connection
+ """
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._pool = multiprocessing.Pool(self.WORKERS)
+
+ def close(self):
+ """
+ Cleanly close the pool of workers.
+ """
+ logger.debug("Closing %s" % (self.__class__.__name__,))
+ self._pool.close()
+ try:
+ self._pool.join()
+ except Exception:
+ pass
+
+ def terminate(self):
+ """
+ Terminate the pool of workers.
+ """
+ logger.debug("Terminating %s" % (self.__class__.__name__,))
+ self._pool.terminate()
+
+ def _runOperation(self, query, *args):
+ """
+ Run an operation on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runOperation(query, *args)
+
+ def _runQuery(self, query, *args):
+ """
+ Run a query on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire with the results of the database
+ query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runQuery(query, *args)
+
+
+def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
+ """
+ Encrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad storage secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ encrypted_content = encrypt_docstr(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, encrypted_content
+
+
+class SyncEncrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric encryption
+ of documents to be synced.
+ """
+ TABLE_NAME = "docs_tosync"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
+
+ ENCRYPT_LOOP_PERIOD = 0.5
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the sync encrypter pool.
+ """
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+ self._stopped = False
+ self._sync_queue = multiprocessing.Queue()
+
+ # start the encryption loop
+ self._deferred_loop = deferToThread(self._encrypt_docs_loop)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished encrypter thread."))
+
+ def enqueue_doc_for_encryption(self, doc):
+ """
+ Enqueue a document for encryption.
+
+ :param doc: The document to be encrypted.
+ :type doc: SoledadDocument
+ """
+ try:
+ self.sync_queue.put_nowait(doc)
+ except multiprocessing.Queue.Full:
+ # do not asynchronously encrypt this file if the queue is full
+ pass
+
+ def _encrypt_docs_loop(self):
+ """
+ Process the syncing queue and send the documents there to be encrypted
+ in the sync db. They will be read by the SoledadSyncTarget during the
+ sync_exchange.
+ """
+ logger.debug("Starting encrypter thread.")
+ while not self._stopped:
+ try:
+ doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD)
+ self._encrypt_doc(doc)
+ except Queue.Empty:
+ pass
+
+ def _encrypt_doc(self, doc):
+ """
+ Symmetrically encrypt a document.
+
+ :param doc: The document with contents to be encrypted.
+ :type doc: SoledadDocument
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ docstr = doc.get_json()
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ args = doc.doc_id, doc.rev, docstr, key, secret
+ # encrypt asynchronously
+ self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self._encrypt_doc_cb)
+
+ def _encrypt_doc_cb(self, result):
+ """
+ Insert results of encryption routine into the local sync database.
+
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, doc_rev, content = result
+ return self._insert_encrypted_local_doc(doc_id, doc_rev, content)
+
+ def _insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ """
+ Insert the contents of the encrypted doc into the local sync
+ database.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ """
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
+ % (self.TABLE_NAME,)
+ return self._runOperation(query, (doc_id, doc_rev, content))
+
+ @defer.inlineCallbacks
+ def get_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Get an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire with the encrypted content of the
+ document or None if the document was not found in the sync
+ db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id)
+ query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ result = yield self._runQuery(query, (doc_id, doc_rev))
+ if result:
+ val = result.pop()
+ defer.returnValue(val[0])
+ defer.returnValue(None)
+
+ def delete_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Delete an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ self._runOperation(query, (doc_id, doc_rev))
+
+ def close(self):
+ """
+ Close the encrypter pool.
+ """
+ self._stopped = True
+ self._sync_queue.close()
+ q = self._sync_queue
+ del q
+ self._sync_queue = None
+
+
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
+ idx):
+ """
+ Decrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The encrypted content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification of
+ that document.
+ :type trans_id: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad storage secret (used for MAC auth).
+ :type secret: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id, idx
+
+
+class SyncDecrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric decryption
+ of documents that were received.
+
+ The decryption of the received documents is done in two steps:
+
+ 1. Encrypted documents are stored in the sync db by the actual soledad
+ sync loop.
+ 2. The soledad sync loop tells us how many documents we should expect
+ to process.
+ 3. We start a decrypt-and-process loop:
+
+ a. Encrypted documents are fetched.
+ b. Encrypted documents are decrypted.
+ c. The longest possible list of decrypted documents are inserted
+ in the soledad db (this depends on which documents have already
+ arrived and which documents have already been decrypte, because
+ the order of insertion in the local soledad db matters).
+ d. Processed documents are deleted from the database.
+
+ 4. When we have processed as many documents as we should, the loop
+ finishes.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ TABLE_NAME = "docs_received"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \
+ "trans_id, encrypted, idx"
+
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_LOOP_PERIOD = 0.5
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+ :param source_replica_uid: The source replica uid, used to find the
+ correct callback for inserting documents.
+ :type source_replica_uid: str
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ self.source_replica_uid = kwargs.pop("source_replica_uid")
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+ self._last_inserted_idx = 0
+ self._docs_to_process = None
+ self._processed_docs = 0
+
+ self._async_results = []
+ self._failure = None
+ self._finished = False
+
+ # XXX we want to empty the database before starting, but this is an
+ # asynchronous call, so we have to somehow make sure that it is
+ # executed before any other call to the database, without
+ # blocking.
+ self._empty()
+
+ def _launch_decrypt_and_process(self):
+ d = self._decrypt_and_process_docs()
+ d.addErrback(lambda f: self._set_failure(f))
+
+ def _schedule_decrypt_and_process(self):
+ reactor.callLater(
+ self.DECRYPT_LOOP_PERIOD,
+ self._launch_decrypt_and_process)
+
+ @property
+ def failure(self):
+ return self._failure
+
+ def _set_failure(self, failure):
+ self._failure = failure
+ self._finished = True
+
+ def failed(self):
+ return bool(self._failure)
+
+ def start(self, docs_to_process):
+ """
+ Set the number of documents we expect to process.
+
+ This should be called by the during the sync exchange process as soon
+ as we know how many documents are arriving from the server.
+
+ :param docs_to_process: The number of documents to process.
+ :type docs_to_process: int
+ """
+ self._docs_to_process = docs_to_process
+ self._schedule_decrypt_and_process()
+
+ def insert_encrypted_received_doc(
+ self, doc_id, doc_rev, content, gen, trans_id, idx):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ docstr = json.dumps(content)
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ % self.TABLE_NAME
+ return self._runOperation(
+ query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))
+
+ def insert_received_doc(
+ self, doc_id, doc_rev, content, gen, trans_id, idx):
+ """
+ Insert a document that is not symmetrically encrypted.
+ We store it in the staging area (the decrypted_docs dictionary) to be
+ picked up in order as the preceding documents are decrypted.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ if not isinstance(content, str):
+ content = json.dumps(content)
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ % self.TABLE_NAME
+ return self._runOperation(
+ query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
+
+ def _delete_received_doc(self, doc_id):
+ """
+ Delete a received doc after it was inserted into the local db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM '%s' WHERE doc_id=?" \
+ % self.TABLE_NAME
+ return self._runOperation(query, (doc_id,))
+
+ def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx):
+ """
+ Dispatch an asynchronous document decrypting routine and save the
+ result object.
+
+ :param doc_id: The ID for the document with contents to be encrypted.
+ :type doc: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A deferred that will fire after the document hasa been
+ decrypted and inserted in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+
+ content = json.loads(content)
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, rev, content, gen, trans_id, key, secret, idx
+ # decrypt asynchronously
+ self._async_results.append(
+ self._pool.apply_async(
+ decrypt_doc_task, args))
+
+ def _decrypt_doc_cb(self, result):
+ """
+ Store the decryption result in the sync db from where it will later be
+ picked by _process_decrypted_docs.
+
+ :param result: A tuple containing the document's id, revision,
+ content, generation, transaction id and sync index.
+ :type result: tuple(str, str, str, int, str, int)
+
+ :return: A deferred that will fire after the document has been
+ inserted in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ doc_id, rev, content, gen, trans_id, idx = result
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
+ % (doc_id, rev, gen, trans_id))
+ return self.insert_received_doc(
+ doc_id, rev, content, gen, trans_id, idx)
+
+ def _get_docs(self, encrypted=None, order_by='idx', order='ASC'):
+ """
+ Get documents from the received docs table in the sync db.
+
+ :param encrypted: If not None, only return documents with encrypted
+ field equal to given parameter.
+ :type encrypted: bool or None
+ :param order_by: The name of the field to order results.
+ :type order_by: str
+ :param order: Whether the order should be ASC or DESC.
+ :type order: str
+
+ :return: A deferred that will fire with the results of the database
+ query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \
+ "idx FROM %s" % self.TABLE_NAME
+ if encrypted is not None:
+ query += " WHERE encrypted = %d" % int(encrypted)
+ query += " ORDER BY %s %s" % (order_by, order)
+ return self._runQuery(query)
+
+ @defer.inlineCallbacks
+ def _get_insertable_docs(self):
+ """
+ Return a list of non-encrypted documents ready to be inserted.
+
+ :return: A deferred that will fire with the list of insertable
+ documents.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # here, we fetch the list of decrypted documents and compare with the
+ # index of the last succesfully processed document.
+ decrypted_docs = yield self._get_docs(encrypted=False)
+ insertable = []
+ last_idx = self._last_inserted_idx
+ for doc_id, rev, content, gen, trans_id, encrypted, idx in \
+ decrypted_docs:
+ # XXX for some reason, a document might not have been deleted from
+ # the database. This is a bug. In this point, already
+ # processed documents should have been removed from the sync
+ # database and we should not have to skip them here. We need
+ # to find out why this is happening, fix, and remove the
+ # skipping below.
+ if (idx < last_idx + 1):
+ continue
+ if (idx != last_idx + 1):
+ break
+ insertable.append((doc_id, rev, content, gen, trans_id, idx))
+ last_idx += 1
+ defer.returnValue(insertable)
+
+ @defer.inlineCallbacks
+ def _async_decrypt_received_docs(self):
+ """
+ Get all the encrypted documents from the sync database and dispatch a
+ decrypt worker to decrypt each one of them.
+
+ :return: A deferred that will fire after all documents have been
+ decrypted and inserted back in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ docs = yield self._get_docs(encrypted=True)
+ for doc_id, rev, content, gen, trans_id, _, idx in docs:
+ self._async_decrypt_doc(
+ doc_id, rev, content, gen, trans_id, idx)
+
+ @defer.inlineCallbacks
+ def _process_decrypted_docs(self):
+ """
+ Fetch as many decrypted documents as can be taken from the expected
+ order and insert them in the local replica.
+
+ :return: A deferred that will fire with the list of inserted
+ documents.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ insertable = yield self._get_insertable_docs()
+ for doc_fields in insertable:
+ self._insert_decrypted_local_doc(*doc_fields)
+ defer.returnValue(insertable)
+
+ def _delete_processed_docs(self, inserted):
+ """
+ Delete from the sync db documents that have been processed.
+
+ :param inserted: List of documents inserted in the previous process
+ step.
+ :type inserted: list
+
+ :return: A list of deferreds that will fire when each operation in the
+ database has finished.
+ :rtype: twisted.internet.defer.DeferredList
+ """
+ deferreds = []
+ for doc_id, doc_rev, _, _, _, _ in inserted:
+ deferreds.append(
+ self._delete_received_doc(doc_id))
+ if not deferreds:
+ return defer.succeed(None)
+ return defer.gatherResults(deferreds)
+
+ def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id, idx):
+ """
+ Insert the decrypted document into the local replica.
+
+ Make use of the passed callback `insert_doc_cb` passed to the caller
+ by u1db sync.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ """
+ # could pass source_replica in params for callback chain
+ logger.debug("Sync decrypter pool: inserting doc in local db: "
+ "%s:%s %s" % (doc_id, doc_rev, gen))
+
+ # convert deleted documents to avoid error on document creation
+ if content == 'null':
+ content = None
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ gen = int(gen)
+ self._insert_doc_cb(doc, gen, trans_id)
+
+ # store info about processed docs
+ self._last_inserted_idx = idx
+ self._processed_docs += 1
+
+ def _empty(self):
+ """
+ Empty the received docs table of the sync database.
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
+ return self._runOperation(query)
+
+ def _collect_async_decryption_results(self):
+ """
+ Collect the results of the asynchronous doc decryptions and re-raise
+ any exception raised by a multiprocessing async decryption call.
+
+ :raise Exception: Raised if an async call has raised an exception.
+ """
+ async_results = self._async_results[:]
+ for res in async_results:
+ if res.ready():
+ self._decrypt_doc_cb(res.get()) # might raise an exception!
+ self._async_results.remove(res)
+
+ @defer.inlineCallbacks
+ def _decrypt_and_process_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ This method implicitelly returns a defferred (see the decorator
+ above). It should only be called by _launch_decrypt_and_process().
+ because this way any exceptions raised here will be stored by the
+ errback attached to the deferred returned.
+
+ :return: A deferred which will fire after all decrypt, process and
+ delete operations have been executed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ if not self.failed():
+ if self._processed_docs < self._docs_to_process:
+ yield self._async_decrypt_received_docs()
+ yield self._collect_async_decryption_results()
+ docs = yield self._process_decrypted_docs()
+ yield self._delete_processed_docs(docs)
+ # recurse
+ self._schedule_decrypt_and_process()
+ else:
+ self._finished = True
+
+ def has_finished(self):
+ """
+ Return whether the decrypter has finished its work.
+ """
+ return self._finished