summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/encdecpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py673
1 files changed, 673 insertions, 0 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
new file mode 100644
index 00000000..0466ec5d
--- /dev/null
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -0,0 +1,673 @@
+# -*- coding: utf-8 -*-
+# encdecpool.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A pool of encryption/decryption concurrent and parallel workers for using
+during synchronization.
+"""
+
+
+import multiprocessing
+import threading
+import time
+import json
+import logging
+
+from zope.proxy import sameProxiedObjects
+
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common import soledad_assert
+
+from leap.soledad.client.crypto import encrypt_docstr
+from leap.soledad.client.crypto import decrypt_doc_dict
+
+
+logger = logging.getLogger(__name__)
+
+
+#
+# Encrypt/decrypt pools of workers
+#
+
+class SyncEncryptDecryptPool(object):
+ """
+ Base class for encrypter/decrypter pools.
+ """
+ WORKERS = multiprocessing.cpu_count()
+
+ def __init__(self, crypto, sync_db):
+ """
+ Initialize the pool of encryption-workers.
+
+ :param crypto: A SoledadCryto instance to perform the encryption.
+ :type crypto: leap.soledad.crypto.SoledadCrypto
+
+ :param sync_db: A database connection handle
+ :type sync_db: pysqlcipher.dbapi2.Connection
+ """
+ self._pool = multiprocessing.Pool(self.WORKERS)
+ self._crypto = crypto
+ self._sync_db = sync_db
+
+ def close(self):
+ """
+ Cleanly close the pool of workers.
+ """
+ logger.debug("Closing %s" % (self.__class__.__name__,))
+ self._pool.close()
+ try:
+ self._pool.join()
+ except Exception:
+ pass
+
+ def terminate(self):
+ """
+ Terminate the pool of workers.
+ """
+ logger.debug("Terminating %s" % (self.__class__.__name__,))
+ self._pool.terminate()
+
+
+def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
+ """
+ Encrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad storage secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ encrypted_content = encrypt_docstr(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, encrypted_content
+
+
+class SyncEncrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric encryption
+ of documents to be synced.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ WORKERS = multiprocessing.cpu_count()
+ TABLE_NAME = "docs_tosync"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
+
+ def encrypt_doc(self, doc, workers=True):
+ """
+ Symmetrically encrypt a document.
+
+ :param doc: The document with contents to be encrypted.
+ :type doc: SoledadDocument
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ docstr = doc.get_json()
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ args = doc.doc_id, doc.rev, docstr, key, secret
+
+ try:
+ if workers:
+ res = self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self.encrypt_doc_cb)
+ else:
+ # encrypt inline
+ res = encrypt_doc_task(*args)
+ self.encrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def encrypt_doc_cb(self, result):
+ """
+ Insert results of encryption routine into the local sync database.
+
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, doc_rev, content = result
+ return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+
+ def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ """
+ Insert the contents of the encrypted doc into the local sync
+ database.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ """
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
+ % (self.TABLE_NAME,)
+ return self._sync_db.runOperation(query, (doc_id, doc_rev, content))
+
+
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
+ idx):
+ """
+ Decrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The encrypted content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification of
+ that document.
+ :type trans_id: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad storage secret (used for MAC auth).
+ :type secret: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id, idx
+
+
+class SyncDecrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric decryption
+ of documents that were received.
+
+ The decryption of the received documents is done in two steps:
+
+ 1. Encrypted documents are stored in the sync db by the actual soledad
+ sync loop.
+ 2. The soledad sync loop tells us how many documents we should expect
+ to process.
+ 3. We start a decrypt-and-process loop:
+
+ a. Encrypted documents are fetched.
+ b. Encrypted documents are decrypted.
+ c. The longest possible list of decrypted documents are inserted
+ in the soledad db (this depends on which documents have already
+ arrived and which documents have already been decrypte, because
+ the order of insertion in the local soledad db matters).
+ d. Processed documents are deleted from the database.
+
+ 4. When we have processed as many documents as we should, the loop
+ finishes.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ TABLE_NAME = "docs_received"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \
+ "trans_id, encrypted, idx"
+
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_LOOP_PERIOD = 0.5
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+ :param source_replica_uid: The source replica uid, used to find the
+ correct callback for inserting documents.
+ :type source_replica_uid: str
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ self.source_replica_uid = kwargs.pop("source_replica_uid")
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+ self._last_inserted_idx = 0
+ self._docs_to_process = None
+ self._processed_docs = 0
+
+ self._async_results = []
+ self._exception = None
+ self._finished = threading.Event()
+
+ # clear the database before starting the sync
+ self._empty_db = threading.Event()
+ d = self._empty()
+ d.addCallback(lambda _: self._empty_db.set())
+
+ # start the decryption loop
+ self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished decryptor thread."))
+
+ def set_docs_to_process(self, docs_to_process):
+ """
+ Set the number of documents we expect to process.
+
+ This should be called by the during the sync exchange process as soon
+ as we know how many documents are arriving from the server.
+
+ :param docs_to_process: The number of documents to process.
+ :type docs_to_process: int
+ """
+ self._docs_to_process = docs_to_process
+
+ def insert_encrypted_received_doc(
+ self, doc_id, doc_rev, content, gen, trans_id, idx):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ docstr = json.dumps(content)
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ % self.TABLE_NAME
+ return self._sync_db.runOperation(
+ query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))
+
+ def insert_received_doc(
+ self, doc_id, doc_rev, content, gen, trans_id, idx):
+ """
+ Insert a document that is not symmetrically encrypted.
+ We store it in the staging area (the decrypted_docs dictionary) to be
+ picked up in order as the preceding documents are decrypted.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ if not isinstance(content, str):
+ content = json.dumps(content)
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ % self.TABLE_NAME
+ return self._sync_db.runOperation(
+ query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
+
+ def _delete_received_doc(self, doc_id):
+ """
+ Delete a received doc after it was inserted into the local db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM '%s' WHERE doc_id=?" \
+ % self.TABLE_NAME
+ return self._sync_db.runOperation(query, (doc_id,))
+
+ def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx,
+ workers=True):
+ """
+ Symmetrically decrypt a document and store in the sync db.
+
+ :param doc_id: The ID for the document with contents to be encrypted.
+ :type doc: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+
+ :return: A deferred that will fire after the document hasa been
+ decrypted and inserted in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # insert_doc_cb is a proxy object that gets updated with the right
+ # insert function only when the sync_target invokes the sync_exchange
+ # method. so, if we don't still have a non-empty callback, we refuse
+ # to proceed.
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
+ return
+
+ soledad_assert(self._crypto is not None, "need a crypto object")
+
+ content = json.loads(content)
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, rev, content, gen, trans_id, key, secret, idx
+
+ if workers:
+ # when using multiprocessing, we need to wait for all parallel
+ # processing to finish before continuing with the
+ # decrypt-and-process loop. We do this by using an extra deferred
+ # that will be fired by the multiprocessing callback when it has
+ # finished processing.
+ d1 = defer.Deferred()
+
+ def _multiprocessing_callback(result):
+ d2 = self._decrypt_doc_cb(result)
+ d2.addCallback(lambda defres: d1.callback(defres))
+
+ # save the async result object so we can inspect it for failures
+ self._async_results.append(
+ self._pool.apply_async(
+ decrypt_doc_task, args,
+ callback=_multiprocessing_callback))
+
+ return d1
+ else:
+ # decrypt inline
+ res = decrypt_doc_task(*args)
+ return self._decrypt_doc_cb(res)
+
+ def _decrypt_doc_cb(self, result):
+ """
+ Store the decryption result in the sync db from where it will later be
+ picked by _process_decrypted.
+
+ :param result: A tuple containing the document's id, revision,
+ content, generation, transaction id and sync index.
+ :type result: tuple(str, str, str, int, str, int)
+
+ :return: A deferred that will fire after the document has been
+ inserted in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ doc_id, rev, content, gen, trans_id, idx = result
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
+ % (doc_id, rev, gen, trans_id))
+ return self.insert_received_doc(
+ doc_id, rev, content, gen, trans_id, idx)
+
+ def _get_docs(self, encrypted=None, order_by='idx', order='ASC'):
+ """
+ Get documents from the received docs table in the sync db.
+
+ :param encrypted: If not None, only return documents with encrypted
+ field equal to given parameter.
+ :type encrypted: bool or None
+ :param order_by: The name of the field to order results.
+ :type order_by: str
+ :param order: Whether the order should be ASC or DESC.
+ :type order: str
+
+ :return: A deferred that will fire with the results of the database
+ query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \
+ "idx FROM %s" % self.TABLE_NAME
+ if encrypted is not None:
+ query += " WHERE encrypted = %d" % int(encrypted)
+ query += " ORDER BY %s %s" % (order_by, order)
+ return self._sync_db.runQuery(query)
+
+ @defer.inlineCallbacks
+ def _get_insertable_docs(self):
+ """
+ Return a list of non-encrypted documents ready to be inserted.
+
+ :return: A deferred that will fire with the list of insertable
+ documents.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # here, we fetch the list of decrypted documents and compare with the
+ # index of the last succesfully processed document.
+ decrypted_docs = yield self._get_docs(encrypted=False)
+ insertable = []
+ last_idx = self._last_inserted_idx
+ for doc_id, rev, content, gen, trans_id, encrypted, idx in \
+ decrypted_docs:
+ # XXX for some reason, a document might not have been deleted from
+ # the database. This is a bug. In this point, already
+ # processed documents should have been removed from the sync
+ # database and we should not have to skip them here. We need
+ # to find out why this is happening, fix, and remove the
+ # skipping below.
+ if (idx < last_idx + 1):
+ continue
+ if (idx != last_idx + 1):
+ break
+ insertable.append((doc_id, rev, content, gen, trans_id, idx))
+ last_idx += 1
+ defer.returnValue(insertable)
+
+ def _decrypt_received_docs(self):
+ """
+ Get all the encrypted documents from the sync database and dispatch a
+ decrypt worker to decrypt each one of them.
+
+ :return: A deferred that will fire after all documents have been
+ decrypted and inserted back in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ def _callback(received_docs):
+ deferreds = []
+ for doc_id, rev, content, gen, trans_id, _, idx in received_docs:
+ deferreds.append(
+ self._decrypt_doc(
+ doc_id, rev, content, gen, trans_id, idx))
+ return defer.gatherResults(deferreds)
+
+ d = self._get_docs(encrypted=True)
+ d.addCallback(_callback)
+ return d
+
+ def _process_decrypted(self):
+ """
+ Fetch as many decrypted documents as can be taken from the expected
+ order and insert them in the database.
+
+ :return: A deferred that will fire with the list of inserted
+ documents.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ def _callback(insertable):
+ for doc_fields in insertable:
+ self._insert_decrypted_local_doc(*doc_fields)
+ return insertable
+
+ d = self._get_insertable_docs()
+ d.addCallback(_callback)
+ return d
+
+ def _delete_processed_docs(self, inserted):
+ """
+ Delete from the sync db documents that have been processed.
+
+ :param inserted: List of documents inserted in the previous process
+ step.
+ :type inserted: list
+
+ :return: A list of deferreds that will fire when each operation in the
+ database has finished.
+ :rtype: twisted.internet.defer.DeferredList
+ """
+ deferreds = []
+ for doc_id, doc_rev, _, _, _, _ in inserted:
+ deferreds.append(
+ self._delete_received_doc(doc_id))
+ if not deferreds:
+ return defer.succeed(None)
+ return defer.gatherResults(deferreds)
+
+ def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id, idx):
+ """
+ Insert the decrypted document into the local sqlcipher database.
+ Makes use of the passed callback `return_doc_cb` passed to the caller
+ by u1db sync.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ """
+ # could pass source_replica in params for callback chain
+ insert_fun = self._insert_doc_cb[self.source_replica_uid]
+ logger.debug("Sync decrypter pool: inserting doc in local db: "
+ "%s:%s %s" % (doc_id, doc_rev, gen))
+
+ # convert deleted documents to avoid error on document creation
+ if content == 'null':
+ content = None
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ gen = int(gen)
+ insert_fun(doc, gen, trans_id)
+
+ # store info about processed docs
+ self._last_inserted_idx = idx
+ self._processed_docs += 1
+
+ def _empty(self):
+ """
+ Empty the received docs table of the sync database.
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
+ return self._sync_db.runOperation(query)
+
+ def _raise_if_async_fails(self):
+ """
+ Raise any exception raised by a multiprocessing async decryption
+ call.
+
+ :raise Exception: Raised if an async call has raised an exception.
+ """
+ for res in self._async_results:
+ if res.ready():
+ if not res.successful():
+ # re-raise the exception raised by the remote call
+ res.get()
+
+ def _decrypt_and_process_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ This method runs in its own thread, so sleeping will not interfere
+ with the main thread.
+ """
+ try:
+ # wait for database to be emptied
+ self._empty_db.wait()
+ # wait until we know how many documents we need to process
+ while self._docs_to_process is None:
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+ # because all database operations are asynchronous, we use an event to
+ # make sure we don't start the next loop before the current one has
+ # finished.
+ event = threading.Event()
+ # loop until we have processes as many docs as the number of changes
+ while self._processed_docs < self._docs_to_process:
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ continue
+ event.clear()
+ d = self._decrypt_received_docs()
+ d.addCallback(lambda _: self._raise_if_async_fails())
+ d.addCallback(lambda _: self._process_decrypted())
+ d.addCallback(self._delete_processed_docs)
+ d.addCallback(lambda _: event.set())
+ event.wait()
+ # sleep a bit to give time for some decryption work
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+ except Exception as e:
+ self._exception = e
+ self._finished.set()
+
+ def wait(self):
+ """
+ Wait for the decrypt-and-process loop to finish.
+ """
+ self._finished.wait()
+ if self._exception:
+ raise self._exception