summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r--client/src/leap/soledad/client/crypto.py862
1 files changed, 181 insertions, 681 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 7133f804..bdbaa8e0 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -23,31 +23,14 @@ import hmac
import hashlib
import json
import logging
-import multiprocessing
-import threading
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
-from zope.proxy import sameProxiedObjects
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
-from leap.soledad.common.document import SoledadDocument
-
-
-from leap.soledad.common.crypto import (
- EncryptionSchemes,
- UnknownEncryptionScheme,
- MacMethods,
- UnknownMacMethod,
- WrongMac,
- ENC_JSON_KEY,
- ENC_SCHEME_KEY,
- ENC_METHOD_KEY,
- ENC_IV_KEY,
- MAC_KEY,
- MAC_METHOD_KEY,
-)
+from leap.soledad.common import crypto
+
logger = logging.getLogger(__name__)
@@ -55,37 +38,23 @@ logger = logging.getLogger(__name__)
MAC_KEY_LENGTH = 64
-class EncryptionMethods(object):
- """
- Representation of encryption methods that can be used.
+def _assert_known_encryption_method(method):
"""
+ Assert that we can encrypt/decrypt the given C{method}
- AES_256_CTR = 'aes-256-ctr'
- XSALSA20 = 'xsalsa20'
-
-#
-# Exceptions
-#
-
-
-class DocumentNotEncrypted(Exception):
- """
- Raised for failures in document encryption.
- """
- pass
-
-
-class UnknownEncryptionMethod(Exception):
- """
- Raised when trying to encrypt/decrypt with unknown method.
- """
- pass
-
+ :param method: The encryption method to assert.
+ :type method: str
-class NoSymmetricSecret(Exception):
- """
- Raised when trying to get a hashed passphrase.
+ :raise UnknownEncryptionMethodError: Raised when C{method} is unknown.
"""
+ valid_methods = [
+ crypto.EncryptionMethods.AES_256_CTR,
+ crypto.EncryptionMethods.XSALSA20,
+ ]
+ try:
+ soledad_assert(method in valid_methods)
+ except AssertionError:
+ raise crypto.UnknownEncryptionMethodError
def encrypt_sym(data, key, method):
@@ -104,25 +73,26 @@ def encrypt_sym(data, key, method):
:return: A tuple with the initial value and the encrypted data.
:rtype: (long, str)
+
+ :raise AssertionError: Raised if C{method} is unknown.
"""
soledad_assert_type(key, str)
-
soledad_assert(
len(key) == 32, # 32 x 8 = 256 bits.
'Wrong key size: %s bits (must be 256 bits long).' %
(len(key) * 8))
+ _assert_known_encryption_method(method)
+
iv = None
# AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
+ if method == crypto.EncryptionMethods.AES_256_CTR:
iv = os.urandom(16)
ciphertext = AES(key=key, iv=iv).process(data)
# XSalsa20
- elif method == EncryptionMethods.XSALSA20:
+ elif method == crypto.EncryptionMethods.XSALSA20:
iv = os.urandom(24)
ciphertext = XSalsa20(key=key, iv=iv).process(data)
- else:
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
+
return binascii.b2a_base64(iv), ciphertext
@@ -143,6 +113,8 @@ def decrypt_sym(data, key, method, **kwargs):
:return: The decrypted data.
:rtype: str
+
+ :raise UnknownEncryptionMethodError: Raised when C{method} is unknown.
"""
soledad_assert_type(key, str)
# assert params
@@ -152,17 +124,15 @@ def decrypt_sym(data, key, method, **kwargs):
soledad_assert(
'iv' in kwargs,
'%s needs an initial value.' % method)
+ _assert_known_encryption_method(method)
# AES-256 in CTR mode
- if method == EncryptionMethods.AES_256_CTR:
+ if method == crypto.EncryptionMethods.AES_256_CTR:
return AES(
key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
- elif method == EncryptionMethods.XSALSA20:
+ elif method == crypto.EncryptionMethods.XSALSA20:
return XSalsa20(
key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data)
- # raise if method is unknown
- raise UnknownEncryptionMethod('Unkwnown method: %s' % method)
-
def doc_mac_key(doc_id, secret):
"""
@@ -176,17 +146,13 @@ def doc_mac_key(doc_id, secret):
:param doc_id: The id of the document.
:type doc_id: str
- :param secret: soledad secret storage
- :type secret: Soledad.storage_secret
+ :param secret: The Soledad storage secret
+ :type secret: str
:return: The key.
:rtype: str
-
- :raise NoSymmetricSecret: if no symmetric secret was supplied.
"""
- if secret is None:
- raise NoSymmetricSecret()
-
+ soledad_assert(secret is not None)
return hmac.new(
secret[:MAC_KEY_LENGTH],
doc_id,
@@ -208,11 +174,11 @@ class SoledadCrypto(object):
self._soledad = soledad
def encrypt_sym(self, data, key,
- method=EncryptionMethods.AES_256_CTR):
+ method=crypto.EncryptionMethods.AES_256_CTR):
return encrypt_sym(data, key, method)
def decrypt_sym(self, data, key,
- method=EncryptionMethods.AES_256_CTR, **kwargs):
+ method=crypto.EncryptionMethods.AES_256_CTR, **kwargs):
return decrypt_sym(data, key, method, **kwargs)
def doc_mac_key(self, doc_id, secret):
@@ -224,7 +190,7 @@ class SoledadCrypto(object):
The password is derived using HMAC having sha256 as underlying hash
function. The key used for HMAC are the first
- C{soledad.REMOTE_STORAGE_SECRET_KENGTH} bytes of Soledad's storage
+ C{soledad.REMOTE_STORAGE_SECRET_LENGTH} bytes of Soledad's storage
secret stripped from the first MAC_KEY_LENGTH characters. The HMAC
message is C{doc_id}.
@@ -234,15 +200,10 @@ class SoledadCrypto(object):
:return: The passphrase.
:rtype: str
-
- :raise NoSymmetricSecret: if no symmetric secret was supplied.
"""
- if self.secret is None:
- raise NoSymmetricSecret()
+ soledad_assert(self.secret is not None)
return hmac.new(
- self.secret[
- MAC_KEY_LENGTH:
- self._soledad.REMOTE_STORAGE_SECRET_LENGTH],
+ self.secret[MAC_KEY_LENGTH:],
doc_id,
hashlib.sha256).digest()
@@ -251,17 +212,18 @@ class SoledadCrypto(object):
#
def _get_secret(self):
- return self._soledad.storage_secret
+ return self._soledad.secrets.remote_storage_secret
secret = property(
_get_secret, doc='The secret used for symmetric encryption')
+
#
# Crypto utilities for a SoledadDocument.
#
-
-def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret):
+def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv,
+ mac_method, secret):
"""
Calculate a MAC for C{doc} using C{ciphertext}.
@@ -277,21 +239,38 @@ def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret):
:type doc_rev: str
:param ciphertext: The content of the document.
:type ciphertext: str
+ :param enc_scheme: The encryption scheme.
+ :type enc_scheme: str
+ :param enc_method: The encryption method.
+ :type enc_method: str
+ :param enc_iv: The encryption initialization vector.
+ :type enc_iv: str
:param mac_method: The MAC method to use.
:type mac_method: str
- :param secret: soledad secret
- :type secret: Soledad.secret_storage
+ :param secret: The Soledad storage secret
+ :type secret: str
:return: The calculated MAC.
:rtype: str
- """
- if mac_method == MacMethods.HMAC:
- return hmac.new(
- doc_mac_key(doc_id, secret),
- str(doc_id) + str(doc_rev) + ciphertext,
- hashlib.sha256).digest()
- # raise if we do not know how to handle this MAC method
- raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method)
+
+ :raise crypto.UnknownMacMethodError: Raised when C{mac_method} is unknown.
+ """
+ try:
+ soledad_assert(mac_method == crypto.MacMethods.HMAC)
+ except AssertionError:
+ raise crypto.UnknownMacMethodError
+ template = "{doc_id}{doc_rev}{ciphertext}{enc_scheme}{enc_method}{enc_iv}"
+ content = template.format(
+ doc_id=doc_id,
+ doc_rev=doc_rev,
+ ciphertext=ciphertext,
+ enc_scheme=enc_scheme,
+ enc_method=enc_method,
+ enc_iv=enc_iv)
+ return hmac.new(
+ doc_mac_key(doc_id, secret),
+ content,
+ hashlib.sha256).digest()
def encrypt_doc(crypto, doc):
@@ -319,12 +298,12 @@ def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
string representing the following:
{
- ENC_JSON_KEY: '<encrypted doc JSON string>',
- ENC_SCHEME_KEY: 'symkey',
- ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
- ENC_IV_KEY: '<the initial value used to encrypt>',
+ crypto.ENC_JSON_KEY: '<encrypted doc JSON string>',
+ crypto.ENC_SCHEME_KEY: 'symkey',
+ crypto.ENC_METHOD_KEY: crypto.EncryptionMethods.AES_256_CTR,
+ crypto.ENC_IV_KEY: '<the initial value used to encrypt>',
MAC_KEY: '<mac>'
- MAC_METHOD_KEY: 'hmac'
+ crypto.MAC_METHOD_KEY: 'hmac'
}
:param docstr: A representation of the document to be encrypted.
@@ -339,30 +318,40 @@ def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
:param key: The key used to encrypt ``data`` (must be 256 bits long).
:type key: str
- :param secret: The Soledad secret (used for MAC auth).
+ :param secret: The Soledad storage secret (used for MAC auth).
:type secret: str
:return: The JSON serialization of the dict representing the encrypted
content.
:rtype: str
"""
- # encrypt content using AES-256 CTR mode
- iv, ciphertext = encrypt_sym(
+ enc_scheme = crypto.EncryptionSchemes.SYMKEY
+ enc_method = crypto.EncryptionMethods.AES_256_CTR
+ mac_method = crypto.MacMethods.HMAC
+ enc_iv, ciphertext = encrypt_sym(
str(docstr), # encryption/decryption routines expect str
- key, method=EncryptionMethods.AES_256_CTR)
+ key, method=enc_method)
+ mac = binascii.b2a_hex( # store the mac as hex.
+ mac_doc(
+ doc_id,
+ doc_rev,
+ ciphertext,
+ enc_scheme,
+ enc_method,
+ enc_iv,
+ mac_method,
+ secret))
# Return a representation for the encrypted content. In the following, we
# convert binary data to hexadecimal representation so the JSON
# serialization does not complain about what it tries to serialize.
hex_ciphertext = binascii.b2a_hex(ciphertext)
return json.dumps({
- ENC_JSON_KEY: hex_ciphertext,
- ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY,
- ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR,
- ENC_IV_KEY: iv,
- MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex.
- doc_id, doc_rev, ciphertext,
- MacMethods.HMAC, secret)),
- MAC_METHOD_KEY: MacMethods.HMAC,
+ crypto.ENC_JSON_KEY: hex_ciphertext,
+ crypto.ENC_SCHEME_KEY: enc_scheme,
+ crypto.ENC_METHOD_KEY: enc_method,
+ crypto.ENC_IV_KEY: enc_iv,
+ crypto.MAC_KEY: mac,
+ crypto.MAC_METHOD_KEY: mac_method,
})
@@ -384,27 +373,77 @@ def decrypt_doc(crypto, doc):
return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret)
+def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method,
+ enc_iv, mac_method, secret, doc_mac):
+ """
+ Verify that C{doc_mac} is a correct MAC for the given document.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+ :param ciphertext: The content of the document.
+ :type ciphertext: str
+ :param enc_scheme: The encryption scheme.
+ :type enc_scheme: str
+ :param enc_method: The encryption method.
+ :type enc_method: str
+ :param enc_iv: The encryption initialization vector.
+ :type enc_iv: str
+ :param mac_method: The MAC method to use.
+ :type mac_method: str
+ :param secret: The Soledad storage secret
+ :type secret: str
+ :param doc_mac: The MAC to be verified against.
+ :type doc_mac: str
+
+ :raise crypto.UnknownMacMethodError: Raised when C{mac_method} is unknown.
+ :raise crypto.WrongMacError: Raised when MAC could not be verified.
+ """
+ calculated_mac = mac_doc(
+ doc_id,
+ doc_rev,
+ ciphertext,
+ enc_scheme,
+ enc_method,
+ enc_iv,
+ mac_method,
+ secret)
+ # we compare mac's hashes to avoid possible timing attacks that might
+ # exploit python's builtin comparison operator behaviour, which fails
+ # immediatelly when non-matching bytes are found.
+ doc_mac_hash = hashlib.sha256(
+ binascii.a2b_hex( # the mac is stored as hex
+ doc_mac)).digest()
+ calculated_mac_hash = hashlib.sha256(calculated_mac).digest()
+
+ if doc_mac_hash != calculated_mac_hash:
+ logger.warning("Wrong MAC while decrypting doc...")
+ raise crypto.WrongMacError("Could not authenticate document's "
+ "contents.")
+
+
def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
"""
- Decrypt C{doc}'s content.
+ Decrypt a symmetrically encrypted C{doc}'s content.
Return the JSON string representation of the document's decrypted content.
The passed doc_dict argument should have the following structure:
{
- ENC_JSON_KEY: '<enc_blob>',
- ENC_SCHEME_KEY: '<enc_scheme>',
- ENC_METHOD_KEY: '<enc_method>',
- ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
+ crypto.ENC_JSON_KEY: '<enc_blob>',
+ crypto.ENC_SCHEME_KEY: '<enc_scheme>',
+ crypto.ENC_METHOD_KEY: '<enc_method>',
+ crypto.ENC_IV_KEY: '<initial value used to encrypt>', # (optional)
MAC_KEY: '<mac>'
- MAC_METHOD_KEY: 'hmac'
+ crypto.MAC_METHOD_KEY: 'hmac'
}
C{enc_blob} is the encryption of the JSON serialization of the document's
content. For now Soledad just deals with documents whose C{enc_scheme} is
- EncryptionSchemes.SYMKEY and C{enc_method} is
- EncryptionMethods.AES_256_CTR.
+ crypto.EncryptionSchemes.SYMKEY and C{enc_method} is
+ crypto.EncryptionMethods.AES_256_CTR.
:param doc_dict: The content of the document to be decrypted.
:type doc_dict: dict
@@ -423,48 +462,35 @@ def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
:return: The JSON serialization of the decrypted content.
:rtype: str
+
+ :raise UnknownEncryptionMethodError: Raised when trying to decrypt from an
+ unknown encryption method.
"""
- soledad_assert(ENC_JSON_KEY in doc_dict)
- soledad_assert(ENC_SCHEME_KEY in doc_dict)
- soledad_assert(ENC_METHOD_KEY in doc_dict)
- soledad_assert(MAC_KEY in doc_dict)
- soledad_assert(MAC_METHOD_KEY in doc_dict)
-
- # verify MAC
- ciphertext = binascii.a2b_hex( # content is stored as hex.
- doc_dict[ENC_JSON_KEY])
- mac = mac_doc(
- doc_id, doc_rev,
- ciphertext,
- doc_dict[MAC_METHOD_KEY], secret)
- # we compare mac's hashes to avoid possible timing attacks that might
- # exploit python's builtin comparison operator behaviour, which fails
- # immediatelly when non-matching bytes are found.
- doc_mac_hash = hashlib.sha256(
- binascii.a2b_hex( # the mac is stored as hex
- doc_dict[MAC_KEY])).digest()
- calculated_mac_hash = hashlib.sha256(mac).digest()
+ # assert document dictionary structure
+ expected_keys = set([
+ crypto.ENC_JSON_KEY,
+ crypto.ENC_SCHEME_KEY,
+ crypto.ENC_METHOD_KEY,
+ crypto.ENC_IV_KEY,
+ crypto.MAC_KEY,
+ crypto.MAC_METHOD_KEY,
+ ])
+ soledad_assert(expected_keys.issubset(set(doc_dict.keys())))
- if doc_mac_hash != calculated_mac_hash:
- logger.warning("Wrong MAC while decrypting doc...")
- raise WrongMac('Could not authenticate document\'s contents.')
- # decrypt doc's content
- enc_scheme = doc_dict[ENC_SCHEME_KEY]
- plainjson = None
- if enc_scheme == EncryptionSchemes.SYMKEY:
- enc_method = doc_dict[ENC_METHOD_KEY]
- if enc_method == EncryptionMethods.AES_256_CTR:
- soledad_assert(ENC_IV_KEY in doc_dict)
- plainjson = decrypt_sym(
- ciphertext, key,
- method=enc_method,
- iv=doc_dict[ENC_IV_KEY])
- else:
- raise UnknownEncryptionMethod(enc_method)
- else:
- raise UnknownEncryptionScheme(enc_scheme)
-
- return plainjson
+ ciphertext = binascii.a2b_hex(doc_dict[crypto.ENC_JSON_KEY])
+ enc_scheme = doc_dict[crypto.ENC_SCHEME_KEY]
+ enc_method = doc_dict[crypto.ENC_METHOD_KEY]
+ enc_iv = doc_dict[crypto.ENC_IV_KEY]
+ doc_mac = doc_dict[crypto.MAC_KEY]
+ mac_method = doc_dict[crypto.MAC_METHOD_KEY]
+
+ soledad_assert(enc_scheme == crypto.EncryptionSchemes.SYMKEY)
+
+ _verify_doc_mac(
+ doc_id, doc_rev, ciphertext, enc_scheme, enc_method,
+ enc_iv, mac_method, secret, doc_mac)
+
+ return decrypt_sym(ciphertext, key, method=enc_method, iv=enc_iv)
def is_symmetrically_encrypted(doc):
@@ -476,534 +502,8 @@ def is_symmetrically_encrypted(doc):
:rtype: bool
"""
- if doc.content and ENC_SCHEME_KEY in doc.content:
- if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY:
+ if doc.content and crypto.ENC_SCHEME_KEY in doc.content:
+ if doc.content[crypto.ENC_SCHEME_KEY] \
+ == crypto.EncryptionSchemes.SYMKEY:
return True
return False
-
-
-#
-# Encrypt/decrypt pools of workers
-#
-
-class SyncEncryptDecryptPool(object):
- """
- Base class for encrypter/decrypter pools.
- """
- WORKERS = 5
-
- def __init__(self, crypto, sync_db, write_lock):
- """
- Initialize the pool of encryption-workers.
-
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
-
- :param sync_db: a database connection handle
- :type sync_db: handle
-
- :param write_lock: a write lock for controlling concurrent access
- to the sync_db
- :type write_lock: threading.Lock
- """
- self._pool = multiprocessing.Pool(self.WORKERS)
- self._crypto = crypto
- self._sync_db = sync_db
- self._sync_db_write_lock = write_lock
-
- def close(self):
- """
- Cleanly close the pool of workers.
- """
- logger.debug("Closing %s" % (self.__class__.__name__,))
- self._pool.close()
- try:
- self._pool.join()
- except Exception:
- pass
-
- def terminate(self):
- """
- Terminate the pool of workers.
- """
- logger.debug("Terminating %s" % (self.__class__.__name__,))
- self._pool.terminate()
-
-
-def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
- """
- Encrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad secret (used for MAC auth).
- :type secret: str
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- encrypted_content = encrypt_docstr(
- content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, encrypted_content
-
-
-class SyncEncrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric encryption
- of documents to be synced.
- """
- # TODO implement throttling to reduce cpu usage??
- WORKERS = 5
- TABLE_NAME = "docs_tosync"
- FIELD_NAMES = "doc_id, rev, content"
-
- def encrypt_doc(self, doc, workers=True):
- """
- Symmetrically encrypt a document.
-
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
- soledad_assert(self._crypto is not None, "need a crypto object")
- docstr = doc.get_json()
- key = self._crypto.doc_passphrase(doc.doc_id)
- secret = self._crypto.secret
- args = doc.doc_id, doc.rev, docstr, key, secret
-
- try:
- if workers:
- res = self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self.encrypt_doc_cb)
- else:
- # encrypt inline
- res = encrypt_doc_task(*args)
- self.encrypt_doc_cb(res)
-
- except Exception as exc:
- logger.exception(exc)
-
- def encrypt_doc_cb(self, result):
- """
- Insert results of encryption routine into the local sync database.
-
- :param result: A tuple containing the doc id, revision and encrypted
- content.
- :type result: tuple(str, str, str)
- """
- doc_id, doc_rev, content = result
- self.insert_encrypted_local_doc(doc_id, doc_rev, content)
-
- def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
- """
- Insert the contents of the encrypted doc into the local sync
- database.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param content: The encrypted document.
- :type content: str
- """
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,)
-
- con = self._sync_db
- with self._sync_db_write_lock:
- with con:
- con.execute(sql_del, (doc_id, ))
- con.execute(sql_ins, (doc_id, doc_rev, content))
-
-
-def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
- """
- Decrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The encrypted content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification of
- that document.
- :type trans_id: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad secret (used for MAC auth).
- :type secret: str
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- decrypted_content = decrypt_doc_dict(
- content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, decrypted_content, gen, trans_id
-
-
-def get_insertable_docs_by_gen(expected, got):
- """
- Return a list of documents ready to be inserted. This list is computed
- by aligning the expected list with the already gotten docs, and returning
- the maximum number of docs that can be processed in the expected order
- before finding a gap.
-
- :param expected: A list of generations to be inserted.
- :type expected: list
-
- :param got: A dictionary whose values are the docs to be inserted.
- :type got: dict
- """
- ordered = [got.get(i) for i in expected]
- if None in ordered:
- return ordered[:ordered.index(None)]
- else:
- return ordered
-
-
-class SyncDecrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric decryption
- of documents that were received.
-
- The decryption of the received documents is done in two steps:
-
- 1. All the encrypted docs are collected, together with their generation
- and transaction-id
- 2. The docs are enqueued for decryption. When completed, they are
- inserted following the generation order.
- """
- # TODO implement throttling to reduce cpu usage??
- TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id, rev, content, gen, trans_id"
-
- write_encrypted_lock = threading.Lock()
-
- def __init__(self, *args, **kwargs):
- """
- Initialize the decrypter pool, and setup a dict for putting the
- results of the decrypted docs until they are picked by the insert
- routine that gets them in order.
- """
- self._insert_doc_cb = kwargs.pop("insert_doc_cb")
- SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self.decrypted_docs = {}
- self.source_replica_uid = None
-
- def set_source_replica_uid(self, source_replica_uid):
- """
- Set the source replica uid for this decrypter pool instance.
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
- """
- self.source_replica_uid = source_replica_uid
-
- def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
- gen, trans_id):
- """
- Insert a received message with encrypted content, to be decrypted later
- on.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param content: the Content of the document
- :type content: str
- :param gen: the Document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- """
- docstr = json.dumps(content)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
- self.TABLE_NAME,)
-
- con = self._sync_db
- with self._sync_db_write_lock:
- with con:
- con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id))
-
- def insert_marker_for_received_doc(self, doc_id, doc_rev, gen):
- """
- Insert a marker with the document id, revision and generation on the
- sync db. This document does not have an encrypted payload, so the
- content has already been inserted into the decrypted_docs dictionary
- from where it can be picked following generation order.
- We need to leave here the marker to be able to calculate the expected
- insertion order for a synchronization batch.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param gen: the Document Generation
- :type gen: int
- """
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
- self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- with con:
- con.execute(sql_ins, (doc_id, doc_rev, '', gen, ''))
-
- def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
- """
- Insert a document that is not symmetrically encrypted.
- We store it in the staging area (the decrypted_docs dictionary) to be
- picked up in order as the preceding documents are decrypted.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param content: the Content of the document
- :type content: str
- :param gen: the Document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- """
- # XXX this need a deeper review / testing.
- # I believe that what I'm doing here is prone to problems
- # if the sync is interrupted (ie, client crash) in the worst possible
- # moment. We would need a recover strategy in that case
- # (or, insert the document in the table all the same, but with a flag
- # saying if the document is sym-encrypted or not),
- content = json.dumps(content)
- result = doc_id, doc_rev, content, gen, trans_id
- self.decrypted_docs[gen] = result
- self.insert_marker_for_received_doc(doc_id, doc_rev, gen)
-
- def delete_encrypted_received_doc(self, doc_id, doc_rev):
- """
- Delete a encrypted received doc after it was inserted into the local
- db.
-
- :param doc_id: Document ID.
- :type doc_id: str
- :param doc_rev: Document revision.
- :type doc_rev: str
- """
- sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
- self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- with con:
- con.execute(sql_del, (doc_id, doc_rev))
-
- def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True):
- """
- Symmetrically decrypt a document.
-
- :param doc_id: The ID for the document with contents to be encrypted.
- :type doc: str
- :param rev: The revision of the document.
- :type rev: str
- :param source_replica_uid:
- :type source_replica_uid: str
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
- self.source_replica_uid = source_replica_uid
-
- # insert_doc_cb is a proxy object that gets updated with the right
- # insert function only when the sync_target invokes the sync_exchange
- # method. so, if we don't still have a non-empty callback, we refuse
- # to proceed.
- if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
- None):
- logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
- return
-
- # XXX move to get_doc function...
- c = self._sync_db.cursor()
- sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
- self.TABLE_NAME,)
- try:
- c.execute(sql, (doc_id, rev))
- res = c.fetchone()
- except Exception as exc:
- logger.warning("Error getting docs from syncdb: %r" % (exc,))
- return
- if res is None:
- logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
- return
-
- soledad_assert(self._crypto is not None, "need a crypto object")
- try:
- doc_id, rev, docstr, gen, trans_id = res
- except ValueError:
- logger.warning("Wrong entry in sync db")
- return
-
- if len(docstr) == 0:
- # not encrypted payload
- return
-
- try:
- content = json.loads(docstr)
- except TypeError:
- logger.warning("Wrong type while decoding json: %s" % repr(docstr))
- return
-
- key = self._crypto.doc_passphrase(doc_id)
- secret = self._crypto.secret
- args = doc_id, rev, content, gen, trans_id, key, secret
-
- try:
- if workers:
- # Ouch. This is sent to the workers asynchronously, so
- # we have no way of logging errors. We'd have to inspect
- # lingering results by querying successful / get() over them...
- # Or move the heck out of it to twisted.
- res = self._pool.apply_async(
- decrypt_doc_task, args,
- callback=self.decrypt_doc_cb)
- else:
- # decrypt inline
- res = decrypt_doc_task(*args)
- self.decrypt_doc_cb(res)
-
- except Exception as exc:
- logger.exception(exc)
-
- def decrypt_doc_cb(self, result):
- """
- Temporarily store the decryption result in a dictionary where it will
- be picked by process_decrypted.
-
- :param result: A tuple containing the doc id, revision and encrypted
- content.
- :type result: tuple(str, str, str)
- """
- doc_id, rev, content, gen, trans_id = result
- logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen))
- self.decrypted_docs[gen] = result
-
- def get_docs_by_generation(self):
- """
- Get all documents in the received table from the sync db,
- ordered by generation.
-
- :return: list of doc_id, rev, generation
- """
- c = self._sync_db.cursor()
- sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
- self.TABLE_NAME,)
- c.execute(sql)
- return c.fetchall()
-
- def count_received_encrypted_docs(self):
- """
- Count how many documents we have in the table for received and
- encrypted docs.
-
- :return: The count of documents.
- :rtype: int
- """
- if self._sync_db is None:
- logger.warning("cannot return count with null sync_db")
- return
- c = self._sync_db.cursor()
- sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
- c.execute(sql)
- res = c.fetchone()
- if res is not None:
- return res[0]
- else:
- return 0
-
- def decrypt_received_docs(self):
- """
- Get all the encrypted documents from the sync database and dispatch a
- decrypt worker to decrypt each one of them.
- """
- docs_by_generation = self.get_docs_by_generation()
- logger.debug("Sync decrypter pool: There are %d documents to " \
- "decrypt." % len(docs_by_generation))
- for doc_id, rev, gen in filter(None, docs_by_generation):
- self.decrypt_doc(doc_id, rev, self.source_replica_uid)
-
- def process_decrypted(self):
- """
- Process the already decrypted documents, and insert as many documents
- as can be taken from the expected order without finding a gap.
-
- :return: Whether we have processed all the pending docs.
- :rtype: bool
- """
- # Acquire the lock to avoid processing while we're still
- # getting data from the syncing stream, to avoid InvalidGeneration
- # problems.
- with self.write_encrypted_lock:
- already_decrypted = self.decrypted_docs
- docs = self.get_docs_by_generation()
- docs = filter(lambda entry: len(entry) > 0, docs)
- expected = [gen for doc_id, rev, gen in docs]
- docs_to_insert = get_insertable_docs_by_gen(
- expected, already_decrypted)
- for doc_fields in docs_to_insert:
- self.insert_decrypted_local_doc(*doc_fields)
- remaining = self.count_received_encrypted_docs()
- return remaining == 0
-
- def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
- gen, trans_id):
- """
- Insert the decrypted document into the local sqlcipher database.
- Makes use of the passed callback `return_doc_cb` passed to the caller
- by u1db sync.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- """
- # could pass source_replica in params for callback chain
- insert_fun = self._insert_doc_cb[self.source_replica_uid]
- logger.debug("Sync decrypter pool: inserting doc in local db: " \
- "%s:%s %s" % (doc_id, doc_rev, gen))
- try:
- # convert deleted documents to avoid error on document creation
- if content == 'null':
- content = None
- doc = SoledadDocument(doc_id, doc_rev, content)
- insert_fun(doc, int(gen), trans_id)
- except Exception as exc:
- logger.error("Sync decrypter pool: error while inserting "
- "decrypted doc into local db.")
- logger.exception(exc)
-
- else:
- # If no errors found, remove it from the local temporary dict
- # and from the received database.
- self.decrypted_docs.pop(gen)
- self.delete_encrypted_received_doc(doc_id, doc_rev)