summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r--client/src/leap/soledad/client/crypto.py277
1 files changed, 247 insertions, 30 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 2ada4937..6d1fab37 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# crypto.py
-# Copyright (C) 2013,2014 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -24,12 +24,15 @@ import hashlib
import json
import logging
import multiprocessing
+import threading
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
+from zope.proxy import sameProxiedObjects
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
+from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.crypto import (
@@ -346,14 +349,13 @@ def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
})
-# XXX change to docstr...
-def decrypt_doc(crypto, doc):
+def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
"""
Decrypt C{doc}'s content.
Return the JSON string representation of the document's decrypted content.
- The content of the document should have the following structure:
+ The passed doc_dict argument should have the following structure:
{
ENC_JSON_KEY: '<enc_blob>',
@@ -369,52 +371,67 @@ def decrypt_doc(crypto, doc):
EncryptionSchemes.SYMKEY and C{enc_method} is
EncryptionMethods.AES_256_CTR.
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc: The document to be decrypted.
- :type doc: SoledadDocument
+ :param doc_dict: The content of the document to be decrypted.
+ :type doc_dict: dict
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret:
+ :type secret:
:return: The JSON serialization of the decrypted content.
:rtype: str
"""
- soledad_assert(doc.is_tombstone() is False)
- soledad_assert(ENC_JSON_KEY in doc.content)
- soledad_assert(ENC_SCHEME_KEY in doc.content)
- soledad_assert(ENC_METHOD_KEY in doc.content)
- soledad_assert(MAC_KEY in doc.content)
- soledad_assert(MAC_METHOD_KEY in doc.content)
+ # TODO where should we move these assertions, now that we're passed the
+ # string?
+ #soledad_assert(doc.is_tombstone() is False)
+
+ soledad_assert(ENC_JSON_KEY in doc_dict)
+ soledad_assert(ENC_SCHEME_KEY in doc_dict)
+ soledad_assert(ENC_METHOD_KEY in doc_dict)
+ soledad_assert(MAC_KEY in doc_dict)
+ soledad_assert(MAC_METHOD_KEY in doc_dict)
+
# verify MAC
ciphertext = binascii.a2b_hex( # content is stored as hex.
- doc.content[ENC_JSON_KEY])
+ doc_dict[ENC_JSON_KEY])
mac = mac_doc(
- doc.doc_id, doc.rev,
+ doc_id, doc_rev,
ciphertext,
- doc.content[MAC_METHOD_KEY], crypto.secret)
+ doc_dict[MAC_METHOD_KEY], secret)
# we compare mac's hashes to avoid possible timing attacks that might
# exploit python's builtin comparison operator behaviour, which fails
# immediatelly when non-matching bytes are found.
doc_mac_hash = hashlib.sha256(
binascii.a2b_hex( # the mac is stored as hex
- doc.content[MAC_KEY])).digest()
+ doc_dict[MAC_KEY])).digest()
calculated_mac_hash = hashlib.sha256(mac).digest()
if doc_mac_hash != calculated_mac_hash:
raise WrongMac('Could not authenticate document\'s contents.')
# decrypt doc's content
- enc_scheme = doc.content[ENC_SCHEME_KEY]
+ enc_scheme = doc_dict[ENC_SCHEME_KEY]
plainjson = None
if enc_scheme == EncryptionSchemes.SYMKEY:
- enc_method = doc.content[ENC_METHOD_KEY]
+ enc_method = doc_dict[ENC_METHOD_KEY]
if enc_method == EncryptionMethods.AES_256_CTR:
- soledad_assert(ENC_IV_KEY in doc.content)
- plainjson = crypto.decrypt_sym(
- ciphertext,
- crypto.doc_passphrase(doc.doc_id),
+ soledad_assert(ENC_IV_KEY in doc_dict)
+ plainjson = decrypt_sym(
+ ciphertext, key,
method=enc_method,
- iv=doc.content[ENC_IV_KEY])
+ iv=doc_dict[ENC_IV_KEY])
else:
raise UnknownEncryptionMethod(enc_method)
else:
raise UnknownEncryptionScheme(enc_scheme)
+
+ print "PLAIN: ", plainjson
return plainjson
@@ -451,6 +468,9 @@ class SyncEncryptDecryptPool(object):
:param sync_db: a database connection handle
:type sync_db: handle
+
+ :param insert_doc_cb: Optional callback for inserting doc.
+ :type insert_doc_cb: callable
"""
self._pool = multiprocessing.Pool(self.WORKERS)
self._crypto = crypto
@@ -492,9 +512,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
def encrypt_doc_cb(self, result):
doc_id, doc_rev, content = result
- self.insert_encrypted_doc(doc_id, doc_rev, content)
+ self.insert_encrypted_local_doc(doc_id, doc_rev, content)
- def insert_encrypted_doc(self, doc_id, doc_rev, content):
+ def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
"""
Insert the contents of the encrypted doc into the local sync
database.
@@ -512,19 +532,216 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
self._sync_db.commit()
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
+ decrypted_content = decrypt_doc_dict(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id
+
+
+def get_insertable_docs_by_gen(expected, got):
+ """
+ Return a list of documents ready to be inserted. This list is computed
+ by aligning the expected list with the already gotten docs, and returning
+ the maximum number of docs that can be processed in the expected order
+ before finding a gap.
+
+ :param expected: A list of generations to be inserted.
+ :type expected: list
+
+ :param got: A dictionary whose values are the docs to be inserted.
+ :type got: dict
+ """
+ ordered = [got.get(i) for i in expected]
+ if None in ordered:
+ return ordered[:ordered.index(None)]
+ else:
+ return ordered
+
+
class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
Pool of workers that spawn subprocesses to execute the symmetric decryption
of documents that were received.
+
+ The decryption of the received documents is done in two steps:
+
+ 1. All the encrypted docs are collected, together with their generation
+ and transaction-id
+ 2. The docs are enqueued for decryption. When completed, they are
+ inserted following the generation order.
"""
WORKERS = 10
TABLE_NAME = "docs_received"
FIELD_NAMES = "doc_id, rev, content, gen, trans_id"
- def decrypt_doc(self, doc_id, rev):
+ write_encrypted_lock = threading.Lock()
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ SyncEncryptDecryptPool.__init__(self, *args)
+ self.decrypted_docs = {}
+
+ def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+ """
+ docstr = json.dumps(content)
+ c = self._sync_db.cursor()
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+ c.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id))
+ self._sync_db.commit()
+
+ def delete_encrypted_received_doc(self, doc_id, doc_rev):
+ """
+ Delete a encrypted received doc after it was inserted into the local
+ db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+ :param doc_rev: Document revision.
+ :type doc_rev: str
+ """
+ c = self._sync_db.cursor()
+ sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ c.execute(sql_del, (doc_id, doc_rev))
+ self._sync_db.commit()
+
+ def decrypt_doc(self, doc_id, rev, source_replica_uid):
"""
Symmetrically decrypt a document.
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
+ :param doc_id: The ID for the document with contents to be encrypted.
+ :type doc: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param source_replica_uid:
+ :type source_replica_uid: str
+ """
+ self.source_replica_uid = source_replica_uid
+ if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
+ None):
+ print self._insert_doc_cb
+ logger.warning("No insert_doc callback, skipping decryption.")
+ return
+
+ # XXX move to get_doc function...
+ c = self._sync_db.cursor()
+ sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ c.execute(sql, (doc_id, rev))
+ res = c.fetchone()
+ if res is None:
+ logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
+ return
+
+ doc_id, rev, docstr, gen, trans_id = res
+ content = json.loads(docstr)
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+
+ args = doc_id, rev, content, gen, trans_id, key, secret
+
+ try:
+ self._pool.apply_async(decrypt_doc_task, args,
+ callback=self.decrypt_doc_cb)
+ except Exception as exc:
+ logger.exception(exc)
+
+ def decrypt_doc_cb(self, result):
+ """
+ Temporarily store the decryption result in a dictionary where it will
+ be picked by process_decrypted.
+
+ :param result: the result of the decryption routine.
+ :type result: tuple
+ """
+ doc_id, rev, content, gen, trans_id = result
+ self.decrypted_docs[gen] = result
+
+ def get_docs_by_generation(self):
+ """
+ Get all documents in the received table from the sync db,
+ ordered by generation.
+
+ :return: list of doc_id, rev, generation
+ """
+ c = self._sync_db.cursor()
+ sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
+ self.TABLE_NAME,)
+ c.execute(sql)
+ return c.fetchall()
+
+ def count_received_encrypted_docs(self):
+ """
+ Count how many documents we have in the table for received and
+ encrypted docs.
+
+ :return: The count of documents.
+ :rtype: int
+ """
+ c = self._sync_db.cursor()
+ sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ c.execute(sql)
+ res = c.fetchone()
+ print "res"
+ if res is not None:
+ print ">>>>>>>>>> GOT %s received encrypted docs" % res[0]
+ return res[0]
+ else:
+ return 0
+
+ def decrypt_received_docs(self, source_replica_uid):
"""
+ Get all the encrypted documents from the sync database and dispatch a
+ decrypt worker to decrypt each one of them.
+ """
+ docs_by_generation = self.get_docs_by_generation()
+ for doc_id, rev, gen in docs_by_generation:
+ self.decrypt_doc(doc_id, rev, source_replica_uid)
+
+ def process_decrypted(self):
+ """
+ Process the already decrypted documents, and insert as many documents
+ as can be taken from the expected order without finding a gap.
+ """
+ # Acquire the lock to avoid processing while we're still
+ # getting data from the syncing stream, to avoid InvalidGeneration
+ # problems.
+ with self.write_encrypted_lock:
+ docs = self.get_docs_by_generation()
+ expected = [gen for doc_id, rev, gen in docs]
+ docs_to_insert = get_insertable_docs_by_gen(
+ expected, self.decrypted_docs)
+ for doc_fields in docs_to_insert:
+ self.insert_decrypted_local_doc(*doc_fields)
+
+ def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert the decrypted document into the local sqlcipher database.
+ Makes use of the passed callback `return_doc_cb` passed to the caller
+ by u1db sync.
+ """
+ print "TRY TO INSERT GEN --->", gen
+ # could pass source_replica in params for callback chain
+ insert_fun = self._insert_doc_cb[self.source_replica_uid]
+ try:
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ insert_fun(doc, int(gen), trans_id)
+ except Exception as exc:
+ logger.error("Error while inserting decrypted doc into local db")
+ logger.exception(exc)
+ else:
+ # If no errors found, remove it from the local temporary dict
+ # and from the received database.
+ self.decrypted_docs.pop(gen)
+ self.delete_encrypted_received_doc(doc_id, doc_rev)