diff options
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r-- | client/src/leap/soledad/client/crypto.py | 862 |
1 files changed, 181 insertions, 681 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 7133f804..bdbaa8e0 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -23,31 +23,14 @@ import hmac import hashlib import json import logging -import multiprocessing -import threading from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 -from zope.proxy import sameProxiedObjects from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type -from leap.soledad.common.document import SoledadDocument - - -from leap.soledad.common.crypto import ( - EncryptionSchemes, - UnknownEncryptionScheme, - MacMethods, - UnknownMacMethod, - WrongMac, - ENC_JSON_KEY, - ENC_SCHEME_KEY, - ENC_METHOD_KEY, - ENC_IV_KEY, - MAC_KEY, - MAC_METHOD_KEY, -) +from leap.soledad.common import crypto + logger = logging.getLogger(__name__) @@ -55,37 +38,23 @@ logger = logging.getLogger(__name__) MAC_KEY_LENGTH = 64 -class EncryptionMethods(object): - """ - Representation of encryption methods that can be used. +def _assert_known_encryption_method(method): """ + Assert that we can encrypt/decrypt the given C{method} - AES_256_CTR = 'aes-256-ctr' - XSALSA20 = 'xsalsa20' - -# -# Exceptions -# - - -class DocumentNotEncrypted(Exception): - """ - Raised for failures in document encryption. - """ - pass - - -class UnknownEncryptionMethod(Exception): - """ - Raised when trying to encrypt/decrypt with unknown method. - """ - pass - + :param method: The encryption method to assert. + :type method: str -class NoSymmetricSecret(Exception): - """ - Raised when trying to get a hashed passphrase. + :raise UnknownEncryptionMethodError: Raised when C{method} is unknown. """ + valid_methods = [ + crypto.EncryptionMethods.AES_256_CTR, + crypto.EncryptionMethods.XSALSA20, + ] + try: + soledad_assert(method in valid_methods) + except AssertionError: + raise crypto.UnknownEncryptionMethodError def encrypt_sym(data, key, method): @@ -104,25 +73,26 @@ def encrypt_sym(data, key, method): :return: A tuple with the initial value and the encrypted data. :rtype: (long, str) + + :raise AssertionError: Raised if C{method} is unknown. """ soledad_assert_type(key, str) - soledad_assert( len(key) == 32, # 32 x 8 = 256 bits. 'Wrong key size: %s bits (must be 256 bits long).' % (len(key) * 8)) + _assert_known_encryption_method(method) + iv = None # AES-256 in CTR mode - if method == EncryptionMethods.AES_256_CTR: + if method == crypto.EncryptionMethods.AES_256_CTR: iv = os.urandom(16) ciphertext = AES(key=key, iv=iv).process(data) # XSalsa20 - elif method == EncryptionMethods.XSALSA20: + elif method == crypto.EncryptionMethods.XSALSA20: iv = os.urandom(24) ciphertext = XSalsa20(key=key, iv=iv).process(data) - else: - # raise if method is unknown - raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + return binascii.b2a_base64(iv), ciphertext @@ -143,6 +113,8 @@ def decrypt_sym(data, key, method, **kwargs): :return: The decrypted data. :rtype: str + + :raise UnknownEncryptionMethodError: Raised when C{method} is unknown. """ soledad_assert_type(key, str) # assert params @@ -152,17 +124,15 @@ def decrypt_sym(data, key, method, **kwargs): soledad_assert( 'iv' in kwargs, '%s needs an initial value.' % method) + _assert_known_encryption_method(method) # AES-256 in CTR mode - if method == EncryptionMethods.AES_256_CTR: + if method == crypto.EncryptionMethods.AES_256_CTR: return AES( key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - elif method == EncryptionMethods.XSALSA20: + elif method == crypto.EncryptionMethods.XSALSA20: return XSalsa20( key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - # raise if method is unknown - raise UnknownEncryptionMethod('Unkwnown method: %s' % method) - def doc_mac_key(doc_id, secret): """ @@ -176,17 +146,13 @@ def doc_mac_key(doc_id, secret): :param doc_id: The id of the document. :type doc_id: str - :param secret: soledad secret storage - :type secret: Soledad.storage_secret + :param secret: The Soledad storage secret + :type secret: str :return: The key. :rtype: str - - :raise NoSymmetricSecret: if no symmetric secret was supplied. """ - if secret is None: - raise NoSymmetricSecret() - + soledad_assert(secret is not None) return hmac.new( secret[:MAC_KEY_LENGTH], doc_id, @@ -208,11 +174,11 @@ class SoledadCrypto(object): self._soledad = soledad def encrypt_sym(self, data, key, - method=EncryptionMethods.AES_256_CTR): + method=crypto.EncryptionMethods.AES_256_CTR): return encrypt_sym(data, key, method) def decrypt_sym(self, data, key, - method=EncryptionMethods.AES_256_CTR, **kwargs): + method=crypto.EncryptionMethods.AES_256_CTR, **kwargs): return decrypt_sym(data, key, method, **kwargs) def doc_mac_key(self, doc_id, secret): @@ -224,7 +190,7 @@ class SoledadCrypto(object): The password is derived using HMAC having sha256 as underlying hash function. The key used for HMAC are the first - C{soledad.REMOTE_STORAGE_SECRET_KENGTH} bytes of Soledad's storage + C{soledad.REMOTE_STORAGE_SECRET_LENGTH} bytes of Soledad's storage secret stripped from the first MAC_KEY_LENGTH characters. The HMAC message is C{doc_id}. @@ -234,15 +200,10 @@ class SoledadCrypto(object): :return: The passphrase. :rtype: str - - :raise NoSymmetricSecret: if no symmetric secret was supplied. """ - if self.secret is None: - raise NoSymmetricSecret() + soledad_assert(self.secret is not None) return hmac.new( - self.secret[ - MAC_KEY_LENGTH: - self._soledad.REMOTE_STORAGE_SECRET_LENGTH], + self.secret[MAC_KEY_LENGTH:], doc_id, hashlib.sha256).digest() @@ -251,17 +212,18 @@ class SoledadCrypto(object): # def _get_secret(self): - return self._soledad.storage_secret + return self._soledad.secrets.remote_storage_secret secret = property( _get_secret, doc='The secret used for symmetric encryption') + # # Crypto utilities for a SoledadDocument. # - -def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret): +def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv, + mac_method, secret): """ Calculate a MAC for C{doc} using C{ciphertext}. @@ -277,21 +239,38 @@ def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret): :type doc_rev: str :param ciphertext: The content of the document. :type ciphertext: str + :param enc_scheme: The encryption scheme. + :type enc_scheme: str + :param enc_method: The encryption method. + :type enc_method: str + :param enc_iv: The encryption initialization vector. + :type enc_iv: str :param mac_method: The MAC method to use. :type mac_method: str - :param secret: soledad secret - :type secret: Soledad.secret_storage + :param secret: The Soledad storage secret + :type secret: str :return: The calculated MAC. :rtype: str - """ - if mac_method == MacMethods.HMAC: - return hmac.new( - doc_mac_key(doc_id, secret), - str(doc_id) + str(doc_rev) + ciphertext, - hashlib.sha256).digest() - # raise if we do not know how to handle this MAC method - raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) + + :raise crypto.UnknownMacMethodError: Raised when C{mac_method} is unknown. + """ + try: + soledad_assert(mac_method == crypto.MacMethods.HMAC) + except AssertionError: + raise crypto.UnknownMacMethodError + template = "{doc_id}{doc_rev}{ciphertext}{enc_scheme}{enc_method}{enc_iv}" + content = template.format( + doc_id=doc_id, + doc_rev=doc_rev, + ciphertext=ciphertext, + enc_scheme=enc_scheme, + enc_method=enc_method, + enc_iv=enc_iv) + return hmac.new( + doc_mac_key(doc_id, secret), + content, + hashlib.sha256).digest() def encrypt_doc(crypto, doc): @@ -319,12 +298,12 @@ def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): string representing the following: { - ENC_JSON_KEY: '<encrypted doc JSON string>', - ENC_SCHEME_KEY: 'symkey', - ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, - ENC_IV_KEY: '<the initial value used to encrypt>', + crypto.ENC_JSON_KEY: '<encrypted doc JSON string>', + crypto.ENC_SCHEME_KEY: 'symkey', + crypto.ENC_METHOD_KEY: crypto.EncryptionMethods.AES_256_CTR, + crypto.ENC_IV_KEY: '<the initial value used to encrypt>', MAC_KEY: '<mac>' - MAC_METHOD_KEY: 'hmac' + crypto.MAC_METHOD_KEY: 'hmac' } :param docstr: A representation of the document to be encrypted. @@ -339,30 +318,40 @@ def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): :param key: The key used to encrypt ``data`` (must be 256 bits long). :type key: str - :param secret: The Soledad secret (used for MAC auth). + :param secret: The Soledad storage secret (used for MAC auth). :type secret: str :return: The JSON serialization of the dict representing the encrypted content. :rtype: str """ - # encrypt content using AES-256 CTR mode - iv, ciphertext = encrypt_sym( + enc_scheme = crypto.EncryptionSchemes.SYMKEY + enc_method = crypto.EncryptionMethods.AES_256_CTR + mac_method = crypto.MacMethods.HMAC + enc_iv, ciphertext = encrypt_sym( str(docstr), # encryption/decryption routines expect str - key, method=EncryptionMethods.AES_256_CTR) + key, method=enc_method) + mac = binascii.b2a_hex( # store the mac as hex. + mac_doc( + doc_id, + doc_rev, + ciphertext, + enc_scheme, + enc_method, + enc_iv, + mac_method, + secret)) # Return a representation for the encrypted content. In the following, we # convert binary data to hexadecimal representation so the JSON # serialization does not complain about what it tries to serialize. hex_ciphertext = binascii.b2a_hex(ciphertext) return json.dumps({ - ENC_JSON_KEY: hex_ciphertext, - ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, - ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, - ENC_IV_KEY: iv, - MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex. - doc_id, doc_rev, ciphertext, - MacMethods.HMAC, secret)), - MAC_METHOD_KEY: MacMethods.HMAC, + crypto.ENC_JSON_KEY: hex_ciphertext, + crypto.ENC_SCHEME_KEY: enc_scheme, + crypto.ENC_METHOD_KEY: enc_method, + crypto.ENC_IV_KEY: enc_iv, + crypto.MAC_KEY: mac, + crypto.MAC_METHOD_KEY: mac_method, }) @@ -384,27 +373,77 @@ def decrypt_doc(crypto, doc): return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret) +def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, + enc_iv, mac_method, secret, doc_mac): + """ + Verify that C{doc_mac} is a correct MAC for the given document. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + :param ciphertext: The content of the document. + :type ciphertext: str + :param enc_scheme: The encryption scheme. + :type enc_scheme: str + :param enc_method: The encryption method. + :type enc_method: str + :param enc_iv: The encryption initialization vector. + :type enc_iv: str + :param mac_method: The MAC method to use. + :type mac_method: str + :param secret: The Soledad storage secret + :type secret: str + :param doc_mac: The MAC to be verified against. + :type doc_mac: str + + :raise crypto.UnknownMacMethodError: Raised when C{mac_method} is unknown. + :raise crypto.WrongMacError: Raised when MAC could not be verified. + """ + calculated_mac = mac_doc( + doc_id, + doc_rev, + ciphertext, + enc_scheme, + enc_method, + enc_iv, + mac_method, + secret) + # we compare mac's hashes to avoid possible timing attacks that might + # exploit python's builtin comparison operator behaviour, which fails + # immediatelly when non-matching bytes are found. + doc_mac_hash = hashlib.sha256( + binascii.a2b_hex( # the mac is stored as hex + doc_mac)).digest() + calculated_mac_hash = hashlib.sha256(calculated_mac).digest() + + if doc_mac_hash != calculated_mac_hash: + logger.warning("Wrong MAC while decrypting doc...") + raise crypto.WrongMacError("Could not authenticate document's " + "contents.") + + def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): """ - Decrypt C{doc}'s content. + Decrypt a symmetrically encrypted C{doc}'s content. Return the JSON string representation of the document's decrypted content. The passed doc_dict argument should have the following structure: { - ENC_JSON_KEY: '<enc_blob>', - ENC_SCHEME_KEY: '<enc_scheme>', - ENC_METHOD_KEY: '<enc_method>', - ENC_IV_KEY: '<initial value used to encrypt>', # (optional) + crypto.ENC_JSON_KEY: '<enc_blob>', + crypto.ENC_SCHEME_KEY: '<enc_scheme>', + crypto.ENC_METHOD_KEY: '<enc_method>', + crypto.ENC_IV_KEY: '<initial value used to encrypt>', # (optional) MAC_KEY: '<mac>' - MAC_METHOD_KEY: 'hmac' + crypto.MAC_METHOD_KEY: 'hmac' } C{enc_blob} is the encryption of the JSON serialization of the document's content. For now Soledad just deals with documents whose C{enc_scheme} is - EncryptionSchemes.SYMKEY and C{enc_method} is - EncryptionMethods.AES_256_CTR. + crypto.EncryptionSchemes.SYMKEY and C{enc_method} is + crypto.EncryptionMethods.AES_256_CTR. :param doc_dict: The content of the document to be decrypted. :type doc_dict: dict @@ -423,48 +462,35 @@ def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): :return: The JSON serialization of the decrypted content. :rtype: str + + :raise UnknownEncryptionMethodError: Raised when trying to decrypt from an + unknown encryption method. """ - soledad_assert(ENC_JSON_KEY in doc_dict) - soledad_assert(ENC_SCHEME_KEY in doc_dict) - soledad_assert(ENC_METHOD_KEY in doc_dict) - soledad_assert(MAC_KEY in doc_dict) - soledad_assert(MAC_METHOD_KEY in doc_dict) - - # verify MAC - ciphertext = binascii.a2b_hex( # content is stored as hex. - doc_dict[ENC_JSON_KEY]) - mac = mac_doc( - doc_id, doc_rev, - ciphertext, - doc_dict[MAC_METHOD_KEY], secret) - # we compare mac's hashes to avoid possible timing attacks that might - # exploit python's builtin comparison operator behaviour, which fails - # immediatelly when non-matching bytes are found. - doc_mac_hash = hashlib.sha256( - binascii.a2b_hex( # the mac is stored as hex - doc_dict[MAC_KEY])).digest() - calculated_mac_hash = hashlib.sha256(mac).digest() + # assert document dictionary structure + expected_keys = set([ + crypto.ENC_JSON_KEY, + crypto.ENC_SCHEME_KEY, + crypto.ENC_METHOD_KEY, + crypto.ENC_IV_KEY, + crypto.MAC_KEY, + crypto.MAC_METHOD_KEY, + ]) + soledad_assert(expected_keys.issubset(set(doc_dict.keys()))) - if doc_mac_hash != calculated_mac_hash: - logger.warning("Wrong MAC while decrypting doc...") - raise WrongMac('Could not authenticate document\'s contents.') - # decrypt doc's content - enc_scheme = doc_dict[ENC_SCHEME_KEY] - plainjson = None - if enc_scheme == EncryptionSchemes.SYMKEY: - enc_method = doc_dict[ENC_METHOD_KEY] - if enc_method == EncryptionMethods.AES_256_CTR: - soledad_assert(ENC_IV_KEY in doc_dict) - plainjson = decrypt_sym( - ciphertext, key, - method=enc_method, - iv=doc_dict[ENC_IV_KEY]) - else: - raise UnknownEncryptionMethod(enc_method) - else: - raise UnknownEncryptionScheme(enc_scheme) - - return plainjson + ciphertext = binascii.a2b_hex(doc_dict[crypto.ENC_JSON_KEY]) + enc_scheme = doc_dict[crypto.ENC_SCHEME_KEY] + enc_method = doc_dict[crypto.ENC_METHOD_KEY] + enc_iv = doc_dict[crypto.ENC_IV_KEY] + doc_mac = doc_dict[crypto.MAC_KEY] + mac_method = doc_dict[crypto.MAC_METHOD_KEY] + + soledad_assert(enc_scheme == crypto.EncryptionSchemes.SYMKEY) + + _verify_doc_mac( + doc_id, doc_rev, ciphertext, enc_scheme, enc_method, + enc_iv, mac_method, secret, doc_mac) + + return decrypt_sym(ciphertext, key, method=enc_method, iv=enc_iv) def is_symmetrically_encrypted(doc): @@ -476,534 +502,8 @@ def is_symmetrically_encrypted(doc): :rtype: bool """ - if doc.content and ENC_SCHEME_KEY in doc.content: - if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY: + if doc.content and crypto.ENC_SCHEME_KEY in doc.content: + if doc.content[crypto.ENC_SCHEME_KEY] \ + == crypto.EncryptionSchemes.SYMKEY: return True return False - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): - """ - Base class for encrypter/decrypter pools. - """ - WORKERS = 5 - - def __init__(self, crypto, sync_db, write_lock): - """ - Initialize the pool of encryption-workers. - - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - - :param sync_db: a database connection handle - :type sync_db: handle - - :param write_lock: a write lock for controlling concurrent access - to the sync_db - :type write_lock: threading.Lock - """ - self._pool = multiprocessing.Pool(self.WORKERS) - self._crypto = crypto - self._sync_db = sync_db - self._sync_db_write_lock = write_lock - - def close(self): - """ - Cleanly close the pool of workers. - """ - logger.debug("Closing %s" % (self.__class__.__name__,)) - self._pool.close() - try: - self._pool.join() - except Exception: - pass - - def terminate(self): - """ - Terminate the pool of workers. - """ - logger.debug("Terminating %s" % (self.__class__.__name__,)) - self._pool.terminate() - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): - """ - Encrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - encrypted_content = encrypt_docstr( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric encryption - of documents to be synced. - """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = 5 - TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id, rev, content" - - def encrypt_doc(self, doc, workers=True): - """ - Symmetrically encrypt a document. - - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - soledad_assert(self._crypto is not None, "need a crypto object") - docstr = doc.get_json() - key = self._crypto.doc_passphrase(doc.doc_id) - secret = self._crypto.secret - args = doc.doc_id, doc.rev, docstr, key, secret - - try: - if workers: - res = self._pool.apply_async( - encrypt_doc_task, args, - callback=self.encrypt_doc_cb) - else: - # encrypt inline - res = encrypt_doc_task(*args) - self.encrypt_doc_cb(res) - - except Exception as exc: - logger.exception(exc) - - def encrypt_doc_cb(self, result): - """ - Insert results of encryption routine into the local sync database. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, doc_rev, content = result - self.insert_encrypted_local_doc(doc_id, doc_rev, content) - - def insert_encrypted_local_doc(self, doc_id, doc_rev, content): - """ - Insert the contents of the encrypted doc into the local sync - database. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param content: The encrypted document. - :type content: str - """ - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - - con = self._sync_db - with self._sync_db_write_lock: - with con: - con.execute(sql_del, (doc_id, )) - con.execute(sql_ins, (doc_id, doc_rev, content)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): - """ - Decrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The encrypted content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification of - that document. - :type trans_id: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - decrypted_content = decrypt_doc_dict( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, decrypted_content, gen, trans_id - - -def get_insertable_docs_by_gen(expected, got): - """ - Return a list of documents ready to be inserted. This list is computed - by aligning the expected list with the already gotten docs, and returning - the maximum number of docs that can be processed in the expected order - before finding a gap. - - :param expected: A list of generations to be inserted. - :type expected: list - - :param got: A dictionary whose values are the docs to be inserted. - :type got: dict - """ - ordered = [got.get(i) for i in expected] - if None in ordered: - return ordered[:ordered.index(None)] - else: - return ordered - - -class SyncDecrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric decryption - of documents that were received. - - The decryption of the received documents is done in two steps: - - 1. All the encrypted docs are collected, together with their generation - and transaction-id - 2. The docs are enqueued for decryption. When completed, they are - inserted following the generation order. - """ - # TODO implement throttling to reduce cpu usage?? - TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id, rev, content, gen, trans_id" - - write_encrypted_lock = threading.Lock() - - def __init__(self, *args, **kwargs): - """ - Initialize the decrypter pool, and setup a dict for putting the - results of the decrypted docs until they are picked by the insert - routine that gets them in order. - """ - self._insert_doc_cb = kwargs.pop("insert_doc_cb") - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self.decrypted_docs = {} - self.source_replica_uid = None - - def set_source_replica_uid(self, source_replica_uid): - """ - Set the source replica uid for this decrypter pool instance. - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - """ - self.source_replica_uid = source_replica_uid - - def insert_encrypted_received_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert a received message with encrypted content, to be decrypted later - on. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - docstr = json.dumps(content) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - - con = self._sync_db - with self._sync_db_write_lock: - with con: - con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id)) - - def insert_marker_for_received_doc(self, doc_id, doc_rev, gen): - """ - Insert a marker with the document id, revision and generation on the - sync db. This document does not have an encrypted payload, so the - content has already been inserted into the decrypted_docs dictionary - from where it can be picked following generation order. - We need to leave here the marker to be able to calculate the expected - insertion order for a synchronization batch. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param gen: the Document Generation - :type gen: int - """ - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - with con: - con.execute(sql_ins, (doc_id, doc_rev, '', gen, '')) - - def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): - """ - Insert a document that is not symmetrically encrypted. - We store it in the staging area (the decrypted_docs dictionary) to be - picked up in order as the preceding documents are decrypted. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - # XXX this need a deeper review / testing. - # I believe that what I'm doing here is prone to problems - # if the sync is interrupted (ie, client crash) in the worst possible - # moment. We would need a recover strategy in that case - # (or, insert the document in the table all the same, but with a flag - # saying if the document is sym-encrypted or not), - content = json.dumps(content) - result = doc_id, doc_rev, content, gen, trans_id - self.decrypted_docs[gen] = result - self.insert_marker_for_received_doc(doc_id, doc_rev, gen) - - def delete_encrypted_received_doc(self, doc_id, doc_rev): - """ - Delete a encrypted received doc after it was inserted into the local - db. - - :param doc_id: Document ID. - :type doc_id: str - :param doc_rev: Document revision. - :type doc_rev: str - """ - sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( - self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - with con: - con.execute(sql_del, (doc_id, doc_rev)) - - def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True): - """ - Symmetrically decrypt a document. - - :param doc_id: The ID for the document with contents to be encrypted. - :type doc: str - :param rev: The revision of the document. - :type rev: str - :param source_replica_uid: - :type source_replica_uid: str - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - self.source_replica_uid = source_replica_uid - - # insert_doc_cb is a proxy object that gets updated with the right - # insert function only when the sync_target invokes the sync_exchange - # method. so, if we don't still have a non-empty callback, we refuse - # to proceed. - if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), - None): - logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") - return - - # XXX move to get_doc function... - c = self._sync_db.cursor() - sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % ( - self.TABLE_NAME,) - try: - c.execute(sql, (doc_id, rev)) - res = c.fetchone() - except Exception as exc: - logger.warning("Error getting docs from syncdb: %r" % (exc,)) - return - if res is None: - logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev)) - return - - soledad_assert(self._crypto is not None, "need a crypto object") - try: - doc_id, rev, docstr, gen, trans_id = res - except ValueError: - logger.warning("Wrong entry in sync db") - return - - if len(docstr) == 0: - # not encrypted payload - return - - try: - content = json.loads(docstr) - except TypeError: - logger.warning("Wrong type while decoding json: %s" % repr(docstr)) - return - - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, rev, content, gen, trans_id, key, secret - - try: - if workers: - # Ouch. This is sent to the workers asynchronously, so - # we have no way of logging errors. We'd have to inspect - # lingering results by querying successful / get() over them... - # Or move the heck out of it to twisted. - res = self._pool.apply_async( - decrypt_doc_task, args, - callback=self.decrypt_doc_cb) - else: - # decrypt inline - res = decrypt_doc_task(*args) - self.decrypt_doc_cb(res) - - except Exception as exc: - logger.exception(exc) - - def decrypt_doc_cb(self, result): - """ - Temporarily store the decryption result in a dictionary where it will - be picked by process_decrypted. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, rev, content, gen, trans_id = result - logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen)) - self.decrypted_docs[gen] = result - - def get_docs_by_generation(self): - """ - Get all documents in the received table from the sync db, - ordered by generation. - - :return: list of doc_id, rev, generation - """ - c = self._sync_db.cursor() - sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % ( - self.TABLE_NAME,) - c.execute(sql) - return c.fetchall() - - def count_received_encrypted_docs(self): - """ - Count how many documents we have in the table for received and - encrypted docs. - - :return: The count of documents. - :rtype: int - """ - if self._sync_db is None: - logger.warning("cannot return count with null sync_db") - return - c = self._sync_db.cursor() - sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) - c.execute(sql) - res = c.fetchone() - if res is not None: - return res[0] - else: - return 0 - - def decrypt_received_docs(self): - """ - Get all the encrypted documents from the sync database and dispatch a - decrypt worker to decrypt each one of them. - """ - docs_by_generation = self.get_docs_by_generation() - logger.debug("Sync decrypter pool: There are %d documents to " \ - "decrypt." % len(docs_by_generation)) - for doc_id, rev, gen in filter(None, docs_by_generation): - self.decrypt_doc(doc_id, rev, self.source_replica_uid) - - def process_decrypted(self): - """ - Process the already decrypted documents, and insert as many documents - as can be taken from the expected order without finding a gap. - - :return: Whether we have processed all the pending docs. - :rtype: bool - """ - # Acquire the lock to avoid processing while we're still - # getting data from the syncing stream, to avoid InvalidGeneration - # problems. - with self.write_encrypted_lock: - already_decrypted = self.decrypted_docs - docs = self.get_docs_by_generation() - docs = filter(lambda entry: len(entry) > 0, docs) - expected = [gen for doc_id, rev, gen in docs] - docs_to_insert = get_insertable_docs_by_gen( - expected, already_decrypted) - for doc_fields in docs_to_insert: - self.insert_decrypted_local_doc(*doc_fields) - remaining = self.count_received_encrypted_docs() - return remaining == 0 - - def insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert the decrypted document into the local sqlcipher database. - Makes use of the passed callback `return_doc_cb` passed to the caller - by u1db sync. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - """ - # could pass source_replica in params for callback chain - insert_fun = self._insert_doc_cb[self.source_replica_uid] - logger.debug("Sync decrypter pool: inserting doc in local db: " \ - "%s:%s %s" % (doc_id, doc_rev, gen)) - try: - # convert deleted documents to avoid error on document creation - if content == 'null': - content = None - doc = SoledadDocument(doc_id, doc_rev, content) - insert_fun(doc, int(gen), trans_id) - except Exception as exc: - logger.error("Sync decrypter pool: error while inserting " - "decrypted doc into local db.") - logger.exception(exc) - - else: - # If no errors found, remove it from the local temporary dict - # and from the received database. - self.decrypted_docs.pop(gen) - self.delete_encrypted_received_doc(doc_id, doc_rev) |