diff options
author | Kali Kaneko <kali@leap.se> | 2014-03-14 02:09:40 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-03-17 12:37:35 -0400 |
commit | d2d3a243b6da313a54c8c498ffcd3f065721ad5a (patch) | |
tree | 2e34698acdbf656e759c945bca96c3e04ca30d6a /client/src/leap/soledad/client/crypto.py | |
parent | 1a60f3616efef904917dd77a12170912defc7637 (diff) |
move symmetric decryption of docs to be db-based toofeature/enc-sync-transitional-db
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r-- | client/src/leap/soledad/client/crypto.py | 277 |
1 files changed, 247 insertions, 30 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 2ada4937..6d1fab37 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # crypto.py -# Copyright (C) 2013,2014 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -24,12 +24,15 @@ import hashlib import json import logging import multiprocessing +import threading from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 +from zope.proxy import sameProxiedObjects from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type +from leap.soledad.common.document import SoledadDocument from leap.soledad.common.crypto import ( @@ -346,14 +349,13 @@ def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): }) -# XXX change to docstr... -def decrypt_doc(crypto, doc): +def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): """ Decrypt C{doc}'s content. Return the JSON string representation of the document's decrypted content. - The content of the document should have the following structure: + The passed doc_dict argument should have the following structure: { ENC_JSON_KEY: '<enc_blob>', @@ -369,52 +371,67 @@ def decrypt_doc(crypto, doc): EncryptionSchemes.SYMKEY and C{enc_method} is EncryptionMethods.AES_256_CTR. - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc: The document to be decrypted. - :type doc: SoledadDocument + :param doc_dict: The content of the document to be decrypted. + :type doc_dict: dict + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: + :type secret: :return: The JSON serialization of the decrypted content. :rtype: str """ - soledad_assert(doc.is_tombstone() is False) - soledad_assert(ENC_JSON_KEY in doc.content) - soledad_assert(ENC_SCHEME_KEY in doc.content) - soledad_assert(ENC_METHOD_KEY in doc.content) - soledad_assert(MAC_KEY in doc.content) - soledad_assert(MAC_METHOD_KEY in doc.content) + # TODO where should we move these assertions, now that we're passed the + # string? + #soledad_assert(doc.is_tombstone() is False) + + soledad_assert(ENC_JSON_KEY in doc_dict) + soledad_assert(ENC_SCHEME_KEY in doc_dict) + soledad_assert(ENC_METHOD_KEY in doc_dict) + soledad_assert(MAC_KEY in doc_dict) + soledad_assert(MAC_METHOD_KEY in doc_dict) + # verify MAC ciphertext = binascii.a2b_hex( # content is stored as hex. - doc.content[ENC_JSON_KEY]) + doc_dict[ENC_JSON_KEY]) mac = mac_doc( - doc.doc_id, doc.rev, + doc_id, doc_rev, ciphertext, - doc.content[MAC_METHOD_KEY], crypto.secret) + doc_dict[MAC_METHOD_KEY], secret) # we compare mac's hashes to avoid possible timing attacks that might # exploit python's builtin comparison operator behaviour, which fails # immediatelly when non-matching bytes are found. doc_mac_hash = hashlib.sha256( binascii.a2b_hex( # the mac is stored as hex - doc.content[MAC_KEY])).digest() + doc_dict[MAC_KEY])).digest() calculated_mac_hash = hashlib.sha256(mac).digest() if doc_mac_hash != calculated_mac_hash: raise WrongMac('Could not authenticate document\'s contents.') # decrypt doc's content - enc_scheme = doc.content[ENC_SCHEME_KEY] + enc_scheme = doc_dict[ENC_SCHEME_KEY] plainjson = None if enc_scheme == EncryptionSchemes.SYMKEY: - enc_method = doc.content[ENC_METHOD_KEY] + enc_method = doc_dict[ENC_METHOD_KEY] if enc_method == EncryptionMethods.AES_256_CTR: - soledad_assert(ENC_IV_KEY in doc.content) - plainjson = crypto.decrypt_sym( - ciphertext, - crypto.doc_passphrase(doc.doc_id), + soledad_assert(ENC_IV_KEY in doc_dict) + plainjson = decrypt_sym( + ciphertext, key, method=enc_method, - iv=doc.content[ENC_IV_KEY]) + iv=doc_dict[ENC_IV_KEY]) else: raise UnknownEncryptionMethod(enc_method) else: raise UnknownEncryptionScheme(enc_scheme) + + print "PLAIN: ", plainjson return plainjson @@ -451,6 +468,9 @@ class SyncEncryptDecryptPool(object): :param sync_db: a database connection handle :type sync_db: handle + + :param insert_doc_cb: Optional callback for inserting doc. + :type insert_doc_cb: callable """ self._pool = multiprocessing.Pool(self.WORKERS) self._crypto = crypto @@ -492,9 +512,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): def encrypt_doc_cb(self, result): doc_id, doc_rev, content = result - self.insert_encrypted_doc(doc_id, doc_rev, content) + self.insert_encrypted_local_doc(doc_id, doc_rev, content) - def insert_encrypted_doc(self, doc_id, doc_rev, content): + def insert_encrypted_local_doc(self, doc_id, doc_rev, content): """ Insert the contents of the encrypted doc into the local sync database. @@ -512,19 +532,216 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): self._sync_db.commit() +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): + decrypted_content = decrypt_doc_dict( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, decrypted_content, gen, trans_id + + +def get_insertable_docs_by_gen(expected, got): + """ + Return a list of documents ready to be inserted. This list is computed + by aligning the expected list with the already gotten docs, and returning + the maximum number of docs that can be processed in the expected order + before finding a gap. + + :param expected: A list of generations to be inserted. + :type expected: list + + :param got: A dictionary whose values are the docs to be inserted. + :type got: dict + """ + ordered = [got.get(i) for i in expected] + if None in ordered: + return ordered[:ordered.index(None)] + else: + return ordered + + class SyncDecrypterPool(SyncEncryptDecryptPool): """ Pool of workers that spawn subprocesses to execute the symmetric decryption of documents that were received. + + The decryption of the received documents is done in two steps: + + 1. All the encrypted docs are collected, together with their generation + and transaction-id + 2. The docs are enqueued for decryption. When completed, they are + inserted following the generation order. """ WORKERS = 10 TABLE_NAME = "docs_received" FIELD_NAMES = "doc_id, rev, content, gen, trans_id" - def decrypt_doc(self, doc_id, rev): + write_encrypted_lock = threading.Lock() + + def __init__(self, *args, **kwargs): + """ + Initialize the decrypter pool, and setup a dict for putting the + results of the decrypted docs until they are picked by the insert + routine that gets them in order. + """ + self._insert_doc_cb = kwargs.pop("insert_doc_cb") + SyncEncryptDecryptPool.__init__(self, *args) + self.decrypted_docs = {} + + def insert_encrypted_received_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert a received message with encrypted content, to be decrypted later + on. + """ + docstr = json.dumps(content) + c = self._sync_db.cursor() + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( + self.TABLE_NAME,) + c.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id)) + self._sync_db.commit() + + def delete_encrypted_received_doc(self, doc_id, doc_rev): + """ + Delete a encrypted received doc after it was inserted into the local + db. + + :param doc_id: Document ID. + :type doc_id: str + :param doc_rev: Document revision. + :type doc_rev: str + """ + c = self._sync_db.cursor() + sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + c.execute(sql_del, (doc_id, doc_rev)) + self._sync_db.commit() + + def decrypt_doc(self, doc_id, rev, source_replica_uid): """ Symmetrically decrypt a document. - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument + :param doc_id: The ID for the document with contents to be encrypted. + :type doc: str + :param rev: The revision of the document. + :type rev: str + :param source_replica_uid: + :type source_replica_uid: str + """ + self.source_replica_uid = source_replica_uid + if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), + None): + print self._insert_doc_cb + logger.warning("No insert_doc callback, skipping decryption.") + return + + # XXX move to get_doc function... + c = self._sync_db.cursor() + sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + c.execute(sql, (doc_id, rev)) + res = c.fetchone() + if res is None: + logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev)) + return + + doc_id, rev, docstr, gen, trans_id = res + content = json.loads(docstr) + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + + args = doc_id, rev, content, gen, trans_id, key, secret + + try: + self._pool.apply_async(decrypt_doc_task, args, + callback=self.decrypt_doc_cb) + except Exception as exc: + logger.exception(exc) + + def decrypt_doc_cb(self, result): + """ + Temporarily store the decryption result in a dictionary where it will + be picked by process_decrypted. + + :param result: the result of the decryption routine. + :type result: tuple + """ + doc_id, rev, content, gen, trans_id = result + self.decrypted_docs[gen] = result + + def get_docs_by_generation(self): + """ + Get all documents in the received table from the sync db, + ordered by generation. + + :return: list of doc_id, rev, generation + """ + c = self._sync_db.cursor() + sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % ( + self.TABLE_NAME,) + c.execute(sql) + return c.fetchall() + + def count_received_encrypted_docs(self): + """ + Count how many documents we have in the table for received and + encrypted docs. + + :return: The count of documents. + :rtype: int + """ + c = self._sync_db.cursor() + sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) + c.execute(sql) + res = c.fetchone() + print "res" + if res is not None: + print ">>>>>>>>>> GOT %s received encrypted docs" % res[0] + return res[0] + else: + return 0 + + def decrypt_received_docs(self, source_replica_uid): """ + Get all the encrypted documents from the sync database and dispatch a + decrypt worker to decrypt each one of them. + """ + docs_by_generation = self.get_docs_by_generation() + for doc_id, rev, gen in docs_by_generation: + self.decrypt_doc(doc_id, rev, source_replica_uid) + + def process_decrypted(self): + """ + Process the already decrypted documents, and insert as many documents + as can be taken from the expected order without finding a gap. + """ + # Acquire the lock to avoid processing while we're still + # getting data from the syncing stream, to avoid InvalidGeneration + # problems. + with self.write_encrypted_lock: + docs = self.get_docs_by_generation() + expected = [gen for doc_id, rev, gen in docs] + docs_to_insert = get_insertable_docs_by_gen( + expected, self.decrypted_docs) + for doc_fields in docs_to_insert: + self.insert_decrypted_local_doc(*doc_fields) + + def insert_decrypted_local_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert the decrypted document into the local sqlcipher database. + Makes use of the passed callback `return_doc_cb` passed to the caller + by u1db sync. + """ + print "TRY TO INSERT GEN --->", gen + # could pass source_replica in params for callback chain + insert_fun = self._insert_doc_cb[self.source_replica_uid] + try: + doc = SoledadDocument(doc_id, doc_rev, content) + insert_fun(doc, int(gen), trans_id) + except Exception as exc: + logger.error("Error while inserting decrypted doc into local db") + logger.exception(exc) + else: + # If no errors found, remove it from the local temporary dict + # and from the received database. + self.decrypted_docs.pop(gen) + self.delete_encrypted_received_doc(doc_id, doc_rev) |