diff options
author | drebs <drebs@leap.se> | 2014-07-02 12:21:31 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2014-07-02 12:40:59 -0300 |
commit | c3870a4315acf30893679e4cd11c990a4338e47b (patch) | |
tree | 651d5f0391b88822beddbf793fd0a91c35bff78d /client/src | |
parent | 2628bbf4c5faff50491cdd227c787ca7f148f368 (diff) |
Decouple sync from actual encryption/decryption (#5326).
Diffstat (limited to 'client/src')
-rw-r--r-- | client/src/leap/soledad/client/__init__.py | 71 | ||||
-rw-r--r-- | client/src/leap/soledad/client/crypto.py | 1006 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 246 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 156 |
4 files changed, 1309 insertions, 170 deletions
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 0d3a21fd..e06335f0 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -223,33 +223,48 @@ class Soledad(object): """ def __init__(self, uuid, passphrase, secrets_path, local_db_path, - server_url, cert_file, auth_token=None, secret_id=None): + server_url, cert_file, + auth_token=None, secret_id=None, defer_encryption=False): """ Initialize configuration, cryptographic keys and dbs. :param uuid: User's uuid. :type uuid: str + :param passphrase: The passphrase for locking and unlocking encryption secrets for local and remote storage. :type passphrase: unicode + :param secrets_path: Path for storing encrypted key used for symmetric encryption. :type secrets_path: str + :param local_db_path: Path for local encrypted storage db. :type local_db_path: str + :param server_url: URL for Soledad server. This is used either to sync - with the user's remote db and to interact with the shared recovery - database. + with the user's remote db and to interact with the + shared recovery database. :type server_url: str + :param cert_file: Path to the certificate of the ca used to validate the SSL certificate used by the remote soledad server. :type cert_file: str + :param auth_token: Authorization token for accessing remote databases. :type auth_token: str + :param secret_id: The id of the storage secret to be used. + :type secret_id: str + + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool + :raise BootstrapSequenceError: Raised when the secret generation and - storage on server sequence has failed for some reason. + storage on server sequence has failed + for some reason. """ # get config params self._uuid = uuid @@ -258,8 +273,10 @@ class Soledad(object): # init crypto variables self._secrets = {} self._secret_id = secret_id - # init config (possibly with default values) + self._defer_encryption = defer_encryption + self._init_config(secrets_path, local_db_path, server_url) + self._set_token(auth_token) self._shared_db_instance = None # configure SSL certificate @@ -469,24 +486,19 @@ class Soledad(object): create=True, document_factory=SoledadDocument, crypto=self._crypto, - raw_key=True) + raw_key=True, + defer_encryption=self._defer_encryption) def close(self): """ Close underlying U1DB database. """ + logger.debug("Closing soledad") if hasattr(self, '_db') and isinstance( self._db, SQLCipherDatabase): self._db.close() - def __del__(self): - """ - Make sure local database is closed when object is destroyed. - """ - # Watch out! We have no guarantees that this is properly called. - self.close() - # # Management of secret for symmetric encryption. # @@ -520,6 +532,9 @@ class Soledad(object): Define the id of the storage secret to be used. This method will also replace the secret in the crypto object. + + :param secret_id: The id of the storage secret to be used. + :type secret_id: str """ self._secret_id = secret_id @@ -881,7 +896,7 @@ class Soledad(object): :type json: str :param doc_id: An optional identifier specifying the document id. :type doc_id: - :return: The new cocument + :return: The new document :rtype: SoledadDocument """ return self._db.create_doc_from_json(json, doc_id=doc_id) @@ -1041,7 +1056,7 @@ class Soledad(object): if self._db: return self._db.resolve_doc(doc, conflicted_doc_revs) - def sync(self): + def sync(self, defer_decryption=True): """ Synchronize the local encrypted replica with a remote replica. @@ -1051,16 +1066,25 @@ class Soledad(object): :param url: the url of the target replica to sync with :type url: str - :return: the local generation before the synchronisation was - performed. + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :return: The local generation before the synchronisation was + performed. :rtype: str """ if self._db: + try: local_gen = self._db.sync( urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), - creds=self._creds, autocreate=False) + creds=self._creds, autocreate=False, + defer_decryption=defer_decryption) signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) return local_gen + except Exception as e: + logger.error("Soledad exception when syncing: %s" % str(e)) def stop_sync(self): """ @@ -1079,7 +1103,9 @@ class Soledad(object): :return: Whether remote replica and local replica differ. :rtype: bool """ - target = SoledadSyncTarget(url, creds=self._creds, crypto=self._crypto) + target = SoledadSyncTarget( + url, self._db._get_replica_uid(), creds=self._creds, + crypto=self._crypto) info = target.get_sync_info(self._db._get_replica_uid()) # compare source generation with target's last known source generation if self._db._get_generation() != info[4]: @@ -1087,6 +1113,13 @@ class Soledad(object): return True return False + @property + def syncing(self): + """ + Property, True if the syncer is syncing. + """ + return self._db.syncing + def _set_token(self, token): """ Set the authentication token for remote database access. diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index a6372107..7133f804 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # crypto.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,27 +14,45 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Cryptographic utilities for Soledad. """ - - import os import binascii import hmac import hashlib - +import json +import logging +import multiprocessing +import threading from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 +from zope.proxy import sameProxiedObjects + +from leap.soledad.common import soledad_assert +from leap.soledad.common import soledad_assert_type +from leap.soledad.common.document import SoledadDocument + + +from leap.soledad.common.crypto import ( + EncryptionSchemes, + UnknownEncryptionScheme, + MacMethods, + UnknownMacMethod, + WrongMac, + ENC_JSON_KEY, + ENC_SCHEME_KEY, + ENC_METHOD_KEY, + ENC_IV_KEY, + MAC_KEY, + MAC_METHOD_KEY, +) +logger = logging.getLogger(__name__) -from leap.soledad.common import ( - soledad_assert, - soledad_assert_type, -) + +MAC_KEY_LENGTH = 64 class EncryptionMethods(object): @@ -45,6 +63,17 @@ class EncryptionMethods(object): AES_256_CTR = 'aes-256-ctr' XSALSA20 = 'xsalsa20' +# +# Exceptions +# + + +class DocumentNotEncrypted(Exception): + """ + Raised for failures in document encryption. + """ + pass + class UnknownEncryptionMethod(Exception): """ @@ -59,13 +88,116 @@ class NoSymmetricSecret(Exception): """ -class SoledadCrypto(object): +def encrypt_sym(data, key, method): """ - General cryptographic functionality. + Encrypt C{data} using a {password}. + + Currently, the only encryption methods supported are AES-256 in CTR + mode and XSalsa20. + + :param data: The data to be encrypted. + :type data: str + :param key: The key used to encrypt C{data} (must be 256 bits long). + :type key: str + :param method: The encryption method to use. + :type method: str + + :return: A tuple with the initial value and the encrypted data. + :rtype: (long, str) + """ + soledad_assert_type(key, str) + + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s bits (must be 256 bits long).' % + (len(key) * 8)) + iv = None + # AES-256 in CTR mode + if method == EncryptionMethods.AES_256_CTR: + iv = os.urandom(16) + ciphertext = AES(key=key, iv=iv).process(data) + # XSalsa20 + elif method == EncryptionMethods.XSALSA20: + iv = os.urandom(24) + ciphertext = XSalsa20(key=key, iv=iv).process(data) + else: + # raise if method is unknown + raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + return binascii.b2a_base64(iv), ciphertext + + +def decrypt_sym(data, key, method, **kwargs): """ + Decrypt data using symmetric secret. + + Currently, the only encryption method supported is AES-256 CTR mode. - MAC_KEY_LENGTH = 64 + :param data: The data to be decrypted. + :type data: str + :param key: The key used to decrypt C{data} (must be 256 bits long). + :type key: str + :param method: The encryption method to use. + :type method: str + :param kwargs: Other parameters specific to each encryption method. + :type kwargs: dict + + :return: The decrypted data. + :rtype: str + """ + soledad_assert_type(key, str) + # assert params + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s (must be 256 bits long).' % len(key)) + soledad_assert( + 'iv' in kwargs, + '%s needs an initial value.' % method) + # AES-256 in CTR mode + if method == EncryptionMethods.AES_256_CTR: + return AES( + key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) + elif method == EncryptionMethods.XSALSA20: + return XSalsa20( + key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) + + # raise if method is unknown + raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + + +def doc_mac_key(doc_id, secret): + """ + Generate a key for calculating a MAC for a document whose id is + C{doc_id}. + The key is derived using HMAC having sha256 as underlying hash + function. The key used for HMAC is the first MAC_KEY_LENGTH characters + of Soledad's storage secret. The HMAC message is C{doc_id}. + + :param doc_id: The id of the document. + :type doc_id: str + + :param secret: soledad secret storage + :type secret: Soledad.storage_secret + + :return: The key. + :rtype: str + + :raise NoSymmetricSecret: if no symmetric secret was supplied. + """ + if secret is None: + raise NoSymmetricSecret() + + return hmac.new( + secret[:MAC_KEY_LENGTH], + doc_id, + hashlib.sha256).digest() + + +class SoledadCrypto(object): + """ + General cryptographic functionality encapsulated in a + object that can be passed along. + """ def __init__(self, soledad): """ Initialize the crypto object. @@ -77,78 +209,14 @@ class SoledadCrypto(object): def encrypt_sym(self, data, key, method=EncryptionMethods.AES_256_CTR): - """ - Encrypt C{data} using a {password}. - - Currently, the only encryption method supported is AES-256 CTR mode. - - :param data: The data to be encrypted. - :type data: str - :param key: The key used to encrypt C{data} (must be 256 bits long). - :type key: str - :param method: The encryption method to use. - :type method: str - - :return: A tuple with the initial value and the encrypted data. - :rtype: (long, str) - """ - soledad_assert_type(key, str) - - soledad_assert( - len(key) == 32, # 32 x 8 = 256 bits. - 'Wrong key size: %s bits (must be 256 bits long).' % - (len(key) * 8)) - iv = None - # AES-256 in CTR mode - if method == EncryptionMethods.AES_256_CTR: - iv = os.urandom(16) - ciphertext = AES(key=key, iv=iv).process(data) - # XSalsa20 - elif method == EncryptionMethods.XSALSA20: - iv = os.urandom(24) - ciphertext = XSalsa20(key=key, iv=iv).process(data) - else: - # raise if method is unknown - raise UnknownEncryptionMethod('Unkwnown method: %s' % method) - return binascii.b2a_base64(iv), ciphertext + return encrypt_sym(data, key, method) def decrypt_sym(self, data, key, method=EncryptionMethods.AES_256_CTR, **kwargs): - """ - Decrypt data using symmetric secret. + return decrypt_sym(data, key, method, **kwargs) - Currently, the only encryption method supported is AES-256 CTR mode. - - :param data: The data to be decrypted. - :type data: str - :param key: The key used to decrypt C{data} (must be 256 bits long). - :type key: str - :param method: The encryption method to use. - :type method: str - :param kwargs: Other parameters specific to each encryption method. - :type kwargs: dict - - :return: The decrypted data. - :rtype: str - """ - soledad_assert_type(key, str) - # assert params - soledad_assert( - len(key) == 32, # 32 x 8 = 256 bits. - 'Wrong key size: %s (must be 256 bits long).' % len(key)) - soledad_assert( - 'iv' in kwargs, - '%s needs an initial value.' % method) - # AES-256 in CTR mode - if method == EncryptionMethods.AES_256_CTR: - return AES( - key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - elif method == EncryptionMethods.XSALSA20: - return XSalsa20( - key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - - # raise if method is unknown - raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + def doc_mac_key(self, doc_id, secret): + return doc_mac_key(doc_id, self.secret) def doc_passphrase(self, doc_id): """ @@ -173,41 +241,769 @@ class SoledadCrypto(object): raise NoSymmetricSecret() return hmac.new( self.secret[ - self.MAC_KEY_LENGTH: + MAC_KEY_LENGTH: self._soledad.REMOTE_STORAGE_SECRET_LENGTH], doc_id, hashlib.sha256).digest() - def doc_mac_key(self, doc_id): + # + # secret setters/getters + # + + def _get_secret(self): + return self._soledad.storage_secret + + secret = property( + _get_secret, doc='The secret used for symmetric encryption') + +# +# Crypto utilities for a SoledadDocument. +# + + +def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret): + """ + Calculate a MAC for C{doc} using C{ciphertext}. + + Current MAC method used is HMAC, with the following parameters: + + * key: sha256(storage_secret, doc_id) + * msg: doc_id + doc_rev + ciphertext + * digestmod: sha256 + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + :param ciphertext: The content of the document. + :type ciphertext: str + :param mac_method: The MAC method to use. + :type mac_method: str + :param secret: soledad secret + :type secret: Soledad.secret_storage + + :return: The calculated MAC. + :rtype: str + """ + if mac_method == MacMethods.HMAC: + return hmac.new( + doc_mac_key(doc_id, secret), + str(doc_id) + str(doc_rev) + ciphertext, + hashlib.sha256).digest() + # raise if we do not know how to handle this MAC method + raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) + + +def encrypt_doc(crypto, doc): + """ + Wrapper around encrypt_docstr that accepts a crypto object and the document + as arguments. + + :param crypto: a soledad crypto object. + :type crypto: SoledadCrypto + :param doc: the document. + :type doc: SoledadDocument + """ + key = crypto.doc_passphrase(doc.doc_id) + secret = crypto.secret + + return encrypt_docstr( + doc.get_json(), doc.doc_id, doc.rev, key, secret) + + +def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): + """ + Encrypt C{doc}'s content. + + Encrypt doc's contents using AES-256 CTR mode and return a valid JSON + string representing the following: + + { + ENC_JSON_KEY: '<encrypted doc JSON string>', + ENC_SCHEME_KEY: 'symkey', + ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, + ENC_IV_KEY: '<the initial value used to encrypt>', + MAC_KEY: '<mac>' + MAC_METHOD_KEY: 'hmac' + } + + :param docstr: A representation of the document to be encrypted. + :type docstr: str or unicode. + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: The Soledad secret (used for MAC auth). + :type secret: str + + :return: The JSON serialization of the dict representing the encrypted + content. + :rtype: str + """ + # encrypt content using AES-256 CTR mode + iv, ciphertext = encrypt_sym( + str(docstr), # encryption/decryption routines expect str + key, method=EncryptionMethods.AES_256_CTR) + # Return a representation for the encrypted content. In the following, we + # convert binary data to hexadecimal representation so the JSON + # serialization does not complain about what it tries to serialize. + hex_ciphertext = binascii.b2a_hex(ciphertext) + return json.dumps({ + ENC_JSON_KEY: hex_ciphertext, + ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, + ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, + ENC_IV_KEY: iv, + MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex. + doc_id, doc_rev, ciphertext, + MacMethods.HMAC, secret)), + MAC_METHOD_KEY: MacMethods.HMAC, + }) + + +def decrypt_doc(crypto, doc): + """ + Wrapper around decrypt_doc_dict that accepts a crypto object and the + document as arguments. + + :param crypto: a soledad crypto object. + :type crypto: SoledadCrypto + :param doc: the document. + :type doc: SoledadDocument + + :return: json string with the decrypted document + :rtype: str + """ + key = crypto.doc_passphrase(doc.doc_id) + secret = crypto.secret + return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret) + + +def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): + """ + Decrypt C{doc}'s content. + + Return the JSON string representation of the document's decrypted content. + + The passed doc_dict argument should have the following structure: + + { + ENC_JSON_KEY: '<enc_blob>', + ENC_SCHEME_KEY: '<enc_scheme>', + ENC_METHOD_KEY: '<enc_method>', + ENC_IV_KEY: '<initial value used to encrypt>', # (optional) + MAC_KEY: '<mac>' + MAC_METHOD_KEY: 'hmac' + } + + C{enc_blob} is the encryption of the JSON serialization of the document's + content. For now Soledad just deals with documents whose C{enc_scheme} is + EncryptionSchemes.SYMKEY and C{enc_method} is + EncryptionMethods.AES_256_CTR. + + :param doc_dict: The content of the document to be decrypted. + :type doc_dict: dict + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: + :type secret: + + :return: The JSON serialization of the decrypted content. + :rtype: str + """ + soledad_assert(ENC_JSON_KEY in doc_dict) + soledad_assert(ENC_SCHEME_KEY in doc_dict) + soledad_assert(ENC_METHOD_KEY in doc_dict) + soledad_assert(MAC_KEY in doc_dict) + soledad_assert(MAC_METHOD_KEY in doc_dict) + + # verify MAC + ciphertext = binascii.a2b_hex( # content is stored as hex. + doc_dict[ENC_JSON_KEY]) + mac = mac_doc( + doc_id, doc_rev, + ciphertext, + doc_dict[MAC_METHOD_KEY], secret) + # we compare mac's hashes to avoid possible timing attacks that might + # exploit python's builtin comparison operator behaviour, which fails + # immediatelly when non-matching bytes are found. + doc_mac_hash = hashlib.sha256( + binascii.a2b_hex( # the mac is stored as hex + doc_dict[MAC_KEY])).digest() + calculated_mac_hash = hashlib.sha256(mac).digest() + + if doc_mac_hash != calculated_mac_hash: + logger.warning("Wrong MAC while decrypting doc...") + raise WrongMac('Could not authenticate document\'s contents.') + # decrypt doc's content + enc_scheme = doc_dict[ENC_SCHEME_KEY] + plainjson = None + if enc_scheme == EncryptionSchemes.SYMKEY: + enc_method = doc_dict[ENC_METHOD_KEY] + if enc_method == EncryptionMethods.AES_256_CTR: + soledad_assert(ENC_IV_KEY in doc_dict) + plainjson = decrypt_sym( + ciphertext, key, + method=enc_method, + iv=doc_dict[ENC_IV_KEY]) + else: + raise UnknownEncryptionMethod(enc_method) + else: + raise UnknownEncryptionScheme(enc_scheme) + + return plainjson + + +def is_symmetrically_encrypted(doc): + """ + Return True if the document was symmetrically encrypted. + + :param doc: The document to check. + :type doc: SoledadDocument + + :rtype: bool + """ + if doc.content and ENC_SCHEME_KEY in doc.content: + if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY: + return True + return False + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): + """ + Base class for encrypter/decrypter pools. + """ + WORKERS = 5 + + def __init__(self, crypto, sync_db, write_lock): + """ + Initialize the pool of encryption-workers. + + :param crypto: A SoledadCryto instance to perform the encryption. + :type crypto: leap.soledad.crypto.SoledadCrypto + + :param sync_db: a database connection handle + :type sync_db: handle + + :param write_lock: a write lock for controlling concurrent access + to the sync_db + :type write_lock: threading.Lock + """ + self._pool = multiprocessing.Pool(self.WORKERS) + self._crypto = crypto + self._sync_db = sync_db + self._sync_db_write_lock = write_lock + + def close(self): + """ + Cleanly close the pool of workers. + """ + logger.debug("Closing %s" % (self.__class__.__name__,)) + self._pool.close() + try: + self._pool.join() + except Exception: + pass + + def terminate(self): + """ + Terminate the pool of workers. + """ + logger.debug("Terminating %s" % (self.__class__.__name__,)) + self._pool.terminate() + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): + """ + Encrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad secret (used for MAC auth). + :type secret: str + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + encrypted_content = encrypt_docstr( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, encrypted_content + + +class SyncEncrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric encryption + of documents to be synced. + """ + # TODO implement throttling to reduce cpu usage?? + WORKERS = 5 + TABLE_NAME = "docs_tosync" + FIELD_NAMES = "doc_id, rev, content" + + def encrypt_doc(self, doc, workers=True): + """ + Symmetrically encrypt a document. + + :param doc: The document with contents to be encrypted. + :type doc: SoledadDocument + + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + """ + soledad_assert(self._crypto is not None, "need a crypto object") + docstr = doc.get_json() + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + args = doc.doc_id, doc.rev, docstr, key, secret + + try: + if workers: + res = self._pool.apply_async( + encrypt_doc_task, args, + callback=self.encrypt_doc_cb) + else: + # encrypt inline + res = encrypt_doc_task(*args) + self.encrypt_doc_cb(res) + + except Exception as exc: + logger.exception(exc) + + def encrypt_doc_cb(self, result): """ - Generate a key for calculating a MAC for a document whose id is - C{doc_id}. + Insert results of encryption routine into the local sync database. - The key is derived using HMAC having sha256 as underlying hash - function. The key used for HMAC is the first MAC_KEY_LENGTH characters - of Soledad's storage secret. The HMAC message is C{doc_id}. + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, doc_rev, content = result + self.insert_encrypted_local_doc(doc_id, doc_rev, content) - :param doc_id: The id of the document. + def insert_encrypted_local_doc(self, doc_id, doc_rev, content): + """ + Insert the contents of the encrypted doc into the local sync + database. + + :param doc_id: The document id. :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param content: The encrypted document. + :type content: str + """ + sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - :return: The key. - :rtype: str + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_del, (doc_id, )) + con.execute(sql_ins, (doc_id, doc_rev, content)) - :raise NoSymmetricSecret: if no symmetric secret was supplied. + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): + """ + Decrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The encrypted content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification of + that document. + :type trans_id: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad secret (used for MAC auth). + :type secret: str + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + decrypted_content = decrypt_doc_dict( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, decrypted_content, gen, trans_id + + +def get_insertable_docs_by_gen(expected, got): + """ + Return a list of documents ready to be inserted. This list is computed + by aligning the expected list with the already gotten docs, and returning + the maximum number of docs that can be processed in the expected order + before finding a gap. + + :param expected: A list of generations to be inserted. + :type expected: list + + :param got: A dictionary whose values are the docs to be inserted. + :type got: dict + """ + ordered = [got.get(i) for i in expected] + if None in ordered: + return ordered[:ordered.index(None)] + else: + return ordered + + +class SyncDecrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric decryption + of documents that were received. + + The decryption of the received documents is done in two steps: + + 1. All the encrypted docs are collected, together with their generation + and transaction-id + 2. The docs are enqueued for decryption. When completed, they are + inserted following the generation order. + """ + # TODO implement throttling to reduce cpu usage?? + TABLE_NAME = "docs_received" + FIELD_NAMES = "doc_id, rev, content, gen, trans_id" + + write_encrypted_lock = threading.Lock() + + def __init__(self, *args, **kwargs): """ - if self.secret is None: - raise NoSymmetricSecret() - return hmac.new( - self.secret[:self.MAC_KEY_LENGTH], - doc_id, - hashlib.sha256).digest() + Initialize the decrypter pool, and setup a dict for putting the + results of the decrypted docs until they are picked by the insert + routine that gets them in order. + """ + self._insert_doc_cb = kwargs.pop("insert_doc_cb") + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + self.decrypted_docs = {} + self.source_replica_uid = None - # - # secret setters/getters - # + def set_source_replica_uid(self, source_replica_uid): + """ + Set the source replica uid for this decrypter pool instance. - def _get_secret(self): - return self._soledad.storage_secret + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + """ + self.source_replica_uid = source_replica_uid - secret = property( - _get_secret, doc='The secret used for symmetric encryption') + def insert_encrypted_received_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert a received message with encrypted content, to be decrypted later + on. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + """ + docstr = json.dumps(content) + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( + self.TABLE_NAME,) + + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id)) + + def insert_marker_for_received_doc(self, doc_id, doc_rev, gen): + """ + Insert a marker with the document id, revision and generation on the + sync db. This document does not have an encrypted payload, so the + content has already been inserted into the decrypted_docs dictionary + from where it can be picked following generation order. + We need to leave here the marker to be able to calculate the expected + insertion order for a synchronization batch. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param gen: the Document Generation + :type gen: int + """ + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( + self.TABLE_NAME,) + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_ins, (doc_id, doc_rev, '', gen, '')) + + def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): + """ + Insert a document that is not symmetrically encrypted. + We store it in the staging area (the decrypted_docs dictionary) to be + picked up in order as the preceding documents are decrypted. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + """ + # XXX this need a deeper review / testing. + # I believe that what I'm doing here is prone to problems + # if the sync is interrupted (ie, client crash) in the worst possible + # moment. We would need a recover strategy in that case + # (or, insert the document in the table all the same, but with a flag + # saying if the document is sym-encrypted or not), + content = json.dumps(content) + result = doc_id, doc_rev, content, gen, trans_id + self.decrypted_docs[gen] = result + self.insert_marker_for_received_doc(doc_id, doc_rev, gen) + + def delete_encrypted_received_doc(self, doc_id, doc_rev): + """ + Delete a encrypted received doc after it was inserted into the local + db. + + :param doc_id: Document ID. + :type doc_id: str + :param doc_rev: Document revision. + :type doc_rev: str + """ + sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_del, (doc_id, doc_rev)) + + def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True): + """ + Symmetrically decrypt a document. + + :param doc_id: The ID for the document with contents to be encrypted. + :type doc: str + :param rev: The revision of the document. + :type rev: str + :param source_replica_uid: + :type source_replica_uid: str + + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + """ + self.source_replica_uid = source_replica_uid + + # insert_doc_cb is a proxy object that gets updated with the right + # insert function only when the sync_target invokes the sync_exchange + # method. so, if we don't still have a non-empty callback, we refuse + # to proceed. + if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), + None): + logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") + return + + # XXX move to get_doc function... + c = self._sync_db.cursor() + sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + try: + c.execute(sql, (doc_id, rev)) + res = c.fetchone() + except Exception as exc: + logger.warning("Error getting docs from syncdb: %r" % (exc,)) + return + if res is None: + logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev)) + return + + soledad_assert(self._crypto is not None, "need a crypto object") + try: + doc_id, rev, docstr, gen, trans_id = res + except ValueError: + logger.warning("Wrong entry in sync db") + return + + if len(docstr) == 0: + # not encrypted payload + return + + try: + content = json.loads(docstr) + except TypeError: + logger.warning("Wrong type while decoding json: %s" % repr(docstr)) + return + + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, rev, content, gen, trans_id, key, secret + + try: + if workers: + # Ouch. This is sent to the workers asynchronously, so + # we have no way of logging errors. We'd have to inspect + # lingering results by querying successful / get() over them... + # Or move the heck out of it to twisted. + res = self._pool.apply_async( + decrypt_doc_task, args, + callback=self.decrypt_doc_cb) + else: + # decrypt inline + res = decrypt_doc_task(*args) + self.decrypt_doc_cb(res) + + except Exception as exc: + logger.exception(exc) + + def decrypt_doc_cb(self, result): + """ + Temporarily store the decryption result in a dictionary where it will + be picked by process_decrypted. + + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, rev, content, gen, trans_id = result + logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen)) + self.decrypted_docs[gen] = result + + def get_docs_by_generation(self): + """ + Get all documents in the received table from the sync db, + ordered by generation. + + :return: list of doc_id, rev, generation + """ + c = self._sync_db.cursor() + sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % ( + self.TABLE_NAME,) + c.execute(sql) + return c.fetchall() + + def count_received_encrypted_docs(self): + """ + Count how many documents we have in the table for received and + encrypted docs. + + :return: The count of documents. + :rtype: int + """ + if self._sync_db is None: + logger.warning("cannot return count with null sync_db") + return + c = self._sync_db.cursor() + sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) + c.execute(sql) + res = c.fetchone() + if res is not None: + return res[0] + else: + return 0 + + def decrypt_received_docs(self): + """ + Get all the encrypted documents from the sync database and dispatch a + decrypt worker to decrypt each one of them. + """ + docs_by_generation = self.get_docs_by_generation() + logger.debug("Sync decrypter pool: There are %d documents to " \ + "decrypt." % len(docs_by_generation)) + for doc_id, rev, gen in filter(None, docs_by_generation): + self.decrypt_doc(doc_id, rev, self.source_replica_uid) + + def process_decrypted(self): + """ + Process the already decrypted documents, and insert as many documents + as can be taken from the expected order without finding a gap. + + :return: Whether we have processed all the pending docs. + :rtype: bool + """ + # Acquire the lock to avoid processing while we're still + # getting data from the syncing stream, to avoid InvalidGeneration + # problems. + with self.write_encrypted_lock: + already_decrypted = self.decrypted_docs + docs = self.get_docs_by_generation() + docs = filter(lambda entry: len(entry) > 0, docs) + expected = [gen for doc_id, rev, gen in docs] + docs_to_insert = get_insertable_docs_by_gen( + expected, already_decrypted) + for doc_fields in docs_to_insert: + self.insert_decrypted_local_doc(*doc_fields) + remaining = self.count_received_encrypted_docs() + return remaining == 0 + + def insert_decrypted_local_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert the decrypted document into the local sqlcipher database. + Makes use of the passed callback `return_doc_cb` passed to the caller + by u1db sync. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + """ + # could pass source_replica in params for callback chain + insert_fun = self._insert_doc_cb[self.source_replica_uid] + logger.debug("Sync decrypter pool: inserting doc in local db: " \ + "%s:%s %s" % (doc_id, doc_rev, gen)) + try: + # convert deleted documents to avoid error on document creation + if content == 'null': + content = None + doc = SoledadDocument(doc_id, doc_rev, content) + insert_fun(doc, int(gen), trans_id) + except Exception as exc: + logger.error("Sync decrypter pool: error while inserting " + "decrypted doc into local db.") + logger.exception(exc) + + else: + # If no errors found, remove it from the local temporary dict + # and from the received database. + self.decrypted_docs.pop(gen) + self.delete_encrypted_received_doc(doc_id, doc_rev) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 5ffa9c7e..46ceca42 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # sqlcipher.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ A U1DB backend that uses SQLCipher as its persistence layer. @@ -40,11 +38,13 @@ cipher_default_use_hmac can be used to globally alter the default use of HMAC when opening a database. So, as the statements above were introduced for backwards compatibility with -SLCipher 1.1 databases, we do not implement them as all SQLCipher databases +SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. """ import logging +import multiprocessing import os +import sqlite3 import string import threading import time @@ -57,9 +57,12 @@ from collections import defaultdict from pysqlcipher import dbapi2 from u1db.backends import sqlite_backend from u1db import errors as u1db_errors +from taskthread import TimerTask -from leap.soledad.client.sync import Synchronizer +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.client.target import PendingReceivedDocsSyncError +from leap.soledad.client.sync import SoledadSynchronizer from leap.soledad.common.document import SoledadDocument @@ -88,7 +91,7 @@ SQLITE_ISOLATION_LEVEL = None def open(path, password, create=True, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, - cipher_page_size=1024): + cipher_page_size=1024, defer_encryption=False): """Open a database at the given location. Will raise u1db.errors.DatabaseDoesNotExist if create=False and the @@ -114,6 +117,9 @@ def open(path, password, create=True, document_factory=None, crypto=None, :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool :return: An instance of Database. :rtype SQLCipherDatabase @@ -121,7 +127,7 @@ def open(path, password, create=True, document_factory=None, crypto=None, return SQLCipherDatabase.open_database( path, password, create=create, document_factory=document_factory, crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, - cipher_page_size=cipher_page_size) + cipher_page_size=cipher_page_size, defer_encryption=defer_encryption) # @@ -147,19 +153,40 @@ class NotAnHexString(Exception): # class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): - """A U1DB implementation that uses SQLCipher as its persistence layer.""" + """ + A U1DB implementation that uses SQLCipher as its persistence layer. + """ + defer_encryption = False _index_storage_value = 'expand referenced encrypted' k_lock = threading.Lock() create_doc_lock = threading.Lock() update_indexes_lock = threading.Lock() + _sync_watcher = None + _sync_enc_pool = None + + """ + The name of the local symmetrically encrypted documents to + sync database file. + """ + LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' - syncing_lock = defaultdict(threading.Lock) """ A dictionary that hold locks which avoid multiple sync attempts from the same database replica. """ + encrypting_lock = threading.Lock() + + """ + Period or recurrence of the periodic encrypting task, in seconds. + """ + ENCRYPT_TASK_PERIOD = 1 + syncing_lock = defaultdict(threading.Lock) + """ + A dictionary that hold locks which avoid multiple sync attempts from the + same database replica. + """ def __init__(self, sqlcipher_file, password, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', @@ -194,7 +221,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self.assert_db_is_encrypted( sqlcipher_file, password, raw_key, cipher, kdf_iter, cipher_page_size) - # connect to the database + + # connect to the sqlcipher database with self.k_lock: self._db_handle = dbapi2.connect( sqlcipher_file, @@ -215,6 +243,26 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self._ensure_schema() self._crypto = crypto + self._sync_db = None + self._sync_db_write_lock = None + self._sync_enc_pool = None + + if self.defer_encryption: + if sqlcipher_file != ":memory:": + self._sync_db_path = "%s-sync" % sqlcipher_file + else: + self._sync_db_path = ":memory:" + + # initialize sync db + self._init_sync_db() + + # initialize syncing queue encryption pool + self._sync_enc_pool = SyncEncrypterPool( + self._crypto, self._sync_db, self._sync_db_write_lock) + self._sync_watcher = TimerTask(self._encrypt_syncing_docs, + self.ENCRYPT_TASK_PERIOD) + self._sync_watcher.start() + def factory(doc_id=None, rev=None, json='{}', has_conflicts=False, syncable=True): return SoledadDocument(doc_id=doc_id, rev=rev, json=json, @@ -226,7 +274,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): @classmethod def _open_database(cls, sqlcipher_file, password, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', - kdf_iter=4000, cipher_page_size=1024): + kdf_iter=4000, cipher_page_size=1024, + defer_encryption=False): """ Open a SQLCipher database. @@ -249,10 +298,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool :return: The database object. :rtype: SQLCipherDatabase """ + cls.defer_encryption = defer_encryption if not os.path.isfile(sqlcipher_file): raise u1db_errors.DatabaseDoesNotExist() @@ -298,43 +351,59 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): def open_database(cls, sqlcipher_file, password, create, backend_cls=None, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, - cipher_page_size=1024): + cipher_page_size=1024, defer_encryption=False): """ Open a SQLCipher database. :param sqlcipher_file: The path for the SQLCipher file. :type sqlcipher_file: str + :param password: The password that protects the SQLCipher db. :type password: str + :param create: Should the datbase be created if it does not already - exist? - :type: bool + exist? + :type create: bool + :param backend_cls: A class to use as backend. :type backend_cls: type + :param document_factory: A function that will be called with the same - parameters as Document.__init__. + parameters as Document.__init__. :type document_factory: callable + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. + document contents when syncing. :type crypto: soledad.crypto.SoledadCrypto + :param raw_key: Whether C{password} is a raw 64-char hex string or a - passphrase that should be hashed to obtain the encyrption key. + passphrase that should be hashed to obtain the + encyrption key. :type raw_key: bool + :param cipher: The cipher and mode to use. :type cipher: str + :param kdf_iter: The number of iterations to use. :type kdf_iter: int + :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool + :return: The database object. :rtype: SQLCipherDatabase """ + cls.defer_encryption = defer_encryption try: return cls._open_database( sqlcipher_file, password, document_factory=document_factory, crypto=crypto, raw_key=raw_key, cipher=cipher, - kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) + kdf_iter=kdf_iter, cipher_page_size=cipher_page_size, + defer_encryption=defer_encryption) except u1db_errors.DatabaseDoesNotExist: if not create: raise @@ -347,7 +416,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) - def sync(self, url, creds=None, autocreate=True): + def sync(self, url, creds=None, autocreate=True, defer_decryption=True): """ Synchronize documents with remote replica exposed at url. @@ -362,6 +431,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type creds: dict :param autocreate: Ask the target to create the db if non-existent. :type autocreate: bool + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool :return: The local generation before the synchronisation was performed. :rtype: int @@ -370,7 +443,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): # the following context manager blocks until the syncing lock can be # acquired. with self.syncer(url, creds=creds) as syncer: - res = syncer.sync(autocreate=autocreate) + + # XXX could mark the critical section here... + try: + res = syncer.sync(autocreate=autocreate, + defer_decryption=defer_decryption) + + except PendingReceivedDocsSyncError: + logger.warning("Local sync db is not clear, skipping sync...") + return + return res def stop_sync(self): @@ -394,7 +476,18 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]: syncer = self._get_syncer(url, creds=creds) yield syncer - syncer.sync_target.close() + #syncer.sync_target.close() + + @property + def syncing(self): + syncing = False + for url in self._syncers: + _, _, lock = self._syncers[url] + is_not_locked = lock.acquire(blocking=False) + if is_not_locked is False: + return True + lock.release() + return False def _get_syncer(self, url, creds=None): """ @@ -415,11 +508,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): h = sha256(json.dumps([url, creds])).hexdigest() cur_h, syncer = self._syncers.get(url, (None, None)) if syncer is None or h != cur_h: - syncer = Synchronizer( + wlock = self._sync_db_write_lock + syncer = SoledadSynchronizer( self, SoledadSyncTarget(url, + self._replica_uid, creds=creds, - crypto=self._crypto)) + crypto=self._crypto, + sync_db=self._sync_db, + sync_db_write_lock=wlock)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -442,21 +539,85 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): 'ALTER TABLE document ' 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') - def create_doc(self, content, doc_id=None): + def _init_sync_db(self): + """ + Initialize the Symmetrically-Encrypted document to be synced database, + and the queue to communicate with subprocess workers. """ - Create a new document in the local encrypted database. + self._sync_db = sqlite3.connect(self._sync_db_path, + check_same_thread=False) - :param content: the contents of the new document - :type content: dict - :param doc_id: an optional identifier specifying the document id - :type doc_id: str + self._sync_db_write_lock = threading.Lock() + self._create_sync_db_tables() + self.sync_queue = multiprocessing.Queue() + + def _create_sync_db_tables(self): + """ + Create tables for the local sync documents db if needed. + """ + encr = SyncEncrypterPool + decr = SyncDecrypterPool + sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + encr.TABLE_NAME, encr.FIELD_NAMES)) + sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + decr.TABLE_NAME, decr.FIELD_NAMES)) + + with self._sync_db_write_lock: + with self._sync_db: + self._sync_db.execute(sql_encr) + self._sync_db.execute(sql_decr) + + # + # Symmetric encryption of syncing docs + # + + def _encrypt_syncing_docs(self): + """ + Process the syncing queue and send the documents there + to be encrypted in the sync db. They will be read by the + SoledadSyncTarget during the sync_exchange. + + Called periodical from the TimerTask self._sync_watcher. + """ + lock = self.encrypting_lock + # optional wait flag used to avoid blocking + if not lock.acquire(False): + return + else: + queue = self.sync_queue + try: + while not queue.empty(): + doc = queue.get_nowait() + self._sync_enc_pool.encrypt_doc(doc) + + except Exception as exc: + logger.error("Error while encrypting docs to sync") + logger.exception(exc) + finally: + lock.release() + + # + # Document operations + # - :return: the new document - :rtype: SoledadDocument + def put_doc(self, doc): """ - with self.create_doc_lock: - return sqlite_backend.SQLitePartialExpandDatabase.create_doc( - self, content, doc_id=doc_id) + Overwrite the put_doc method, to enqueue the modified document for + encryption before sync. + + :param doc: The document to be put. + :type doc: u1db.Document + + :return: The new document revision. + :rtype: str + """ + doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc( + self, doc) + if self.defer_encryption: + self.sync_queue.put_nowait(doc) + return doc_rev + + # indexes def _put_and_update_indexes(self, old_doc, doc): """ @@ -906,12 +1067,25 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): res = c.fetchall() return res[0][0] - def __del__(self): + def close(self): """ - Closes db_handle upon object destruction. + Close db_handle and close syncer. """ + logger.debug("Sqlcipher backend: closing") + if self._sync_watcher is not None: + self._sync_watcher.stop() + self._sync_watcher.shutdown() + for url in self._syncers: + _, syncer = self._syncers[url] + syncer.close() + if self._sync_enc_pool is not None: + self._sync_enc_pool.close() if self._db_handle is not None: self._db_handle.close() + @property + def replica_uid(self): + return self._get_replica_uid() + sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 56e63416..640e22ff 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -17,35 +17,86 @@ """ -Sync infrastructure that can be interrupted and recovered. +Soledad synchronization utilities. + + +Extend u1db Synchronizer with the ability to: + + * Defer the update of the known replica uid until all the decryption of + the incoming messages has been processed. + + * Be interrupted and recovered. """ + import json +import logging +from threading import Lock from u1db import errors -from u1db.sync import Synchronizer as U1DBSynchronizer +from u1db.sync import Synchronizer -class Synchronizer(U1DBSynchronizer): +logger = logging.getLogger(__name__) + + +class SoledadSynchronizer(Synchronizer): """ Collect the state around synchronizing 2 U1DB replicas. - Modified to allow for interrupting the synchronization process. + Synchronization is bi-directional, in that new items in the source are sent + to the target, and new items in the target are returned to the source. + However, it still recognizes that one side is initiating the request. Also, + at the moment, conflicts are only created in the source. + + Also modified to allow for interrupting the synchronization process. """ + syncing_lock = Lock() + def stop(self): """ Stop the current sync in progress. """ self.sync_target.stop() - def sync(self, autocreate=False): + def sync(self, autocreate=False, defer_decryption=True): """ Synchronize documents between source and target. + Differently from u1db `Synchronizer.sync` method, this one allows to + pass a `defer_decryption` flag that will postpone the last + step in the synchronization dance, namely, the setting of the last + known generation and transaction id for a given remote replica. + + This is done to allow the ongoing parallel decryption of the incoming + docs to proceed without `InvalidGeneration` conflicts. + :param autocreate: Whether the target replica should be created or not. :type autocreate: bool + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + """ + self.syncing_lock.acquire() + try: + return self._sync(autocreate=autocreate, + defer_decryption=defer_decryption) + except Exception: + # We release the lock if there was an error. + # Otherwise, the lock should be released from the function + # `complete_sync`. + self.release_syncing_lock() + # re-raising the exceptions to let syqlcipher.sync catch them + # (and re-create the syncer instance if needed) + raise + + def _sync(self, autocreate=False, defer_decryption=True): + """ + Helper function, called from the main `sync` method. + See `sync` docstring. """ sync_target = self.sync_target @@ -64,6 +115,16 @@ class Synchronizer(U1DBSynchronizer): target_gen, target_trans_id = (0, '') target_my_gen, target_my_trans_id = (0, '') + logger.debug( + "Soledad target sync info:\n" + " target replica uid: %s\n" + " target generation: %d\n" + " target trans id: %s\n" + " target my gen: %d\n" + " target my trans_id: %s" + % (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id)) + # make sure we'll have access to target replica uid once it exists if self.target_replica_uid is None: @@ -80,6 +141,8 @@ class Synchronizer(U1DBSynchronizer): # what's changed since that generation and this current gen my_gen, _, changes = self.source.whats_changed(target_my_gen) + logger.debug("Soledad sync: there are %d documents to send." \ + % len(changes)) # get source last-seen database generation for the target if self.target_replica_uid is None: @@ -88,11 +151,17 @@ class Synchronizer(U1DBSynchronizer): target_last_known_gen, target_last_known_trans_id = \ self.source._get_replica_gen_and_trans_id( self.target_replica_uid) + logger.debug( + "Soledad source sync info:\n" + " source target gen: %d\n" + " source target trans_id: %s" + % (target_last_known_gen, target_last_known_trans_id)) # validate transaction ids if not changes and target_last_known_gen == target_gen: if target_trans_id != target_last_known_trans_id: raise errors.InvalidTransactionId + self.release_syncing_lock() return my_gen # prepare to send all the changed docs @@ -114,12 +183,79 @@ class Synchronizer(U1DBSynchronizer): new_gen, new_trans_id = sync_target.sync_exchange( docs_by_generation, self.source._replica_uid, target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback) + self._insert_doc_from_target, ensure_callback=ensure_callback, + defer_decryption=defer_decryption) + + logger.debug( + "Soledad source sync info after sync exchange:\n" + " source target gen: %d\n" + " source target trans_id: %s" + % (new_gen, new_trans_id)) + + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + + if defer_decryption and not sync_target.has_syncdb(): + logger.debug("Sync target has no valid sync db, " + "aborting defer_decryption") + defer_decryption = False - # record target synced-up-to generation including applying what we sent + self.complete_sync() + return my_gen + + def complete_sync(self): + """ + Last stage of the synchronization: + (a) record last known generation and transaction uid for the remote + replica, and + (b) make target aware of our current reached generation. + """ + logger.debug("Completing deferred last step in SYNC...") + + # record target synced-up-to generation including applying what we + # sent + info = self._syncing_info self.source._set_replica_gen_and_trans_id( - self.target_replica_uid, new_gen, new_trans_id) + info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) + # if gapless record current reached generation with target - self._record_sync_info_with_the_target(my_gen) + self._record_sync_info_with_the_target(info["my_gen"]) + self.syncing_lock.release() - return my_gen + @property + def syncing(self): + """ + Return True if a sync is ongoing, False otherwise. + :rtype: bool + """ + # XXX FIXME we need some mechanism for timeout: should cleanup and + # release if something in the syncdb-decrypt goes wrong. we could keep + # track of the release date and cleanup unrealistic sync entries after + # some time. + locked = self.syncing_lock.locked() + return locked + + def release_syncing_lock(self): + """ + Release syncing lock if it's locked. + """ + if self.syncing_lock.locked(): + self.syncing_lock.release() + + def close(self): + """ + Close sync target pool of workers. + """ + self.release_syncing_lock() + self.sync_target.close() + + def __del__(self): + """ + Cleanup: release lock. + """ + self.release_syncing_lock() |