summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r--client/src/leap/soledad/client/crypto.py552
1 files changed, 0 insertions, 552 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index dd40b198..bdbaa8e0 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -23,21 +23,13 @@ import hmac
import hashlib
import json
import logging
-import multiprocessing
-import threading
-import time
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
-from zope.proxy import sameProxiedObjects
-
-from twisted.internet import defer
-from twisted.internet.threads import deferToThread
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
from leap.soledad.common import crypto
-from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
@@ -515,547 +507,3 @@ def is_symmetrically_encrypted(doc):
== crypto.EncryptionSchemes.SYMKEY:
return True
return False
-
-
-#
-# Encrypt/decrypt pools of workers
-#
-
-class SyncEncryptDecryptPool(object):
- """
- Base class for encrypter/decrypter pools.
- """
- WORKERS = multiprocessing.cpu_count()
-
- def __init__(self, crypto, sync_db):
- """
- Initialize the pool of encryption-workers.
-
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
-
- :param sync_db: A database connection handle
- :type sync_db: pysqlcipher.dbapi2.Connection
-
- :param write_lock: a write lock for controlling concurrent access
- to the sync_db
- :type write_lock: threading.Lock
- """
- self._pool = multiprocessing.Pool(self.WORKERS)
- self._crypto = crypto
- self._sync_db = sync_db
-
- def close(self):
- """
- Cleanly close the pool of workers.
- """
- logger.debug("Closing %s" % (self.__class__.__name__,))
- self._pool.close()
- try:
- self._pool.join()
- except Exception:
- pass
-
- def terminate(self):
- """
- Terminate the pool of workers.
- """
- logger.debug("Terminating %s" % (self.__class__.__name__,))
- self._pool.terminate()
-
-
-def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
- """
- Encrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad storage secret (used for MAC auth).
- :type secret: str
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- encrypted_content = encrypt_docstr(
- content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, encrypted_content
-
-
-class SyncEncrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric encryption
- of documents to be synced.
- """
- # TODO implement throttling to reduce cpu usage??
- WORKERS = multiprocessing.cpu_count()
- TABLE_NAME = "docs_tosync"
- FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
-
- def encrypt_doc(self, doc, workers=True):
- """
- Symmetrically encrypt a document.
-
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
- soledad_assert(self._crypto is not None, "need a crypto object")
- docstr = doc.get_json()
- key = self._crypto.doc_passphrase(doc.doc_id)
- secret = self._crypto.secret
- args = doc.doc_id, doc.rev, docstr, key, secret
-
- try:
- if workers:
- res = self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self.encrypt_doc_cb)
- else:
- # encrypt inline
- res = encrypt_doc_task(*args)
- self.encrypt_doc_cb(res)
-
- except Exception as exc:
- logger.exception(exc)
-
- def encrypt_doc_cb(self, result):
- """
- Insert results of encryption routine into the local sync database.
-
- :param result: A tuple containing the doc id, revision and encrypted
- content.
- :type result: tuple(str, str, str)
- """
- doc_id, doc_rev, content = result
- return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
-
- @defer.inlineCallbacks
- def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
- """
- Insert the contents of the encrypted doc into the local sync
- database.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param content: The encrypted document.
- :type content: str
- """
- # FIXME --- callback should complete immediately since otherwise the
- # thread which handles the results will get blocked
- # Right now we're blocking the dispatcher with the writes to sqlite.
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
- % (self.TABLE_NAME,)
- yield self._sync_db.runQuery(query, (doc_id, doc_rev, content))
-
-
-def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
- """
- Decrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The encrypted content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification of
- that document.
- :type trans_id: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad storage secret (used for MAC auth).
- :type secret: str
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- decrypted_content = decrypt_doc_dict(
- content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, decrypted_content, gen, trans_id
-
-
-class SyncDecrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric decryption
- of documents that were received.
-
- The decryption of the received documents is done in two steps:
-
- 1. All the encrypted docs are collected, together with their generation
- and transaction-id
- 2. The docs are enqueued for decryption. When completed, they are
- inserted following the generation order.
- """
- # TODO implement throttling to reduce cpu usage??
- TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted"
-
- """
- Period of recurrence of the periodic decrypting task, in seconds.
- """
- DECRYPT_LOOP_PERIOD = 0.5
-
- def __init__(self, *args, **kwargs):
- """
- Initialize the decrypter pool, and setup a dict for putting the
- results of the decrypted docs until they are picked by the insert
- routine that gets them in order.
-
- :param insert_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type insert_doc_cb: function
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- """
- self._insert_doc_cb = kwargs.pop("insert_doc_cb")
- self.source_replica_uid = kwargs.pop("source_replica_uid")
- SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self._async_results = []
-
- self._stopped = threading.Event()
- self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
- self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished decryptor thread."))
-
- @defer.inlineCallbacks
- def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
- gen, trans_id):
- """
- Insert a received message with encrypted content, to be decrypted later
- on.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param content: the Content of the document
- :type content: str
- :param gen: the Document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- """
- docstr = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
- self.TABLE_NAME,)
- yield self._sync_db.runQuery(
- query,
- (doc_id, doc_rev, docstr, gen, trans_id, 1))
-
- @defer.inlineCallbacks
- def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
- """
- Insert a document that is not symmetrically encrypted.
- We store it in the staging area (the decrypted_docs dictionary) to be
- picked up in order as the preceding documents are decrypted.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param content: the Content of the document
- :type content: str
- :param gen: the Document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- """
- if not isinstance(content, str):
- content = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
- self.TABLE_NAME,)
- yield self._sync_db.runQuery(
- query,
- (doc_id, doc_rev, content, gen, trans_id, 0))
-
- @defer.inlineCallbacks
- def delete_received_doc(self, doc_id, doc_rev):
- """
- Delete a received doc after it was inserted into the local db.
-
- :param doc_id: Document ID.
- :type doc_id: str
- :param doc_rev: Document revision.
- :type doc_rev: str
- """
- sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
- self.TABLE_NAME,)
- yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev))
-
- def _decrypt_doc(self, doc_id, rev, content, gen, trans_id,
- source_replica_uid, workers=True):
- """
- Symmetrically decrypt a document.
-
- :param doc_id: The ID for the document with contents to be encrypted.
- :type doc: str
- :param rev: The revision of the document.
- :type rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- :param source_replica_uid:
- :type source_replica_uid: str
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
- self.source_replica_uid = source_replica_uid
-
- # insert_doc_cb is a proxy object that gets updated with the right
- # insert function only when the sync_target invokes the sync_exchange
- # method. so, if we don't still have a non-empty callback, we refuse
- # to proceed.
- if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
- None):
- logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
- return
-
- soledad_assert(self._crypto is not None, "need a crypto object")
-
- if len(content) == 0:
- # not encrypted payload
- return
-
- content = json.loads(content)
- key = self._crypto.doc_passphrase(doc_id)
- secret = self._crypto.secret
- args = doc_id, rev, content, gen, trans_id, key, secret
-
- if workers:
- # save the async result object so we can inspect it for failures
- self._async_results.append(self._pool.apply_async(
- decrypt_doc_task, args,
- callback=self._decrypt_doc_cb))
- else:
- # decrypt inline
- res = decrypt_doc_task(*args)
- self._decrypt_doc_cb(res)
-
- def _decrypt_doc_cb(self, result):
- """
- Store the decryption result in the sync db from where it will later be
- picked by _process_decrypted.
-
- :param result: A tuple containing the doc id, revision and encrypted
- content.
- :type result: tuple(str, str, str)
- """
- doc_id, rev, content, gen, trans_id = result
- logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
- % (doc_id, rev, gen, trans_id))
- return self.insert_received_doc(doc_id, rev, content, gen, trans_id)
-
- def get_docs_by_generation(self, encrypted=None):
- """
- Get all documents in the received table from the sync db,
- ordered by generation.
-
- :param encrypted: If not None, only return documents with encrypted
- field equal to given parameter.
- :type encrypted: bool or None
-
- :return: list of doc_id, rev, generation, gen, trans_id
- :rtype: list
- """
- sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \
- % self.TABLE_NAME
- if encrypted is not None:
- sql += " WHERE encrypted = %d" % int(encrypted)
- sql += " ORDER BY gen ASC"
- return self._fetchall(sql)
-
- @defer.inlineCallbacks
- def get_insertable_docs_by_gen(self):
- """
- Return a list of non-encrypted documents ready to be inserted.
- """
- # here, we compare the list of all available docs with the list of
- # decrypted docs and find the longest common prefix between these two
- # lists. Note that the order of lists fetch matters: if instead we
- # first fetch the list of decrypted docs and then the list of all
- # docs, then some document might have been decrypted between these two
- # calls, and if it is just the right doc then it might not be caught
- # by the next loop.
- all_docs = yield self.get_docs_by_generation()
- decrypted_docs = yield self.get_docs_by_generation(encrypted=False)
- insertable = []
- for doc_id, rev, _, gen, trans_id, encrypted in all_docs:
- for next_doc_id, _, next_content, _, _, _ in decrypted_docs:
- if doc_id == next_doc_id:
- content = next_content
- insertable.append((doc_id, rev, content, gen, trans_id))
- else:
- break
- defer.returnValue(insertable)
-
- @defer.inlineCallbacks
- def _count_docs_in_sync_db(self, encrypted=None):
- """
- Count how many documents we have in the table for received docs.
-
- :param encrypted: If not None, return count of documents with
- encrypted field equal to given parameter.
- :type encrypted: bool or None
-
- :return: The count of documents.
- :rtype: int
- """
- query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
- if encrypted is not None:
- query += " WHERE encrypted = %d" % int(encrypted)
- res = yield self._sync_db.runQuery(query)
- if res:
- val = res.pop()
- defer.returnValue(val[0])
- else:
- defer.returnValue(0)
-
- @defer.inlineCallbacks
- def _decrypt_received_docs(self):
- """
- Get all the encrypted documents from the sync database and dispatch a
- decrypt worker to decrypt each one of them.
- """
- self._raise_in_case_of_failed_async_calls()
- docs_by_generation = yield self.get_docs_by_generation(encrypted=True)
- for doc_id, rev, content, gen, trans_id, _ in docs_by_generation:
- self._decrypt_doc(
- doc_id, rev, content, gen, trans_id, self.source_replica_uid)
-
- @defer.inlineCallbacks
- def _process_decrypted(self):
- """
- Process the already decrypted documents, and insert as many documents
- as can be taken from the expected order without finding a gap.
-
- :return: Whether we have processed all the pending docs.
- :rtype: bool
- """
- # Acquire the lock to avoid processing while we're still
- # getting data from the syncing stream, to avoid InvalidGeneration
- # problems.
- insertable = yield self.get_insertable_docs_by_gen()
- for doc_fields in insertable:
- yield self.insert_decrypted_local_doc(*doc_fields)
-
- @defer.inlineCallbacks
- def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
- gen, trans_id):
- """
- Insert the decrypted document into the local sqlcipher database.
- Makes use of the passed callback `return_doc_cb` passed to the caller
- by u1db sync.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- """
- # could pass source_replica in params for callback chain
- insert_fun = self._insert_doc_cb[self.source_replica_uid]
- logger.debug("Sync decrypter pool: inserting doc in local db: "
- "%s:%s %s" % (doc_id, doc_rev, gen))
-
- # convert deleted documents to avoid error on document creation
- if content == 'null':
- content = None
- doc = SoledadDocument(doc_id, doc_rev, content)
- gen = int(gen)
- insert_fun(doc, gen, trans_id)
-
- # If no errors found, remove it from the received database.
- yield self.delete_received_doc(doc_id, doc_rev)
-
- @defer.inlineCallbacks
- def empty(self):
- """
- Empty the received docs table of the sync database.
- """
- sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- yield self._sync_db.runQuery(sql)
-
- @defer.inlineCallbacks
- def _fetchall(self, *args, **kwargs):
- results = yield self._sync_db.runQuery(*args, **kwargs)
- defer.returnValue(results)
-
- def _raise_in_case_of_failed_async_calls(self):
- """
- Re-raise any exception raised by an async call.
-
- :raise Exception: Raised if an async call has raised an exception.
- """
- for res in self._async_results:
- if res.ready():
- if not res.successful():
- # re-raise the exception raised by the remote call
- res.get()
-
- def _stop_decr_loop(self):
- """
- """
- self._stopped.set()
-
- def close(self):
- """
- """
- self._stop_decr_loop()
- SyncEncryptDecryptPool.close(self)
-
- def _decrypt_and_process_docs(self):
- """
- Decrypt the documents received from remote replica and insert them
- into the local one.
-
- Called periodically from LoopingCall self._sync_loop.
- """
- while not self._stopped.is_set():
- if sameProxiedObjects(
- self._insert_doc_cb.get(self.source_replica_uid),
- None):
- continue
- self._decrypt_received_docs()
- self._process_decrypted()
- time.sleep(self.DECRYPT_LOOP_PERIOD)
-
- def wait(self):
- while not self.clear_to_sync():
- time.sleep(self.DECRYPT_LOOP_PERIOD)
-
- @defer.inlineCallbacks
- def clear_to_sync(self):
- count = yield self._count_docs_in_sync_db()
- defer.returnValue(count == 0)