summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r--client/src/leap/soledad/client/crypto.py1006
1 files changed, 901 insertions, 105 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index a6372107..7133f804 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# crypto.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,27 +14,45 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
Cryptographic utilities for Soledad.
"""
-
-
import os
import binascii
import hmac
import hashlib
-
+import json
+import logging
+import multiprocessing
+import threading
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
+from zope.proxy import sameProxiedObjects
+
+from leap.soledad.common import soledad_assert
+from leap.soledad.common import soledad_assert_type
+from leap.soledad.common.document import SoledadDocument
+
+
+from leap.soledad.common.crypto import (
+ EncryptionSchemes,
+ UnknownEncryptionScheme,
+ MacMethods,
+ UnknownMacMethod,
+ WrongMac,
+ ENC_JSON_KEY,
+ ENC_SCHEME_KEY,
+ ENC_METHOD_KEY,
+ ENC_IV_KEY,
+ MAC_KEY,
+ MAC_METHOD_KEY,
+)
+logger = logging.getLogger(__name__)
-from leap.soledad.common import (
- soledad_assert,
- soledad_assert_type,
-)
+
+MAC_KEY_LENGTH = 64
class EncryptionMethods(object):
@@ -45,6 +63,17 @@ class EncryptionMethods(object):
AES_256_CTR = 'aes-256-ctr'
XSALSA20 = 'xsalsa20'
+#
+# Exceptions
+#
+
+
+class DocumentNotEncrypted(Exception):
+ """
+ Raised for failures in document encryption.
+ """
+ pass
+
class UnknownEncryptionMethod(Exception):
"""
@@ -59,13 +88,116 @@ class NoSymmetricSecret(Exception):
"""
-class SoledadCrypto(object):
+def encrypt_sym(data, key, method):
"""
- General cryptographic functionality.
+ Encrypt C{data} using a {password}.
+
+ Currently, the only encryption methods supported are AES-256 in CTR
+ mode and XSalsa20.
+
+ :param data: The data to be encrypted.
+ :type data: str
+ :param key: The key used to encrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+
+ :return: A tuple with the initial value and the encrypted data.
+ :rtype: (long, str)
+ """
+ soledad_assert_type(key, str)
+
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s bits (must be 256 bits long).' %
+ (len(key) * 8))
+ iv = None
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ iv = os.urandom(16)
+ ciphertext = AES(key=key, iv=iv).process(data)
+ # XSalsa20
+ elif method == EncryptionMethods.XSALSA20:
+ iv = os.urandom(24)
+ ciphertext = XSalsa20(key=key, iv=iv).process(data)
+ else:
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+ return binascii.b2a_base64(iv), ciphertext
+
+
+def decrypt_sym(data, key, method, **kwargs):
"""
+ Decrypt data using symmetric secret.
+
+ Currently, the only encryption method supported is AES-256 CTR mode.
- MAC_KEY_LENGTH = 64
+ :param data: The data to be decrypted.
+ :type data: str
+ :param key: The key used to decrypt C{data} (must be 256 bits long).
+ :type key: str
+ :param method: The encryption method to use.
+ :type method: str
+ :param kwargs: Other parameters specific to each encryption method.
+ :type kwargs: dict
+
+ :return: The decrypted data.
+ :rtype: str
+ """
+ soledad_assert_type(key, str)
+ # assert params
+ soledad_assert(
+ len(key) == 32, # 32 x 8 = 256 bits.
+ 'Wrong key size: %s (must be 256 bits long).' % len(key))
+ soledad_assert(
+ 'iv' in kwargs,
+ '%s needs an initial value.' % method)
+ # AES-256 in CTR mode
+ if method == EncryptionMethods.AES_256_CTR:
+ return AES(
+ key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
+ elif method == EncryptionMethods.XSALSA20:
+ return XSalsa20(
+ key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
+
+ # raise if method is unknown
+ raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+
+
+def doc_mac_key(doc_id, secret):
+ """
+ Generate a key for calculating a MAC for a document whose id is
+ C{doc_id}.
+ The key is derived using HMAC having sha256 as underlying hash
+ function. The key used for HMAC is the first MAC_KEY_LENGTH characters
+ of Soledad's storage secret. The HMAC message is C{doc_id}.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+
+ :param secret: soledad secret storage
+ :type secret: Soledad.storage_secret
+
+ :return: The key.
+ :rtype: str
+
+ :raise NoSymmetricSecret: if no symmetric secret was supplied.
+ """
+ if secret is None:
+ raise NoSymmetricSecret()
+
+ return hmac.new(
+ secret[:MAC_KEY_LENGTH],
+ doc_id,
+ hashlib.sha256).digest()
+
+
+class SoledadCrypto(object):
+ """
+ General cryptographic functionality encapsulated in a
+ object that can be passed along.
+ """
def __init__(self, soledad):
"""
Initialize the crypto object.
@@ -77,78 +209,14 @@ class SoledadCrypto(object):
def encrypt_sym(self, data, key,
method=EncryptionMethods.AES_256_CTR):
- """
- Encrypt C{data} using a {password}.
-
- Currently, the only encryption method supported is AES-256 CTR mode.
-
- :param data: The data to be encrypted.
- :type data: str
- :param key: The key used to encrypt C{data} (must be 256 bits long).
- :type key: str
- :param method: The encryption method to use.
- :type method: str
-
- :return: A tuple with the initial value and the encrypted data.
- :rtype: (long, str)
- """
- soledad_assert_type(key, str)
-
- soledad_assert(
- len(key) == 32, # 32 x 8 = 256 bits.
- 'Wrong key size: %s bits (must be 256 bits long).' %
- (len(key) * 8))
- iv = None
- # AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
- iv = os.urandom(16)
- ciphertext = AES(key=key, iv=iv).process(data)
- # XSalsa20
- elif method == EncryptionMethods.XSALSA20:
- iv = os.urandom(24)
- ciphertext = XSalsa20(key=key, iv=iv).process(data)
- else:
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
- return binascii.b2a_base64(iv), ciphertext
+ return encrypt_sym(data, key, method)
def decrypt_sym(self, data, key,
method=EncryptionMethods.AES_256_CTR, **kwargs):
- """
- Decrypt data using symmetric secret.
+ return decrypt_sym(data, key, method, **kwargs)
- Currently, the only encryption method supported is AES-256 CTR mode.
-
- :param data: The data to be decrypted.
- :type data: str
- :param key: The key used to decrypt C{data} (must be 256 bits long).
- :type key: str
- :param method: The encryption method to use.
- :type method: str
- :param kwargs: Other parameters specific to each encryption method.
- :type kwargs: dict
-
- :return: The decrypted data.
- :rtype: str
- """
- soledad_assert_type(key, str)
- # assert params
- soledad_assert(
- len(key) == 32, # 32 x 8 = 256 bits.
- 'Wrong key size: %s (must be 256 bits long).' % len(key))
- soledad_assert(
- 'iv' in kwargs,
- '%s needs an initial value.' % method)
- # AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
- return AES(
- key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
- elif method == EncryptionMethods.XSALSA20:
- return XSalsa20(
- key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
-
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+ def doc_mac_key(self, doc_id, secret):
+ return doc_mac_key(doc_id, self.secret)
def doc_passphrase(self, doc_id):
"""
@@ -173,41 +241,769 @@ class SoledadCrypto(object):
raise NoSymmetricSecret()
return hmac.new(
self.secret[
- self.MAC_KEY_LENGTH:
+ MAC_KEY_LENGTH:
self._soledad.REMOTE_STORAGE_SECRET_LENGTH],
doc_id,
hashlib.sha256).digest()
- def doc_mac_key(self, doc_id):
+ #
+ # secret setters/getters
+ #
+
+ def _get_secret(self):
+ return self._soledad.storage_secret
+
+ secret = property(
+ _get_secret, doc='The secret used for symmetric encryption')
+
+#
+# Crypto utilities for a SoledadDocument.
+#
+
+
+def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret):
+ """
+ Calculate a MAC for C{doc} using C{ciphertext}.
+
+ Current MAC method used is HMAC, with the following parameters:
+
+ * key: sha256(storage_secret, doc_id)
+ * msg: doc_id + doc_rev + ciphertext
+ * digestmod: sha256
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+ :param ciphertext: The content of the document.
+ :type ciphertext: str
+ :param mac_method: The MAC method to use.
+ :type mac_method: str
+ :param secret: soledad secret
+ :type secret: Soledad.secret_storage
+
+ :return: The calculated MAC.
+ :rtype: str
+ """
+ if mac_method == MacMethods.HMAC:
+ return hmac.new(
+ doc_mac_key(doc_id, secret),
+ str(doc_id) + str(doc_rev) + ciphertext,
+ hashlib.sha256).digest()
+ # raise if we do not know how to handle this MAC method
+ raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method)
+
+
+def encrypt_doc(crypto, doc):
+ """
+ Wrapper around encrypt_docstr that accepts a crypto object and the document
+ as arguments.
+
+ :param crypto: a soledad crypto object.
+ :type crypto: SoledadCrypto
+ :param doc: the document.
+ :type doc: SoledadDocument
+ """
+ key = crypto.doc_passphrase(doc.doc_id)
+ secret = crypto.secret
+
+ return encrypt_docstr(
+ doc.get_json(), doc.doc_id, doc.rev, key, secret)
+
+
+def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
+ """
+ Encrypt C{doc}'s content.
+
+ Encrypt doc's contents using AES-256 CTR mode and return a valid JSON
+ string representing the following:
+
+ {
+ ENC_JSON_KEY: '<encrypted doc JSON string>',
+ ENC_SCHEME_KEY: 'symkey',
+ ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
+ ENC_IV_KEY: '<the initial value used to encrypt>',
+ MAC_KEY: '<mac>'
+ MAC_METHOD_KEY: 'hmac'
+ }
+
+ :param docstr: A representation of the document to be encrypted.
+ :type docstr: str or unicode.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: The JSON serialization of the dict representing the encrypted
+ content.
+ :rtype: str
+ """
+ # encrypt content using AES-256 CTR mode
+ iv, ciphertext = encrypt_sym(
+ str(docstr), # encryption/decryption routines expect str
+ key, method=EncryptionMethods.AES_256_CTR)
+ # Return a representation for the encrypted content. In the following, we
+ # convert binary data to hexadecimal representation so the JSON
+ # serialization does not complain about what it tries to serialize.
+ hex_ciphertext = binascii.b2a_hex(ciphertext)
+ return json.dumps({
+ ENC_JSON_KEY: hex_ciphertext,
+ ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY,
+ ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
+ ENC_IV_KEY: iv,
+ MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex.
+ doc_id, doc_rev, ciphertext,
+ MacMethods.HMAC, secret)),
+ MAC_METHOD_KEY: MacMethods.HMAC,
+ })
+
+
+def decrypt_doc(crypto, doc):
+ """
+ Wrapper around decrypt_doc_dict that accepts a crypto object and the
+ document as arguments.
+
+ :param crypto: a soledad crypto object.
+ :type crypto: SoledadCrypto
+ :param doc: the document.
+ :type doc: SoledadDocument
+
+ :return: json string with the decrypted document
+ :rtype: str
+ """
+ key = crypto.doc_passphrase(doc.doc_id)
+ secret = crypto.secret
+ return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret)
+
+
+def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
+ """
+ Decrypt C{doc}'s content.
+
+ Return the JSON string representation of the document's decrypted content.
+
+ The passed doc_dict argument should have the following structure:
+
+ {
+ ENC_JSON_KEY: '<enc_blob>',
+ ENC_SCHEME_KEY: '<enc_scheme>',
+ ENC_METHOD_KEY: '<enc_method>',
+ ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
+ MAC_KEY: '<mac>'
+ MAC_METHOD_KEY: 'hmac'
+ }
+
+ C{enc_blob} is the encryption of the JSON serialization of the document's
+ content. For now Soledad just deals with documents whose C{enc_scheme} is
+ EncryptionSchemes.SYMKEY and C{enc_method} is
+ EncryptionMethods.AES_256_CTR.
+
+ :param doc_dict: The content of the document to be decrypted.
+ :type doc_dict: dict
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret:
+ :type secret:
+
+ :return: The JSON serialization of the decrypted content.
+ :rtype: str
+ """
+ soledad_assert(ENC_JSON_KEY in doc_dict)
+ soledad_assert(ENC_SCHEME_KEY in doc_dict)
+ soledad_assert(ENC_METHOD_KEY in doc_dict)
+ soledad_assert(MAC_KEY in doc_dict)
+ soledad_assert(MAC_METHOD_KEY in doc_dict)
+
+ # verify MAC
+ ciphertext = binascii.a2b_hex( # content is stored as hex.
+ doc_dict[ENC_JSON_KEY])
+ mac = mac_doc(
+ doc_id, doc_rev,
+ ciphertext,
+ doc_dict[MAC_METHOD_KEY], secret)
+ # we compare mac's hashes to avoid possible timing attacks that might
+ # exploit python's builtin comparison operator behaviour, which fails
+ # immediatelly when non-matching bytes are found.
+ doc_mac_hash = hashlib.sha256(
+ binascii.a2b_hex( # the mac is stored as hex
+ doc_dict[MAC_KEY])).digest()
+ calculated_mac_hash = hashlib.sha256(mac).digest()
+
+ if doc_mac_hash != calculated_mac_hash:
+ logger.warning("Wrong MAC while decrypting doc...")
+ raise WrongMac('Could not authenticate document\'s contents.')
+ # decrypt doc's content
+ enc_scheme = doc_dict[ENC_SCHEME_KEY]
+ plainjson = None
+ if enc_scheme == EncryptionSchemes.SYMKEY:
+ enc_method = doc_dict[ENC_METHOD_KEY]
+ if enc_method == EncryptionMethods.AES_256_CTR:
+ soledad_assert(ENC_IV_KEY in doc_dict)
+ plainjson = decrypt_sym(
+ ciphertext, key,
+ method=enc_method,
+ iv=doc_dict[ENC_IV_KEY])
+ else:
+ raise UnknownEncryptionMethod(enc_method)
+ else:
+ raise UnknownEncryptionScheme(enc_scheme)
+
+ return plainjson
+
+
+def is_symmetrically_encrypted(doc):
+ """
+ Return True if the document was symmetrically encrypted.
+
+ :param doc: The document to check.
+ :type doc: SoledadDocument
+
+ :rtype: bool
+ """
+ if doc.content and ENC_SCHEME_KEY in doc.content:
+ if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY:
+ return True
+ return False
+
+
+#
+# Encrypt/decrypt pools of workers
+#
+
+class SyncEncryptDecryptPool(object):
+ """
+ Base class for encrypter/decrypter pools.
+ """
+ WORKERS = 5
+
+ def __init__(self, crypto, sync_db, write_lock):
+ """
+ Initialize the pool of encryption-workers.
+
+ :param crypto: A SoledadCryto instance to perform the encryption.
+ :type crypto: leap.soledad.crypto.SoledadCrypto
+
+ :param sync_db: a database connection handle
+ :type sync_db: handle
+
+ :param write_lock: a write lock for controlling concurrent access
+ to the sync_db
+ :type write_lock: threading.Lock
+ """
+ self._pool = multiprocessing.Pool(self.WORKERS)
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._sync_db_write_lock = write_lock
+
+ def close(self):
+ """
+ Cleanly close the pool of workers.
+ """
+ logger.debug("Closing %s" % (self.__class__.__name__,))
+ self._pool.close()
+ try:
+ self._pool.join()
+ except Exception:
+ pass
+
+ def terminate(self):
+ """
+ Terminate the pool of workers.
+ """
+ logger.debug("Terminating %s" % (self.__class__.__name__,))
+ self._pool.terminate()
+
+
+def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
+ """
+ Encrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ encrypted_content = encrypt_docstr(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, encrypted_content
+
+
+class SyncEncrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric encryption
+ of documents to be synced.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ WORKERS = 5
+ TABLE_NAME = "docs_tosync"
+ FIELD_NAMES = "doc_id, rev, content"
+
+ def encrypt_doc(self, doc, workers=True):
+ """
+ Symmetrically encrypt a document.
+
+ :param doc: The document with contents to be encrypted.
+ :type doc: SoledadDocument
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ docstr = doc.get_json()
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ args = doc.doc_id, doc.rev, docstr, key, secret
+
+ try:
+ if workers:
+ res = self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self.encrypt_doc_cb)
+ else:
+ # encrypt inline
+ res = encrypt_doc_task(*args)
+ self.encrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def encrypt_doc_cb(self, result):
"""
- Generate a key for calculating a MAC for a document whose id is
- C{doc_id}.
+ Insert results of encryption routine into the local sync database.
- The key is derived using HMAC having sha256 as underlying hash
- function. The key used for HMAC is the first MAC_KEY_LENGTH characters
- of Soledad's storage secret. The HMAC message is C{doc_id}.
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, doc_rev, content = result
+ self.insert_encrypted_local_doc(doc_id, doc_rev, content)
- :param doc_id: The id of the document.
+ def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ """
+ Insert the contents of the encrypted doc into the local sync
+ database.
+
+ :param doc_id: The document id.
:type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param content: The encrypted document.
+ :type content: str
+ """
+ sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,)
- :return: The key.
- :rtype: str
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_del, (doc_id, ))
+ con.execute(sql_ins, (doc_id, doc_rev, content))
- :raise NoSymmetricSecret: if no symmetric secret was supplied.
+
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
+ """
+ Decrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The encrypted content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification of
+ that document.
+ :type trans_id: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ decrypted_content = decrypt_doc_dict(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id
+
+
+def get_insertable_docs_by_gen(expected, got):
+ """
+ Return a list of documents ready to be inserted. This list is computed
+ by aligning the expected list with the already gotten docs, and returning
+ the maximum number of docs that can be processed in the expected order
+ before finding a gap.
+
+ :param expected: A list of generations to be inserted.
+ :type expected: list
+
+ :param got: A dictionary whose values are the docs to be inserted.
+ :type got: dict
+ """
+ ordered = [got.get(i) for i in expected]
+ if None in ordered:
+ return ordered[:ordered.index(None)]
+ else:
+ return ordered
+
+
+class SyncDecrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric decryption
+ of documents that were received.
+
+ The decryption of the received documents is done in two steps:
+
+ 1. All the encrypted docs are collected, together with their generation
+ and transaction-id
+ 2. The docs are enqueued for decryption. When completed, they are
+ inserted following the generation order.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ TABLE_NAME = "docs_received"
+ FIELD_NAMES = "doc_id, rev, content, gen, trans_id"
+
+ write_encrypted_lock = threading.Lock()
+
+ def __init__(self, *args, **kwargs):
"""
- if self.secret is None:
- raise NoSymmetricSecret()
- return hmac.new(
- self.secret[:self.MAC_KEY_LENGTH],
- doc_id,
- hashlib.sha256).digest()
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+ self.decrypted_docs = {}
+ self.source_replica_uid = None
- #
- # secret setters/getters
- #
+ def set_source_replica_uid(self, source_replica_uid):
+ """
+ Set the source replica uid for this decrypter pool instance.
- def _get_secret(self):
- return self._soledad.storage_secret
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+ """
+ self.source_replica_uid = source_replica_uid
- secret = property(
- _get_secret, doc='The secret used for symmetric encryption')
+ def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ """
+ docstr = json.dumps(content)
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id))
+
+ def insert_marker_for_received_doc(self, doc_id, doc_rev, gen):
+ """
+ Insert a marker with the document id, revision and generation on the
+ sync db. This document does not have an encrypted payload, so the
+ content has already been inserted into the decrypted_docs dictionary
+ from where it can be picked following generation order.
+ We need to leave here the marker to be able to calculate the expected
+ insertion order for a synchronization batch.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param gen: the Document Generation
+ :type gen: int
+ """
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_ins, (doc_id, doc_rev, '', gen, ''))
+
+ def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
+ """
+ Insert a document that is not symmetrically encrypted.
+ We store it in the staging area (the decrypted_docs dictionary) to be
+ picked up in order as the preceding documents are decrypted.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ """
+ # XXX this need a deeper review / testing.
+ # I believe that what I'm doing here is prone to problems
+ # if the sync is interrupted (ie, client crash) in the worst possible
+ # moment. We would need a recover strategy in that case
+ # (or, insert the document in the table all the same, but with a flag
+ # saying if the document is sym-encrypted or not),
+ content = json.dumps(content)
+ result = doc_id, doc_rev, content, gen, trans_id
+ self.decrypted_docs[gen] = result
+ self.insert_marker_for_received_doc(doc_id, doc_rev, gen)
+
+ def delete_encrypted_received_doc(self, doc_id, doc_rev):
+ """
+ Delete a encrypted received doc after it was inserted into the local
+ db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+ :param doc_rev: Document revision.
+ :type doc_rev: str
+ """
+ sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ con = self._sync_db
+ with self._sync_db_write_lock:
+ with con:
+ con.execute(sql_del, (doc_id, doc_rev))
+
+ def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True):
+ """
+ Symmetrically decrypt a document.
+
+ :param doc_id: The ID for the document with contents to be encrypted.
+ :type doc: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param source_replica_uid:
+ :type source_replica_uid: str
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ self.source_replica_uid = source_replica_uid
+
+ # insert_doc_cb is a proxy object that gets updated with the right
+ # insert function only when the sync_target invokes the sync_exchange
+ # method. so, if we don't still have a non-empty callback, we refuse
+ # to proceed.
+ if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
+ None):
+ logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
+ return
+
+ # XXX move to get_doc function...
+ c = self._sync_db.cursor()
+ sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ try:
+ c.execute(sql, (doc_id, rev))
+ res = c.fetchone()
+ except Exception as exc:
+ logger.warning("Error getting docs from syncdb: %r" % (exc,))
+ return
+ if res is None:
+ logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
+ return
+
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ try:
+ doc_id, rev, docstr, gen, trans_id = res
+ except ValueError:
+ logger.warning("Wrong entry in sync db")
+ return
+
+ if len(docstr) == 0:
+ # not encrypted payload
+ return
+
+ try:
+ content = json.loads(docstr)
+ except TypeError:
+ logger.warning("Wrong type while decoding json: %s" % repr(docstr))
+ return
+
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, rev, content, gen, trans_id, key, secret
+
+ try:
+ if workers:
+ # Ouch. This is sent to the workers asynchronously, so
+ # we have no way of logging errors. We'd have to inspect
+ # lingering results by querying successful / get() over them...
+ # Or move the heck out of it to twisted.
+ res = self._pool.apply_async(
+ decrypt_doc_task, args,
+ callback=self.decrypt_doc_cb)
+ else:
+ # decrypt inline
+ res = decrypt_doc_task(*args)
+ self.decrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def decrypt_doc_cb(self, result):
+ """
+ Temporarily store the decryption result in a dictionary where it will
+ be picked by process_decrypted.
+
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, rev, content, gen, trans_id = result
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen))
+ self.decrypted_docs[gen] = result
+
+ def get_docs_by_generation(self):
+ """
+ Get all documents in the received table from the sync db,
+ ordered by generation.
+
+ :return: list of doc_id, rev, generation
+ """
+ c = self._sync_db.cursor()
+ sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
+ self.TABLE_NAME,)
+ c.execute(sql)
+ return c.fetchall()
+
+ def count_received_encrypted_docs(self):
+ """
+ Count how many documents we have in the table for received and
+ encrypted docs.
+
+ :return: The count of documents.
+ :rtype: int
+ """
+ if self._sync_db is None:
+ logger.warning("cannot return count with null sync_db")
+ return
+ c = self._sync_db.cursor()
+ sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ c.execute(sql)
+ res = c.fetchone()
+ if res is not None:
+ return res[0]
+ else:
+ return 0
+
+ def decrypt_received_docs(self):
+ """
+ Get all the encrypted documents from the sync database and dispatch a
+ decrypt worker to decrypt each one of them.
+ """
+ docs_by_generation = self.get_docs_by_generation()
+ logger.debug("Sync decrypter pool: There are %d documents to " \
+ "decrypt." % len(docs_by_generation))
+ for doc_id, rev, gen in filter(None, docs_by_generation):
+ self.decrypt_doc(doc_id, rev, self.source_replica_uid)
+
+ def process_decrypted(self):
+ """
+ Process the already decrypted documents, and insert as many documents
+ as can be taken from the expected order without finding a gap.
+
+ :return: Whether we have processed all the pending docs.
+ :rtype: bool
+ """
+ # Acquire the lock to avoid processing while we're still
+ # getting data from the syncing stream, to avoid InvalidGeneration
+ # problems.
+ with self.write_encrypted_lock:
+ already_decrypted = self.decrypted_docs
+ docs = self.get_docs_by_generation()
+ docs = filter(lambda entry: len(entry) > 0, docs)
+ expected = [gen for doc_id, rev, gen in docs]
+ docs_to_insert = get_insertable_docs_by_gen(
+ expected, already_decrypted)
+ for doc_fields in docs_to_insert:
+ self.insert_decrypted_local_doc(*doc_fields)
+ remaining = self.count_received_encrypted_docs()
+ return remaining == 0
+
+ def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert the decrypted document into the local sqlcipher database.
+ Makes use of the passed callback `return_doc_cb` passed to the caller
+ by u1db sync.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ """
+ # could pass source_replica in params for callback chain
+ insert_fun = self._insert_doc_cb[self.source_replica_uid]
+ logger.debug("Sync decrypter pool: inserting doc in local db: " \
+ "%s:%s %s" % (doc_id, doc_rev, gen))
+ try:
+ # convert deleted documents to avoid error on document creation
+ if content == 'null':
+ content = None
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ insert_fun(doc, int(gen), trans_id)
+ except Exception as exc:
+ logger.error("Sync decrypter pool: error while inserting "
+ "decrypted doc into local db.")
+ logger.exception(exc)
+
+ else:
+ # If no errors found, remove it from the local temporary dict
+ # and from the received database.
+ self.decrypted_docs.pop(gen)
+ self.delete_encrypted_received_doc(doc_id, doc_rev)