diff options
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r-- | client/src/leap/soledad/client/__init__.py | 73 | ||||
-rw-r--r-- | client/src/leap/soledad/client/crypto.py | 1006 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 249 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 163 | ||||
-rw-r--r-- | client/src/leap/soledad/client/target.py | 1622 |
5 files changed, 2507 insertions, 606 deletions
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 0d3a21fd..586e3389 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -223,33 +223,48 @@ class Soledad(object): """ def __init__(self, uuid, passphrase, secrets_path, local_db_path, - server_url, cert_file, auth_token=None, secret_id=None): + server_url, cert_file, + auth_token=None, secret_id=None, defer_encryption=False): """ Initialize configuration, cryptographic keys and dbs. :param uuid: User's uuid. :type uuid: str + :param passphrase: The passphrase for locking and unlocking encryption secrets for local and remote storage. :type passphrase: unicode + :param secrets_path: Path for storing encrypted key used for symmetric encryption. :type secrets_path: str + :param local_db_path: Path for local encrypted storage db. :type local_db_path: str + :param server_url: URL for Soledad server. This is used either to sync - with the user's remote db and to interact with the shared recovery - database. + with the user's remote db and to interact with the + shared recovery database. :type server_url: str + :param cert_file: Path to the certificate of the ca used to validate the SSL certificate used by the remote soledad server. :type cert_file: str + :param auth_token: Authorization token for accessing remote databases. :type auth_token: str + :param secret_id: The id of the storage secret to be used. + :type secret_id: str + + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool + :raise BootstrapSequenceError: Raised when the secret generation and - storage on server sequence has failed for some reason. + storage on server sequence has failed + for some reason. """ # get config params self._uuid = uuid @@ -258,8 +273,10 @@ class Soledad(object): # init crypto variables self._secrets = {} self._secret_id = secret_id - # init config (possibly with default values) + self._defer_encryption = defer_encryption + self._init_config(secrets_path, local_db_path, server_url) + self._set_token(auth_token) self._shared_db_instance = None # configure SSL certificate @@ -390,6 +407,7 @@ class Soledad(object): # release the lock on shared db try: self._shared_db.unlock(token) + self._shared_db.close() except NotLockedError: # for some reason the lock expired. Despite that, secret # loading or generation/storage must have been executed @@ -469,24 +487,20 @@ class Soledad(object): create=True, document_factory=SoledadDocument, crypto=self._crypto, - raw_key=True) + raw_key=True, + defer_encryption=self._defer_encryption) def close(self): """ Close underlying U1DB database. """ + logger.debug("Closing soledad") if hasattr(self, '_db') and isinstance( self._db, SQLCipherDatabase): + self._db.stop_sync() self._db.close() - def __del__(self): - """ - Make sure local database is closed when object is destroyed. - """ - # Watch out! We have no guarantees that this is properly called. - self.close() - # # Management of secret for symmetric encryption. # @@ -520,6 +534,9 @@ class Soledad(object): Define the id of the storage secret to be used. This method will also replace the secret in the crypto object. + + :param secret_id: The id of the storage secret to be used. + :type secret_id: str """ self._secret_id = secret_id @@ -881,7 +898,7 @@ class Soledad(object): :type json: str :param doc_id: An optional identifier specifying the document id. :type doc_id: - :return: The new cocument + :return: The new document :rtype: SoledadDocument """ return self._db.create_doc_from_json(json, doc_id=doc_id) @@ -1041,7 +1058,7 @@ class Soledad(object): if self._db: return self._db.resolve_doc(doc, conflicted_doc_revs) - def sync(self): + def sync(self, defer_decryption=True): """ Synchronize the local encrypted replica with a remote replica. @@ -1051,16 +1068,25 @@ class Soledad(object): :param url: the url of the target replica to sync with :type url: str - :return: the local generation before the synchronisation was - performed. + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :return: The local generation before the synchronisation was + performed. :rtype: str """ if self._db: + try: local_gen = self._db.sync( urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), - creds=self._creds, autocreate=False) + creds=self._creds, autocreate=False, + defer_decryption=defer_decryption) signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) return local_gen + except Exception as e: + logger.error("Soledad exception when syncing: %s" % str(e)) def stop_sync(self): """ @@ -1079,7 +1105,9 @@ class Soledad(object): :return: Whether remote replica and local replica differ. :rtype: bool """ - target = SoledadSyncTarget(url, creds=self._creds, crypto=self._crypto) + target = SoledadSyncTarget( + url, self._db._get_replica_uid(), creds=self._creds, + crypto=self._crypto) info = target.get_sync_info(self._db._get_replica_uid()) # compare source generation with target's last known source generation if self._db._get_generation() != info[4]: @@ -1087,6 +1115,13 @@ class Soledad(object): return True return False + @property + def syncing(self): + """ + Property, True if the syncer is syncing. + """ + return self._db.syncing + def _set_token(self, token): """ Set the authentication token for remote database access. diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index a6372107..7133f804 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # crypto.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,27 +14,45 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Cryptographic utilities for Soledad. """ - - import os import binascii import hmac import hashlib - +import json +import logging +import multiprocessing +import threading from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 +from zope.proxy import sameProxiedObjects + +from leap.soledad.common import soledad_assert +from leap.soledad.common import soledad_assert_type +from leap.soledad.common.document import SoledadDocument + + +from leap.soledad.common.crypto import ( + EncryptionSchemes, + UnknownEncryptionScheme, + MacMethods, + UnknownMacMethod, + WrongMac, + ENC_JSON_KEY, + ENC_SCHEME_KEY, + ENC_METHOD_KEY, + ENC_IV_KEY, + MAC_KEY, + MAC_METHOD_KEY, +) +logger = logging.getLogger(__name__) -from leap.soledad.common import ( - soledad_assert, - soledad_assert_type, -) + +MAC_KEY_LENGTH = 64 class EncryptionMethods(object): @@ -45,6 +63,17 @@ class EncryptionMethods(object): AES_256_CTR = 'aes-256-ctr' XSALSA20 = 'xsalsa20' +# +# Exceptions +# + + +class DocumentNotEncrypted(Exception): + """ + Raised for failures in document encryption. + """ + pass + class UnknownEncryptionMethod(Exception): """ @@ -59,13 +88,116 @@ class NoSymmetricSecret(Exception): """ -class SoledadCrypto(object): +def encrypt_sym(data, key, method): """ - General cryptographic functionality. + Encrypt C{data} using a {password}. + + Currently, the only encryption methods supported are AES-256 in CTR + mode and XSalsa20. + + :param data: The data to be encrypted. + :type data: str + :param key: The key used to encrypt C{data} (must be 256 bits long). + :type key: str + :param method: The encryption method to use. + :type method: str + + :return: A tuple with the initial value and the encrypted data. + :rtype: (long, str) + """ + soledad_assert_type(key, str) + + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s bits (must be 256 bits long).' % + (len(key) * 8)) + iv = None + # AES-256 in CTR mode + if method == EncryptionMethods.AES_256_CTR: + iv = os.urandom(16) + ciphertext = AES(key=key, iv=iv).process(data) + # XSalsa20 + elif method == EncryptionMethods.XSALSA20: + iv = os.urandom(24) + ciphertext = XSalsa20(key=key, iv=iv).process(data) + else: + # raise if method is unknown + raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + return binascii.b2a_base64(iv), ciphertext + + +def decrypt_sym(data, key, method, **kwargs): """ + Decrypt data using symmetric secret. + + Currently, the only encryption method supported is AES-256 CTR mode. - MAC_KEY_LENGTH = 64 + :param data: The data to be decrypted. + :type data: str + :param key: The key used to decrypt C{data} (must be 256 bits long). + :type key: str + :param method: The encryption method to use. + :type method: str + :param kwargs: Other parameters specific to each encryption method. + :type kwargs: dict + + :return: The decrypted data. + :rtype: str + """ + soledad_assert_type(key, str) + # assert params + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s (must be 256 bits long).' % len(key)) + soledad_assert( + 'iv' in kwargs, + '%s needs an initial value.' % method) + # AES-256 in CTR mode + if method == EncryptionMethods.AES_256_CTR: + return AES( + key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) + elif method == EncryptionMethods.XSALSA20: + return XSalsa20( + key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) + + # raise if method is unknown + raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + + +def doc_mac_key(doc_id, secret): + """ + Generate a key for calculating a MAC for a document whose id is + C{doc_id}. + The key is derived using HMAC having sha256 as underlying hash + function. The key used for HMAC is the first MAC_KEY_LENGTH characters + of Soledad's storage secret. The HMAC message is C{doc_id}. + + :param doc_id: The id of the document. + :type doc_id: str + + :param secret: soledad secret storage + :type secret: Soledad.storage_secret + + :return: The key. + :rtype: str + + :raise NoSymmetricSecret: if no symmetric secret was supplied. + """ + if secret is None: + raise NoSymmetricSecret() + + return hmac.new( + secret[:MAC_KEY_LENGTH], + doc_id, + hashlib.sha256).digest() + + +class SoledadCrypto(object): + """ + General cryptographic functionality encapsulated in a + object that can be passed along. + """ def __init__(self, soledad): """ Initialize the crypto object. @@ -77,78 +209,14 @@ class SoledadCrypto(object): def encrypt_sym(self, data, key, method=EncryptionMethods.AES_256_CTR): - """ - Encrypt C{data} using a {password}. - - Currently, the only encryption method supported is AES-256 CTR mode. - - :param data: The data to be encrypted. - :type data: str - :param key: The key used to encrypt C{data} (must be 256 bits long). - :type key: str - :param method: The encryption method to use. - :type method: str - - :return: A tuple with the initial value and the encrypted data. - :rtype: (long, str) - """ - soledad_assert_type(key, str) - - soledad_assert( - len(key) == 32, # 32 x 8 = 256 bits. - 'Wrong key size: %s bits (must be 256 bits long).' % - (len(key) * 8)) - iv = None - # AES-256 in CTR mode - if method == EncryptionMethods.AES_256_CTR: - iv = os.urandom(16) - ciphertext = AES(key=key, iv=iv).process(data) - # XSalsa20 - elif method == EncryptionMethods.XSALSA20: - iv = os.urandom(24) - ciphertext = XSalsa20(key=key, iv=iv).process(data) - else: - # raise if method is unknown - raise UnknownEncryptionMethod('Unkwnown method: %s' % method) - return binascii.b2a_base64(iv), ciphertext + return encrypt_sym(data, key, method) def decrypt_sym(self, data, key, method=EncryptionMethods.AES_256_CTR, **kwargs): - """ - Decrypt data using symmetric secret. + return decrypt_sym(data, key, method, **kwargs) - Currently, the only encryption method supported is AES-256 CTR mode. - - :param data: The data to be decrypted. - :type data: str - :param key: The key used to decrypt C{data} (must be 256 bits long). - :type key: str - :param method: The encryption method to use. - :type method: str - :param kwargs: Other parameters specific to each encryption method. - :type kwargs: dict - - :return: The decrypted data. - :rtype: str - """ - soledad_assert_type(key, str) - # assert params - soledad_assert( - len(key) == 32, # 32 x 8 = 256 bits. - 'Wrong key size: %s (must be 256 bits long).' % len(key)) - soledad_assert( - 'iv' in kwargs, - '%s needs an initial value.' % method) - # AES-256 in CTR mode - if method == EncryptionMethods.AES_256_CTR: - return AES( - key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - elif method == EncryptionMethods.XSALSA20: - return XSalsa20( - key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - - # raise if method is unknown - raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + def doc_mac_key(self, doc_id, secret): + return doc_mac_key(doc_id, self.secret) def doc_passphrase(self, doc_id): """ @@ -173,41 +241,769 @@ class SoledadCrypto(object): raise NoSymmetricSecret() return hmac.new( self.secret[ - self.MAC_KEY_LENGTH: + MAC_KEY_LENGTH: self._soledad.REMOTE_STORAGE_SECRET_LENGTH], doc_id, hashlib.sha256).digest() - def doc_mac_key(self, doc_id): + # + # secret setters/getters + # + + def _get_secret(self): + return self._soledad.storage_secret + + secret = property( + _get_secret, doc='The secret used for symmetric encryption') + +# +# Crypto utilities for a SoledadDocument. +# + + +def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret): + """ + Calculate a MAC for C{doc} using C{ciphertext}. + + Current MAC method used is HMAC, with the following parameters: + + * key: sha256(storage_secret, doc_id) + * msg: doc_id + doc_rev + ciphertext + * digestmod: sha256 + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + :param ciphertext: The content of the document. + :type ciphertext: str + :param mac_method: The MAC method to use. + :type mac_method: str + :param secret: soledad secret + :type secret: Soledad.secret_storage + + :return: The calculated MAC. + :rtype: str + """ + if mac_method == MacMethods.HMAC: + return hmac.new( + doc_mac_key(doc_id, secret), + str(doc_id) + str(doc_rev) + ciphertext, + hashlib.sha256).digest() + # raise if we do not know how to handle this MAC method + raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) + + +def encrypt_doc(crypto, doc): + """ + Wrapper around encrypt_docstr that accepts a crypto object and the document + as arguments. + + :param crypto: a soledad crypto object. + :type crypto: SoledadCrypto + :param doc: the document. + :type doc: SoledadDocument + """ + key = crypto.doc_passphrase(doc.doc_id) + secret = crypto.secret + + return encrypt_docstr( + doc.get_json(), doc.doc_id, doc.rev, key, secret) + + +def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): + """ + Encrypt C{doc}'s content. + + Encrypt doc's contents using AES-256 CTR mode and return a valid JSON + string representing the following: + + { + ENC_JSON_KEY: '<encrypted doc JSON string>', + ENC_SCHEME_KEY: 'symkey', + ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, + ENC_IV_KEY: '<the initial value used to encrypt>', + MAC_KEY: '<mac>' + MAC_METHOD_KEY: 'hmac' + } + + :param docstr: A representation of the document to be encrypted. + :type docstr: str or unicode. + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: The Soledad secret (used for MAC auth). + :type secret: str + + :return: The JSON serialization of the dict representing the encrypted + content. + :rtype: str + """ + # encrypt content using AES-256 CTR mode + iv, ciphertext = encrypt_sym( + str(docstr), # encryption/decryption routines expect str + key, method=EncryptionMethods.AES_256_CTR) + # Return a representation for the encrypted content. In the following, we + # convert binary data to hexadecimal representation so the JSON + # serialization does not complain about what it tries to serialize. + hex_ciphertext = binascii.b2a_hex(ciphertext) + return json.dumps({ + ENC_JSON_KEY: hex_ciphertext, + ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, + ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, + ENC_IV_KEY: iv, + MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex. + doc_id, doc_rev, ciphertext, + MacMethods.HMAC, secret)), + MAC_METHOD_KEY: MacMethods.HMAC, + }) + + +def decrypt_doc(crypto, doc): + """ + Wrapper around decrypt_doc_dict that accepts a crypto object and the + document as arguments. + + :param crypto: a soledad crypto object. + :type crypto: SoledadCrypto + :param doc: the document. + :type doc: SoledadDocument + + :return: json string with the decrypted document + :rtype: str + """ + key = crypto.doc_passphrase(doc.doc_id) + secret = crypto.secret + return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret) + + +def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): + """ + Decrypt C{doc}'s content. + + Return the JSON string representation of the document's decrypted content. + + The passed doc_dict argument should have the following structure: + + { + ENC_JSON_KEY: '<enc_blob>', + ENC_SCHEME_KEY: '<enc_scheme>', + ENC_METHOD_KEY: '<enc_method>', + ENC_IV_KEY: '<initial value used to encrypt>', # (optional) + MAC_KEY: '<mac>' + MAC_METHOD_KEY: 'hmac' + } + + C{enc_blob} is the encryption of the JSON serialization of the document's + content. For now Soledad just deals with documents whose C{enc_scheme} is + EncryptionSchemes.SYMKEY and C{enc_method} is + EncryptionMethods.AES_256_CTR. + + :param doc_dict: The content of the document to be decrypted. + :type doc_dict: dict + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: + :type secret: + + :return: The JSON serialization of the decrypted content. + :rtype: str + """ + soledad_assert(ENC_JSON_KEY in doc_dict) + soledad_assert(ENC_SCHEME_KEY in doc_dict) + soledad_assert(ENC_METHOD_KEY in doc_dict) + soledad_assert(MAC_KEY in doc_dict) + soledad_assert(MAC_METHOD_KEY in doc_dict) + + # verify MAC + ciphertext = binascii.a2b_hex( # content is stored as hex. + doc_dict[ENC_JSON_KEY]) + mac = mac_doc( + doc_id, doc_rev, + ciphertext, + doc_dict[MAC_METHOD_KEY], secret) + # we compare mac's hashes to avoid possible timing attacks that might + # exploit python's builtin comparison operator behaviour, which fails + # immediatelly when non-matching bytes are found. + doc_mac_hash = hashlib.sha256( + binascii.a2b_hex( # the mac is stored as hex + doc_dict[MAC_KEY])).digest() + calculated_mac_hash = hashlib.sha256(mac).digest() + + if doc_mac_hash != calculated_mac_hash: + logger.warning("Wrong MAC while decrypting doc...") + raise WrongMac('Could not authenticate document\'s contents.') + # decrypt doc's content + enc_scheme = doc_dict[ENC_SCHEME_KEY] + plainjson = None + if enc_scheme == EncryptionSchemes.SYMKEY: + enc_method = doc_dict[ENC_METHOD_KEY] + if enc_method == EncryptionMethods.AES_256_CTR: + soledad_assert(ENC_IV_KEY in doc_dict) + plainjson = decrypt_sym( + ciphertext, key, + method=enc_method, + iv=doc_dict[ENC_IV_KEY]) + else: + raise UnknownEncryptionMethod(enc_method) + else: + raise UnknownEncryptionScheme(enc_scheme) + + return plainjson + + +def is_symmetrically_encrypted(doc): + """ + Return True if the document was symmetrically encrypted. + + :param doc: The document to check. + :type doc: SoledadDocument + + :rtype: bool + """ + if doc.content and ENC_SCHEME_KEY in doc.content: + if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY: + return True + return False + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): + """ + Base class for encrypter/decrypter pools. + """ + WORKERS = 5 + + def __init__(self, crypto, sync_db, write_lock): + """ + Initialize the pool of encryption-workers. + + :param crypto: A SoledadCryto instance to perform the encryption. + :type crypto: leap.soledad.crypto.SoledadCrypto + + :param sync_db: a database connection handle + :type sync_db: handle + + :param write_lock: a write lock for controlling concurrent access + to the sync_db + :type write_lock: threading.Lock + """ + self._pool = multiprocessing.Pool(self.WORKERS) + self._crypto = crypto + self._sync_db = sync_db + self._sync_db_write_lock = write_lock + + def close(self): + """ + Cleanly close the pool of workers. + """ + logger.debug("Closing %s" % (self.__class__.__name__,)) + self._pool.close() + try: + self._pool.join() + except Exception: + pass + + def terminate(self): + """ + Terminate the pool of workers. + """ + logger.debug("Terminating %s" % (self.__class__.__name__,)) + self._pool.terminate() + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): + """ + Encrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad secret (used for MAC auth). + :type secret: str + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + encrypted_content = encrypt_docstr( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, encrypted_content + + +class SyncEncrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric encryption + of documents to be synced. + """ + # TODO implement throttling to reduce cpu usage?? + WORKERS = 5 + TABLE_NAME = "docs_tosync" + FIELD_NAMES = "doc_id, rev, content" + + def encrypt_doc(self, doc, workers=True): + """ + Symmetrically encrypt a document. + + :param doc: The document with contents to be encrypted. + :type doc: SoledadDocument + + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + """ + soledad_assert(self._crypto is not None, "need a crypto object") + docstr = doc.get_json() + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + args = doc.doc_id, doc.rev, docstr, key, secret + + try: + if workers: + res = self._pool.apply_async( + encrypt_doc_task, args, + callback=self.encrypt_doc_cb) + else: + # encrypt inline + res = encrypt_doc_task(*args) + self.encrypt_doc_cb(res) + + except Exception as exc: + logger.exception(exc) + + def encrypt_doc_cb(self, result): """ - Generate a key for calculating a MAC for a document whose id is - C{doc_id}. + Insert results of encryption routine into the local sync database. - The key is derived using HMAC having sha256 as underlying hash - function. The key used for HMAC is the first MAC_KEY_LENGTH characters - of Soledad's storage secret. The HMAC message is C{doc_id}. + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, doc_rev, content = result + self.insert_encrypted_local_doc(doc_id, doc_rev, content) - :param doc_id: The id of the document. + def insert_encrypted_local_doc(self, doc_id, doc_rev, content): + """ + Insert the contents of the encrypted doc into the local sync + database. + + :param doc_id: The document id. :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param content: The encrypted document. + :type content: str + """ + sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - :return: The key. - :rtype: str + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_del, (doc_id, )) + con.execute(sql_ins, (doc_id, doc_rev, content)) - :raise NoSymmetricSecret: if no symmetric secret was supplied. + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): + """ + Decrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The encrypted content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification of + that document. + :type trans_id: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad secret (used for MAC auth). + :type secret: str + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + decrypted_content = decrypt_doc_dict( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, decrypted_content, gen, trans_id + + +def get_insertable_docs_by_gen(expected, got): + """ + Return a list of documents ready to be inserted. This list is computed + by aligning the expected list with the already gotten docs, and returning + the maximum number of docs that can be processed in the expected order + before finding a gap. + + :param expected: A list of generations to be inserted. + :type expected: list + + :param got: A dictionary whose values are the docs to be inserted. + :type got: dict + """ + ordered = [got.get(i) for i in expected] + if None in ordered: + return ordered[:ordered.index(None)] + else: + return ordered + + +class SyncDecrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric decryption + of documents that were received. + + The decryption of the received documents is done in two steps: + + 1. All the encrypted docs are collected, together with their generation + and transaction-id + 2. The docs are enqueued for decryption. When completed, they are + inserted following the generation order. + """ + # TODO implement throttling to reduce cpu usage?? + TABLE_NAME = "docs_received" + FIELD_NAMES = "doc_id, rev, content, gen, trans_id" + + write_encrypted_lock = threading.Lock() + + def __init__(self, *args, **kwargs): """ - if self.secret is None: - raise NoSymmetricSecret() - return hmac.new( - self.secret[:self.MAC_KEY_LENGTH], - doc_id, - hashlib.sha256).digest() + Initialize the decrypter pool, and setup a dict for putting the + results of the decrypted docs until they are picked by the insert + routine that gets them in order. + """ + self._insert_doc_cb = kwargs.pop("insert_doc_cb") + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + self.decrypted_docs = {} + self.source_replica_uid = None - # - # secret setters/getters - # + def set_source_replica_uid(self, source_replica_uid): + """ + Set the source replica uid for this decrypter pool instance. - def _get_secret(self): - return self._soledad.storage_secret + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + """ + self.source_replica_uid = source_replica_uid - secret = property( - _get_secret, doc='The secret used for symmetric encryption') + def insert_encrypted_received_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert a received message with encrypted content, to be decrypted later + on. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + """ + docstr = json.dumps(content) + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( + self.TABLE_NAME,) + + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id)) + + def insert_marker_for_received_doc(self, doc_id, doc_rev, gen): + """ + Insert a marker with the document id, revision and generation on the + sync db. This document does not have an encrypted payload, so the + content has already been inserted into the decrypted_docs dictionary + from where it can be picked following generation order. + We need to leave here the marker to be able to calculate the expected + insertion order for a synchronization batch. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param gen: the Document Generation + :type gen: int + """ + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( + self.TABLE_NAME,) + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_ins, (doc_id, doc_rev, '', gen, '')) + + def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): + """ + Insert a document that is not symmetrically encrypted. + We store it in the staging area (the decrypted_docs dictionary) to be + picked up in order as the preceding documents are decrypted. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + """ + # XXX this need a deeper review / testing. + # I believe that what I'm doing here is prone to problems + # if the sync is interrupted (ie, client crash) in the worst possible + # moment. We would need a recover strategy in that case + # (or, insert the document in the table all the same, but with a flag + # saying if the document is sym-encrypted or not), + content = json.dumps(content) + result = doc_id, doc_rev, content, gen, trans_id + self.decrypted_docs[gen] = result + self.insert_marker_for_received_doc(doc_id, doc_rev, gen) + + def delete_encrypted_received_doc(self, doc_id, doc_rev): + """ + Delete a encrypted received doc after it was inserted into the local + db. + + :param doc_id: Document ID. + :type doc_id: str + :param doc_rev: Document revision. + :type doc_rev: str + """ + sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_del, (doc_id, doc_rev)) + + def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True): + """ + Symmetrically decrypt a document. + + :param doc_id: The ID for the document with contents to be encrypted. + :type doc: str + :param rev: The revision of the document. + :type rev: str + :param source_replica_uid: + :type source_replica_uid: str + + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + """ + self.source_replica_uid = source_replica_uid + + # insert_doc_cb is a proxy object that gets updated with the right + # insert function only when the sync_target invokes the sync_exchange + # method. so, if we don't still have a non-empty callback, we refuse + # to proceed. + if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), + None): + logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") + return + + # XXX move to get_doc function... + c = self._sync_db.cursor() + sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + try: + c.execute(sql, (doc_id, rev)) + res = c.fetchone() + except Exception as exc: + logger.warning("Error getting docs from syncdb: %r" % (exc,)) + return + if res is None: + logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev)) + return + + soledad_assert(self._crypto is not None, "need a crypto object") + try: + doc_id, rev, docstr, gen, trans_id = res + except ValueError: + logger.warning("Wrong entry in sync db") + return + + if len(docstr) == 0: + # not encrypted payload + return + + try: + content = json.loads(docstr) + except TypeError: + logger.warning("Wrong type while decoding json: %s" % repr(docstr)) + return + + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, rev, content, gen, trans_id, key, secret + + try: + if workers: + # Ouch. This is sent to the workers asynchronously, so + # we have no way of logging errors. We'd have to inspect + # lingering results by querying successful / get() over them... + # Or move the heck out of it to twisted. + res = self._pool.apply_async( + decrypt_doc_task, args, + callback=self.decrypt_doc_cb) + else: + # decrypt inline + res = decrypt_doc_task(*args) + self.decrypt_doc_cb(res) + + except Exception as exc: + logger.exception(exc) + + def decrypt_doc_cb(self, result): + """ + Temporarily store the decryption result in a dictionary where it will + be picked by process_decrypted. + + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, rev, content, gen, trans_id = result + logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen)) + self.decrypted_docs[gen] = result + + def get_docs_by_generation(self): + """ + Get all documents in the received table from the sync db, + ordered by generation. + + :return: list of doc_id, rev, generation + """ + c = self._sync_db.cursor() + sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % ( + self.TABLE_NAME,) + c.execute(sql) + return c.fetchall() + + def count_received_encrypted_docs(self): + """ + Count how many documents we have in the table for received and + encrypted docs. + + :return: The count of documents. + :rtype: int + """ + if self._sync_db is None: + logger.warning("cannot return count with null sync_db") + return + c = self._sync_db.cursor() + sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) + c.execute(sql) + res = c.fetchone() + if res is not None: + return res[0] + else: + return 0 + + def decrypt_received_docs(self): + """ + Get all the encrypted documents from the sync database and dispatch a + decrypt worker to decrypt each one of them. + """ + docs_by_generation = self.get_docs_by_generation() + logger.debug("Sync decrypter pool: There are %d documents to " \ + "decrypt." % len(docs_by_generation)) + for doc_id, rev, gen in filter(None, docs_by_generation): + self.decrypt_doc(doc_id, rev, self.source_replica_uid) + + def process_decrypted(self): + """ + Process the already decrypted documents, and insert as many documents + as can be taken from the expected order without finding a gap. + + :return: Whether we have processed all the pending docs. + :rtype: bool + """ + # Acquire the lock to avoid processing while we're still + # getting data from the syncing stream, to avoid InvalidGeneration + # problems. + with self.write_encrypted_lock: + already_decrypted = self.decrypted_docs + docs = self.get_docs_by_generation() + docs = filter(lambda entry: len(entry) > 0, docs) + expected = [gen for doc_id, rev, gen in docs] + docs_to_insert = get_insertable_docs_by_gen( + expected, already_decrypted) + for doc_fields in docs_to_insert: + self.insert_decrypted_local_doc(*doc_fields) + remaining = self.count_received_encrypted_docs() + return remaining == 0 + + def insert_decrypted_local_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert the decrypted document into the local sqlcipher database. + Makes use of the passed callback `return_doc_cb` passed to the caller + by u1db sync. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + """ + # could pass source_replica in params for callback chain + insert_fun = self._insert_doc_cb[self.source_replica_uid] + logger.debug("Sync decrypter pool: inserting doc in local db: " \ + "%s:%s %s" % (doc_id, doc_rev, gen)) + try: + # convert deleted documents to avoid error on document creation + if content == 'null': + content = None + doc = SoledadDocument(doc_id, doc_rev, content) + insert_fun(doc, int(gen), trans_id) + except Exception as exc: + logger.error("Sync decrypter pool: error while inserting " + "decrypted doc into local db.") + logger.exception(exc) + + else: + # If no errors found, remove it from the local temporary dict + # and from the received database. + self.decrypted_docs.pop(gen) + self.delete_encrypted_received_doc(doc_id, doc_rev) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 5ffa9c7e..2df9606e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # sqlcipher.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ A U1DB backend that uses SQLCipher as its persistence layer. @@ -40,11 +38,13 @@ cipher_default_use_hmac can be used to globally alter the default use of HMAC when opening a database. So, as the statements above were introduced for backwards compatibility with -SLCipher 1.1 databases, we do not implement them as all SQLCipher databases +SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. """ import logging +import multiprocessing import os +import sqlite3 import string import threading import time @@ -57,9 +57,12 @@ from collections import defaultdict from pysqlcipher import dbapi2 from u1db.backends import sqlite_backend from u1db import errors as u1db_errors +from taskthread import TimerTask -from leap.soledad.client.sync import Synchronizer +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.client.target import PendingReceivedDocsSyncError +from leap.soledad.client.sync import SoledadSynchronizer from leap.soledad.common.document import SoledadDocument @@ -88,7 +91,7 @@ SQLITE_ISOLATION_LEVEL = None def open(path, password, create=True, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, - cipher_page_size=1024): + cipher_page_size=1024, defer_encryption=False): """Open a database at the given location. Will raise u1db.errors.DatabaseDoesNotExist if create=False and the @@ -114,6 +117,9 @@ def open(path, password, create=True, document_factory=None, crypto=None, :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool :return: An instance of Database. :rtype SQLCipherDatabase @@ -121,7 +127,7 @@ def open(path, password, create=True, document_factory=None, crypto=None, return SQLCipherDatabase.open_database( path, password, create=create, document_factory=document_factory, crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, - cipher_page_size=cipher_page_size) + cipher_page_size=cipher_page_size, defer_encryption=defer_encryption) # @@ -147,19 +153,40 @@ class NotAnHexString(Exception): # class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): - """A U1DB implementation that uses SQLCipher as its persistence layer.""" + """ + A U1DB implementation that uses SQLCipher as its persistence layer. + """ + defer_encryption = False _index_storage_value = 'expand referenced encrypted' k_lock = threading.Lock() create_doc_lock = threading.Lock() update_indexes_lock = threading.Lock() + _sync_watcher = None + _sync_enc_pool = None + + """ + The name of the local symmetrically encrypted documents to + sync database file. + """ + LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' - syncing_lock = defaultdict(threading.Lock) """ A dictionary that hold locks which avoid multiple sync attempts from the same database replica. """ + encrypting_lock = threading.Lock() + """ + Period or recurrence of the periodic encrypting task, in seconds. + """ + ENCRYPT_TASK_PERIOD = 1 + + syncing_lock = defaultdict(threading.Lock) + """ + A dictionary that hold locks which avoid multiple sync attempts from the + same database replica. + """ def __init__(self, sqlcipher_file, password, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', @@ -194,7 +221,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self.assert_db_is_encrypted( sqlcipher_file, password, raw_key, cipher, kdf_iter, cipher_page_size) - # connect to the database + + # connect to the sqlcipher database with self.k_lock: self._db_handle = dbapi2.connect( sqlcipher_file, @@ -215,18 +243,45 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self._ensure_schema() self._crypto = crypto + self._sync_db = None + self._sync_db_write_lock = None + self._sync_enc_pool = None + + if self.defer_encryption: + if sqlcipher_file != ":memory:": + self._sync_db_path = "%s-sync" % sqlcipher_file + else: + self._sync_db_path = ":memory:" + + # initialize sync db + self._init_sync_db() + + # initialize syncing queue encryption pool + self._sync_enc_pool = SyncEncrypterPool( + self._crypto, self._sync_db, self._sync_db_write_lock) + self._sync_watcher = TimerTask(self._encrypt_syncing_docs, + self.ENCRYPT_TASK_PERIOD) + self._sync_watcher.start() + def factory(doc_id=None, rev=None, json='{}', has_conflicts=False, syncable=True): return SoledadDocument(doc_id=doc_id, rev=rev, json=json, has_conflicts=has_conflicts, syncable=syncable) self.set_document_factory(factory) + # we store syncers in a dictionary indexed by the target URL. We also + # store a hash of the auth info in case auth info expires and we need + # to rebuild the syncer for that target. The final self._syncers + # format is the following: + # + # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} self._syncers = {} @classmethod def _open_database(cls, sqlcipher_file, password, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', - kdf_iter=4000, cipher_page_size=1024): + kdf_iter=4000, cipher_page_size=1024, + defer_encryption=False): """ Open a SQLCipher database. @@ -249,10 +304,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool :return: The database object. :rtype: SQLCipherDatabase """ + cls.defer_encryption = defer_encryption if not os.path.isfile(sqlcipher_file): raise u1db_errors.DatabaseDoesNotExist() @@ -298,43 +357,59 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): def open_database(cls, sqlcipher_file, password, create, backend_cls=None, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, - cipher_page_size=1024): + cipher_page_size=1024, defer_encryption=False): """ Open a SQLCipher database. :param sqlcipher_file: The path for the SQLCipher file. :type sqlcipher_file: str + :param password: The password that protects the SQLCipher db. :type password: str + :param create: Should the datbase be created if it does not already - exist? - :type: bool + exist? + :type create: bool + :param backend_cls: A class to use as backend. :type backend_cls: type + :param document_factory: A function that will be called with the same - parameters as Document.__init__. + parameters as Document.__init__. :type document_factory: callable + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. + document contents when syncing. :type crypto: soledad.crypto.SoledadCrypto + :param raw_key: Whether C{password} is a raw 64-char hex string or a - passphrase that should be hashed to obtain the encyrption key. + passphrase that should be hashed to obtain the + encyrption key. :type raw_key: bool + :param cipher: The cipher and mode to use. :type cipher: str + :param kdf_iter: The number of iterations to use. :type kdf_iter: int + :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool + :return: The database object. :rtype: SQLCipherDatabase """ + cls.defer_encryption = defer_encryption try: return cls._open_database( sqlcipher_file, password, document_factory=document_factory, crypto=crypto, raw_key=raw_key, cipher=cipher, - kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) + kdf_iter=kdf_iter, cipher_page_size=cipher_page_size, + defer_encryption=defer_encryption) except u1db_errors.DatabaseDoesNotExist: if not create: raise @@ -347,7 +422,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) - def sync(self, url, creds=None, autocreate=True): + def sync(self, url, creds=None, autocreate=True, defer_decryption=True): """ Synchronize documents with remote replica exposed at url. @@ -362,6 +437,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type creds: dict :param autocreate: Ask the target to create the db if non-existent. :type autocreate: bool + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool :return: The local generation before the synchronisation was performed. :rtype: int @@ -370,7 +449,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): # the following context manager blocks until the syncing lock can be # acquired. with self.syncer(url, creds=creds) as syncer: - res = syncer.sync(autocreate=autocreate) + + # XXX could mark the critical section here... + try: + res = syncer.sync(autocreate=autocreate, + defer_decryption=defer_decryption) + + except PendingReceivedDocsSyncError: + logger.warning("Local sync db is not clear, skipping sync...") + return + return res def stop_sync(self): @@ -394,7 +482,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]: syncer = self._get_syncer(url, creds=creds) yield syncer - syncer.sync_target.close() + + @property + def syncing(self): + lock = SQLCipherDatabase.syncing_lock[self._get_replica_uid()] + acquired_lock = lock.acquire(False) + if acquired_lock is False: + return True + lock.release() + return False def _get_syncer(self, url, creds=None): """ @@ -415,11 +511,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): h = sha256(json.dumps([url, creds])).hexdigest() cur_h, syncer = self._syncers.get(url, (None, None)) if syncer is None or h != cur_h: - syncer = Synchronizer( + wlock = self._sync_db_write_lock + syncer = SoledadSynchronizer( self, SoledadSyncTarget(url, + self._replica_uid, creds=creds, - crypto=self._crypto)) + crypto=self._crypto, + sync_db=self._sync_db, + sync_db_write_lock=wlock)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -442,21 +542,85 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): 'ALTER TABLE document ' 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') - def create_doc(self, content, doc_id=None): + def _init_sync_db(self): + """ + Initialize the Symmetrically-Encrypted document to be synced database, + and the queue to communicate with subprocess workers. """ - Create a new document in the local encrypted database. + self._sync_db = sqlite3.connect(self._sync_db_path, + check_same_thread=False) - :param content: the contents of the new document - :type content: dict - :param doc_id: an optional identifier specifying the document id - :type doc_id: str + self._sync_db_write_lock = threading.Lock() + self._create_sync_db_tables() + self.sync_queue = multiprocessing.Queue() + + def _create_sync_db_tables(self): + """ + Create tables for the local sync documents db if needed. + """ + encr = SyncEncrypterPool + decr = SyncDecrypterPool + sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + encr.TABLE_NAME, encr.FIELD_NAMES)) + sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + decr.TABLE_NAME, decr.FIELD_NAMES)) + + with self._sync_db_write_lock: + with self._sync_db: + self._sync_db.execute(sql_encr) + self._sync_db.execute(sql_decr) + + # + # Symmetric encryption of syncing docs + # + + def _encrypt_syncing_docs(self): + """ + Process the syncing queue and send the documents there + to be encrypted in the sync db. They will be read by the + SoledadSyncTarget during the sync_exchange. + + Called periodical from the TimerTask self._sync_watcher. + """ + lock = self.encrypting_lock + # optional wait flag used to avoid blocking + if not lock.acquire(False): + return + else: + queue = self.sync_queue + try: + while not queue.empty(): + doc = queue.get_nowait() + self._sync_enc_pool.encrypt_doc(doc) + + except Exception as exc: + logger.error("Error while encrypting docs to sync") + logger.exception(exc) + finally: + lock.release() + + # + # Document operations + # - :return: the new document - :rtype: SoledadDocument + def put_doc(self, doc): """ - with self.create_doc_lock: - return sqlite_backend.SQLitePartialExpandDatabase.create_doc( - self, content, doc_id=doc_id) + Overwrite the put_doc method, to enqueue the modified document for + encryption before sync. + + :param doc: The document to be put. + :type doc: u1db.Document + + :return: The new document revision. + :rtype: str + """ + doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc( + self, doc) + if self.defer_encryption: + self.sync_queue.put_nowait(doc) + return doc_rev + + # indexes def _put_and_update_indexes(self, old_doc, doc): """ @@ -906,12 +1070,25 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): res = c.fetchall() return res[0][0] - def __del__(self): + def close(self): """ - Closes db_handle upon object destruction. + Close db_handle and close syncer. """ + logger.debug("Sqlcipher backend: closing") + if self._sync_watcher is not None: + self._sync_watcher.stop() + self._sync_watcher.shutdown() + for url in self._syncers: + _, syncer = self._syncers[url] + syncer.close() + if self._sync_enc_pool is not None: + self._sync_enc_pool.close() if self._db_handle is not None: self._db_handle.close() + @property + def replica_uid(self): + return self._get_replica_uid() + sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 56e63416..5d545a77 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -17,35 +17,85 @@ """ -Sync infrastructure that can be interrupted and recovered. +Soledad synchronization utilities. + + +Extend u1db Synchronizer with the ability to: + + * Defer the update of the known replica uid until all the decryption of + the incoming messages has been processed. + + * Be interrupted and recovered. """ + import json +import logging +import traceback +from threading import Lock from u1db import errors -from u1db.sync import Synchronizer as U1DBSynchronizer +from u1db.sync import Synchronizer -class Synchronizer(U1DBSynchronizer): +logger = logging.getLogger(__name__) + + +class SoledadSynchronizer(Synchronizer): """ Collect the state around synchronizing 2 U1DB replicas. - Modified to allow for interrupting the synchronization process. + Synchronization is bi-directional, in that new items in the source are sent + to the target, and new items in the target are returned to the source. + However, it still recognizes that one side is initiating the request. Also, + at the moment, conflicts are only created in the source. + + Also modified to allow for interrupting the synchronization process. """ + syncing_lock = Lock() + def stop(self): """ Stop the current sync in progress. """ self.sync_target.stop() - def sync(self, autocreate=False): + def sync(self, autocreate=False, defer_decryption=True): """ Synchronize documents between source and target. + Differently from u1db `Synchronizer.sync` method, this one allows to + pass a `defer_decryption` flag that will postpone the last + step in the synchronization dance, namely, the setting of the last + known generation and transaction id for a given remote replica. + + This is done to allow the ongoing parallel decryption of the incoming + docs to proceed without `InvalidGeneration` conflicts. + :param autocreate: Whether the target replica should be created or not. :type autocreate: bool + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + """ + self.syncing_lock.acquire() + try: + return self._sync(autocreate=autocreate, + defer_decryption=defer_decryption) + except Exception: + # re-raising the exceptions to let syqlcipher.sync catch them + # (and re-create the syncer instance if needed) + raise + finally: + self.release_syncing_lock() + + def _sync(self, autocreate=False, defer_decryption=True): + """ + Helper function, called from the main `sync` method. + See `sync` docstring. """ sync_target = self.sync_target @@ -64,6 +114,16 @@ class Synchronizer(U1DBSynchronizer): target_gen, target_trans_id = (0, '') target_my_gen, target_my_trans_id = (0, '') + logger.debug( + "Soledad target sync info:\n" + " target replica uid: %s\n" + " target generation: %d\n" + " target trans id: %s\n" + " target my gen: %d\n" + " target my trans_id: %s" + % (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id)) + # make sure we'll have access to target replica uid once it exists if self.target_replica_uid is None: @@ -80,6 +140,8 @@ class Synchronizer(U1DBSynchronizer): # what's changed since that generation and this current gen my_gen, _, changes = self.source.whats_changed(target_my_gen) + logger.debug("Soledad sync: there are %d documents to send." \ + % len(changes)) # get source last-seen database generation for the target if self.target_replica_uid is None: @@ -88,6 +150,11 @@ class Synchronizer(U1DBSynchronizer): target_last_known_gen, target_last_known_trans_id = \ self.source._get_replica_gen_and_trans_id( self.target_replica_uid) + logger.debug( + "Soledad source sync info:\n" + " source target gen: %d\n" + " source target trans_id: %s" + % (target_last_known_gen, target_last_known_trans_id)) # validate transaction ids if not changes and target_last_known_gen == target_gen: @@ -111,15 +178,85 @@ class Synchronizer(U1DBSynchronizer): # # The sync_exchange method may be interrupted, in which case it will # return a tuple of Nones. - new_gen, new_trans_id = sync_target.sync_exchange( - docs_by_generation, self.source._replica_uid, - target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback) + try: + new_gen, new_trans_id = sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback, + defer_decryption=defer_decryption) + logger.debug( + "Soledad source sync info after sync exchange:\n" + " source target gen: %d\n" + " source target trans_id: %s" + % (new_gen, new_trans_id)) + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + if defer_decryption and not sync_target.has_syncdb(): + logger.debug("Sync target has no valid sync db, " + "aborting defer_decryption") + defer_decryption = False + self.complete_sync() + except Exception as e: + logger.error("Soledad sync error: %s" % str(e)) + logger.error(traceback.format_exc()) + sync_target.stop() + finally: + sync_target.close() - # record target synced-up-to generation including applying what we sent + return my_gen + + def complete_sync(self): + """ + Last stage of the synchronization: + (a) record last known generation and transaction uid for the remote + replica, and + (b) make target aware of our current reached generation. + """ + logger.debug("Completing deferred last step in SYNC...") + + # record target synced-up-to generation including applying what we + # sent + info = self._syncing_info self.source._set_replica_gen_and_trans_id( - self.target_replica_uid, new_gen, new_trans_id) + info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) + # if gapless record current reached generation with target - self._record_sync_info_with_the_target(my_gen) + self._record_sync_info_with_the_target(info["my_gen"]) - return my_gen + @property + def syncing(self): + """ + Return True if a sync is ongoing, False otherwise. + :rtype: bool + """ + # XXX FIXME we need some mechanism for timeout: should cleanup and + # release if something in the syncdb-decrypt goes wrong. we could keep + # track of the release date and cleanup unrealistic sync entries after + # some time. + locked = self.syncing_lock.locked() + return locked + + def release_syncing_lock(self): + """ + Release syncing lock if it's locked. + """ + if self.syncing_lock.locked(): + self.syncing_lock.release() + + def close(self): + """ + Close sync target pool of workers. + """ + self.release_syncing_lock() + self.sync_target.close() + + def __del__(self): + """ + Cleanup: release lock. + """ + self.release_syncing_lock() diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 968545b6..70e4d3a2 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -14,458 +14,466 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. + + """ A U1DB backend for encrypting data before sending to server and decrypting after receiving. """ -import binascii + + import cStringIO import gzip -import hashlib -import hmac import logging +import re import urllib import threading +import urlparse -import simplejson as json +from collections import defaultdict from time import sleep from uuid import uuid4 +from contextlib import contextmanager -from u1db.remote import utils, http_errors -from u1db.errors import BrokenSyncStream +import simplejson as json +from taskthread import TimerTask from u1db import errors +from u1db.remote import utils, http_errors from u1db.remote.http_target import HTTPSyncTarget -from u1db.remote.http_client import _encode_query_parameter - +from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase +from zope.proxy import ProxyBase +from zope.proxy import sameProxiedObjects, setProxiedObject from leap.soledad.common import soledad_assert -from leap.soledad.common.crypto import ( - EncryptionSchemes, - UnknownEncryptionScheme, - MacMethods, - UnknownMacMethod, - WrongMac, - ENC_JSON_KEY, - ENC_SCHEME_KEY, - ENC_METHOD_KEY, - ENC_IV_KEY, - MAC_KEY, - MAC_METHOD_KEY, -) from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import ( - EncryptionMethods, - UnknownEncryptionMethod, -) -from leap.soledad.client.events import ( - SOLEDAD_SYNC_SEND_STATUS, - SOLEDAD_SYNC_RECEIVE_STATUS, - signal, -) +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_doc, decrypt_doc +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import signal -logger = logging.getLogger(__name__) -# -# Exceptions -# +logger = logging.getLogger(__name__) -class DocumentNotEncrypted(Exception): +def _gunzip(data): """ - Raised for failures in document encryption. + Uncompress data that is gzipped. + + :param data: gzipped data + :type data: basestring """ - pass + buffer = cStringIO.StringIO() + buffer.write(data) + buffer.seek(0) + try: + data = gzip.GzipFile(mode='r', fileobj=buffer).read() + except Exception: + logger.warning("Error while decrypting gzipped data") + buffer.close() + return data -# -# Crypto utilities for a SoledadDocument. -# +class PendingReceivedDocsSyncError(Exception): + pass -def mac_doc(crypto, doc_id, doc_rev, ciphertext, mac_method): +class DocumentSyncerThread(threading.Thread): """ - Calculate a MAC for C{doc} using C{ciphertext}. - - Current MAC method used is HMAC, with the following parameters: - - * key: sha256(storage_secret, doc_id) - * msg: doc_id + doc_rev + ciphertext - * digestmod: sha256 - - :param crypto: A SoledadCryto instance used to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc_id: The id of the document. - :type doc_id: str - :param doc_rev: The revision of the document. - :type doc_rev: str - :param ciphertext: The content of the document. - :type ciphertext: str - :param mac_method: The MAC method to use. - :type mac_method: str - - :return: The calculated MAC. - :rtype: str + A thread that knowns how to either send or receive a document during the + sync process. """ - if mac_method == MacMethods.HMAC: - return hmac.new( - crypto.doc_mac_key(doc_id), - str(doc_id) + str(doc_rev) + ciphertext, - hashlib.sha256).digest() - # raise if we do not know how to handle this MAC method - raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) + def __init__(self, doc_syncer, release_method, failed_method, + idx, total, last_request_lock=None, last_callback_lock=None): + """ + Initialize a new syncer thread. + + :param doc_syncer: A document syncer. + :type doc_syncer: HTTPDocumentSyncer + :param release_method: A method to be called when finished running. + :type release_method: callable(DocumentSyncerThread) + :param failed_method: A method to be called when we failed. + :type failed_method: callable(DocumentSyncerThread) + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param last_request_lock: A lock to wait for before actually performing + the request. + :type last_request_lock: threading.Lock + :param last_callback_lock: A lock to wait for before actually running + the success callback. + :type last_callback_lock: threading.Lock + """ + threading.Thread.__init__(self) + self._doc_syncer = doc_syncer + self._release_method = release_method + self._failed_method = failed_method + self._idx = idx + self._total = total + self._last_request_lock = last_request_lock + self._last_callback_lock = last_callback_lock + self._response = None + self._exception = None + self._result = None + self._success = False + # a lock so we can signal when we're finished + self._request_lock = threading.Lock() + self._request_lock.acquire() + self._callback_lock = threading.Lock() + self._callback_lock.acquire() + # make thread interruptable + self._stopped = None + self._stop_lock = threading.Lock() -def encrypt_doc(crypto, doc): - """ - Encrypt C{doc}'s content. - - Encrypt doc's contents using AES-256 CTR mode and return a valid JSON - string representing the following: - - { - ENC_JSON_KEY: '<encrypted doc JSON string>', - ENC_SCHEME_KEY: 'symkey', - ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, - ENC_IV_KEY: '<the initial value used to encrypt>', - MAC_KEY: '<mac>' - MAC_METHOD_KEY: 'hmac' - } - - :param crypto: A SoledadCryto instance used to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :return: The JSON serialization of the dict representing the encrypted - content. - :rtype: str - """ - soledad_assert(doc.is_tombstone() is False) - # encrypt content using AES-256 CTR mode - iv, ciphertext = crypto.encrypt_sym( - str(doc.get_json()), # encryption/decryption routines expect str - crypto.doc_passphrase(doc.doc_id), - method=EncryptionMethods.AES_256_CTR) - # Return a representation for the encrypted content. In the following, we - # convert binary data to hexadecimal representation so the JSON - # serialization does not complain about what it tries to serialize. - hex_ciphertext = binascii.b2a_hex(ciphertext) - return json.dumps({ - ENC_JSON_KEY: hex_ciphertext, - ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, - ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, - ENC_IV_KEY: iv, - # store the mac as hex. - MAC_KEY: binascii.b2a_hex( - mac_doc( - crypto, doc.doc_id, doc.rev, - ciphertext, - MacMethods.HMAC)), - MAC_METHOD_KEY: MacMethods.HMAC, - }) - - -def decrypt_doc(crypto, doc): - """ - Decrypt C{doc}'s content. + def run(self): + """ + Run the HTTP request and store results. - Return the JSON string representation of the document's decrypted content. + This method will block and wait for an eventual previous operation to + finish before actually performing the request. It also traps any + exception and register any failure with the request. + """ + with self._stop_lock: + if self._stopped is None: + self._stopped = False + else: + return + + # eventually wait for the previous thread to finish + if self._last_request_lock is not None: + self._last_request_lock.acquire() + + # bail out in case we've been interrupted + if self.stopped is True: + return + + try: + self._response = self._doc_syncer.do_request() + self._request_lock.release() + + # run success callback + if self._doc_syncer.success_callback is not None: + + # eventually wait for callback lock release + if self._last_callback_lock is not None: + self._last_callback_lock.acquire() + + # bail out in case we've been interrupted + if self._stopped is True: + return + + self._result = self._doc_syncer.success_callback( + self._idx, self._total, self._response) + self._success = True + doc_syncer = self._doc_syncer + self._release_method(self, doc_syncer) + self._doc_syncer = None + # let next thread executed its callback + self._callback_lock.release() + + # trap any exception and signal failure + except Exception as e: + self._exception = e + self._success = False + # run failure callback + if self._doc_syncer.failure_callback is not None: + + # eventually wait for callback lock release + if self._last_callback_lock is not None: + self._last_callback_lock.acquire() + + # bail out in case we've been interrupted + if self.stopped is True: + return + + self._doc_syncer.failure_callback( + self._idx, self._total, self._exception) + + self._failed_method(self) + # we do not release the callback lock here because we + # failed and so we don't want other threads to succeed. - The content of the document should have the following structure: + @property + def doc_syncer(self): + return self._doc_syncer - { - ENC_JSON_KEY: '<enc_blob>', - ENC_SCHEME_KEY: '<enc_scheme>', - ENC_METHOD_KEY: '<enc_method>', - ENC_IV_KEY: '<initial value used to encrypt>', # (optional) - MAC_KEY: '<mac>' - MAC_METHOD_KEY: 'hmac' - } + @property + def response(self): + return self._response - C{enc_blob} is the encryption of the JSON serialization of the document's - content. For now Soledad just deals with documents whose C{enc_scheme} is - EncryptionSchemes.SYMKEY and C{enc_method} is - EncryptionMethods.AES_256_CTR. + @property + def exception(self): + return self._exception - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc: The document to be decrypted. - :type doc: SoledadDocument + @property + def callback_lock(self): + return self._callback_lock - :return: The JSON serialization of the decrypted content. - :rtype: str - """ - soledad_assert(doc.is_tombstone() is False) - soledad_assert(ENC_JSON_KEY in doc.content) - soledad_assert(ENC_SCHEME_KEY in doc.content) - soledad_assert(ENC_METHOD_KEY in doc.content) - soledad_assert(MAC_KEY in doc.content) - soledad_assert(MAC_METHOD_KEY in doc.content) - # verify MAC - ciphertext = binascii.a2b_hex( # content is stored as hex. - doc.content[ENC_JSON_KEY]) - mac = mac_doc( - crypto, doc.doc_id, doc.rev, - ciphertext, - doc.content[MAC_METHOD_KEY]) - # we compare mac's hashes to avoid possible timing attacks that might - # exploit python's builtin comparison operator behaviour, which fails - # immediatelly when non-matching bytes are found. - doc_mac_hash = hashlib.sha256( - binascii.a2b_hex( # the mac is stored as hex - doc.content[MAC_KEY])).digest() - calculated_mac_hash = hashlib.sha256(mac).digest() - if doc_mac_hash != calculated_mac_hash: - raise WrongMac('Could not authenticate document\'s contents.') - # decrypt doc's content - enc_scheme = doc.content[ENC_SCHEME_KEY] - plainjson = None - if enc_scheme == EncryptionSchemes.SYMKEY: - enc_method = doc.content[ENC_METHOD_KEY] - if enc_method == EncryptionMethods.AES_256_CTR: - soledad_assert(ENC_IV_KEY in doc.content) - plainjson = crypto.decrypt_sym( - ciphertext, - crypto.doc_passphrase(doc.doc_id), - method=enc_method, - iv=doc.content[ENC_IV_KEY]) - else: - raise UnknownEncryptionMethod(enc_method) - else: - raise UnknownEncryptionScheme(enc_scheme) - return plainjson + @property + def request_lock(self): + return self._request_lock + @property + def success(self): + return self._success -def _gunzip(data): - """ - Uncompress data that is gzipped. + def stop(self): + with self._stop_lock: + self._stopped = True - :param data: gzipped data - :type data: basestring - """ - buffer = cStringIO.StringIO() - buffer.write(data) - buffer.seek(0) - try: - data = gzip.GzipFile(mode='r', fileobj=buffer).read() - except Exception: - logger.warning("Error while decrypting gzipped data") - buffer.close() - return data + @property + def stopped(self): + with self._stop_lock: + return self._stopped + @property + def result(self): + return self._result -# -# SoledadSyncTarget -# -class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): +class DocumentSyncerPool(object): """ - A SyncTarget that encrypts data before sending and decrypts data after - receiving. + A pool of reusable document syncers. """ - # - # Token auth methods. - # + POOL_SIZE = 10 + """ + The maximum amount of syncer threads running at the same time. + """ - def set_token_credentials(self, uuid, token): + def __init__(self, raw_url, raw_creds, query_string, headers, + ensure_callback, stop_method): """ - Store given credentials so we can sign the request later. + Initialize the document syncer pool. + + :param raw_url: The complete raw URL for the HTTP request. + :type raw_url: str + :param raw_creds: The credentials for the HTTP request. + :type raw_creds: dict + :param query_string: The query string for the HTTP request. + :type query_string: str + :param headers: The headers for the HTTP request. + :type headers: dict + :param ensure_callback: A callback to ensure we have the correct + target_replica_uid, if it was just created. + :type ensure_callback: callable - :param uuid: The user's uuid. - :type uuid: str - :param token: The authentication token. - :type token: str """ - TokenBasedAuth.set_token_credentials(self, uuid, token) - - def _sign_request(self, method, url_query, params): + # save syncer params + self._raw_url = raw_url + self._raw_creds = raw_creds + self._query_string = query_string + self._headers = headers + self._ensure_callback = ensure_callback + self._stop_method = stop_method + # pool attributes + self._failures = False + self._semaphore_pool = threading.BoundedSemaphore( + DocumentSyncerPool.POOL_SIZE) + self._pool_access_lock = threading.Lock() + self._doc_syncers = [] + self._threads = [] + + def new_syncer_thread(self, idx, total, last_request_lock=None, + last_callback_lock=None): """ - Return an authorization header to be included in the HTTP request. + Yield a new document syncer thread. + + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param last_request_lock: A lock to wait for before actually performing + the request. + :type last_request_lock: threading.Lock + :param last_callback_lock: A lock to wait for before actually running + the success callback. + :type last_callback_lock: threading.Lock + """ + t = None + # wait for available threads + self._semaphore_pool.acquire() + with self._pool_access_lock: + if self._failures is True: + return None + # get a syncer + doc_syncer = self._get_syncer() + # we rely on DocumentSyncerThread.run() to release the lock using + # self.release_syncer so we can launch a new thread. + t = DocumentSyncerThread( + doc_syncer, self.release_syncer, self.cancel_threads, + idx, total, + last_request_lock=last_request_lock, + last_callback_lock=last_callback_lock) + self._threads.append(t) + return t + + def _failed(self): + with self._pool_access_lock: + self._failures = True - :param method: The HTTP method. - :type method: str - :param url_query: The URL query string. - :type url_query: str - :param params: A list with encoded query parameters. - :type param: list + @property + def failures(self): + return self._failures - :return: The Authorization header. - :rtype: list of tuple + def _get_syncer(self): """ - return TokenBasedAuth._sign_request(self, method, url_query, params) - - # - # Modified HTTPSyncTarget methods. - # + Get a document syncer from the pool. - @staticmethod - def connect(url, crypto=None): - return SoledadSyncTarget(url, crypto=crypto) + This method will create a new syncer whenever there is no syncer + available in the pool. - def __init__(self, url, creds=None, crypto=None): + :return: A syncer. + :rtype: HTTPDocumentSyncer """ - Initialize the SoledadSyncTarget. - - :param url: The url of the target replica to sync with. - :type url: str - :param creds: optional dictionary giving credentials. - to authorize the operation with the server. - :type creds: dict - :param soledad: An instance of Soledad so we can encrypt/decrypt - document contents when syncing. - :type soledad: soledad.Soledad + syncer = None + # get an available syncer or create a new one + try: + syncer = self._doc_syncers.pop() + except IndexError: + syncer = HTTPDocumentSyncer( + self._raw_url, self._raw_creds, self._query_string, + self._headers, self._ensure_callback) + return syncer + + def release_syncer(self, syncer_thread, doc_syncer): """ - HTTPSyncTarget.__init__(self, url, creds) - self._crypto = crypto - self._stopped = True - self._stop_lock = threading.Lock() + Return a syncer to the pool after use and check for any failures. - def _init_post_request(self, url, action, headers, content_length): + :param syncer: The syncer to be returned to the pool. + :type syncer: HTTPDocumentSyncer """ - Initiate a syncing POST request. + with self._pool_access_lock: + self._doc_syncers.append(doc_syncer) + if syncer_thread.success is True: + self._threads.remove(syncer_thread) + self._semaphore_pool.release() - :param url: The syncing URL. - :type url: str - :param action: The syncing action, either 'get' or 'receive'. - :type action: str - :param headers: The initial headers to be sent on this request. - :type headers: dict - :param content_length: The content-length of the request. - :type content_length: int + def cancel_threads(self, calling_thread): """ - self._conn.putrequest('POST', url) - self._conn.putheader( - 'content-type', 'application/x-soledad-sync-%s' % action) - for header_name, header_value in headers: - self._conn.putheader(header_name, header_value) - self._conn.putheader('accept-encoding', 'gzip') - self._conn.putheader('content-length', str(content_length)) - self._conn.endheaders() - - def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, - headers, return_doc_cb, ensure_callback, sync_id): + Stop all threads in the pool. """ - Fetch sync documents from the remote database and insert them in the - local database. + # stop sync + self._stop_method() + stopped = [] + # stop all threads + logger.warning("Soledad sync: cancelling sync threads...") + with self._pool_access_lock: + self._failures = True + while self._threads: + t = self._threads.pop(0) + t.stop() + self._doc_syncers.append(t.doc_syncer) + stopped.append(t) + # release locks and join + while stopped: + t = stopped.pop(0) + t.request_lock.acquire(False) # just in case + t.request_lock.release() + t.callback_lock.acquire(False) # just in case + t.callback_lock.release() + logger.warning("Soledad sync: cancelled sync threads.") + + def cleanup(self): + """ + Close and remove any syncers from the pool. + """ + with self._pool_access_lock: + while self._doc_syncers: + syncer = self._doc_syncers.pop() + syncer.close() + del syncer - If an incoming document's encryption scheme is equal to - EncryptionSchemes.SYMKEY, then this method will decrypt it with - Soledad's symmetric key. - :param url: The syncing URL. - :type url: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - :param headers: The headers of the HTTP request. +class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): + + def __init__(self, raw_url, creds, query_string, headers, ensure_callback): + """ + Initialize the client. + + :param raw_url: The raw URL of the target HTTP server. + :type raw_url: str + :param creds: Authentication credentials. + :type creds: dict + :param query_string: The query string for the HTTP request. + :type query_string: str + :param headers: The headers for the HTTP request. :type headers: dict - :param return_doc_cb: A callback to insert docs from target. - :type return_doc_cb: callable :param ensure_callback: A callback to ensure we have the correct target_replica_uid, if it was just created. :type ensure_callback: callable + """ + HTTPClientBase.__init__(self, raw_url, creds=creds) + # info needed to perform the request + self._query_string = query_string + self._headers = headers + self._ensure_callback = ensure_callback + # the actual request method + self._request_method = None + self._success_callback = None + self._failure_callback = None + + def _reset(self): + """ + Reset this document syncer so we can reuse it. + """ + self._request_method = None + self._success_callback = None + self._failure_callback = None + self._request_method = None - :raise BrokenSyncStream: If C{data} is malformed. + def set_request_method(self, method, *args, **kwargs): + """ + Set the actual method to perform the request. - :return: A dictionary representing the first line of the response got - from remote replica. - :rtype: list of str - """ - - def _post_get_doc(received): - """ - Get a sync document from server by means of a POST request. - - :param received: The number of documents already received in the - current sync session. - :type received: int - """ - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=ensure_callback is not None) - # inform server of how many documents have already been received - size += self._prepare( - ',', entries, received=received) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request(url, 'get', headers, size) - # get document - for entry in entries: - self._conn.send(entry) - return self._response() - - number_of_changes = None - received = 0 + :param method: Either 'get' or 'put'. + :type method: str + :param args: Arguments for the request method. + :type args: list + :param kwargs: Keyworded arguments for the request method. + :type kwargs: dict + """ + self._reset() + # resolve request method + if method is 'get': + self._request_method = self._get_doc + elif method is 'put': + self._request_method = self._put_doc + else: + raise Exception + # store request method args + self._args = args + self._kwargs = kwargs - new_generation = last_known_generation - new_transaction_id = last_known_trans_id - while number_of_changes is None or received < number_of_changes: - # bail out if sync process was interrupted - if self.stopped is True: - return last_known_generation, last_known_trans_id - # try to fetch one document from target - data, _ = _post_get_doc(received) - # decode incoming stream - parts = data.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise BrokenSyncStream - data = parts[1:-1] - # decode metadata - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - try: - metadata = json.loads(line) - soledad_assert('number_of_changes' in metadata) - soledad_assert('new_generation' in metadata) - soledad_assert('new_transaction_id' in metadata) - number_of_changes = metadata['number_of_changes'] - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - except json.JSONDecodeError, AssertionError: - raise BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if ensure_callback and 'replica_uid' in metadata: - ensure_callback(metadata['replica_uid']) - # bail out if there are no documents to be received - if number_of_changes == 0: - break - # decrypt incoming document and insert into local database - entry = None - try: - entry = json.loads(data[1]) - except IndexError: - raise BrokenSyncStream - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # if arriving content was symmetrically encrypted, we decrypt - # it. - doc = SoledadDocument( - entry['id'], entry['rev'], entry['content']) - if doc.content and ENC_SCHEME_KEY in doc.content: - if doc.content[ENC_SCHEME_KEY] == \ - EncryptionSchemes.SYMKEY: - doc.set_json(decrypt_doc(self._crypto, doc)) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- - return_doc_cb(doc, entry['gen'], entry['trans_id']) - received += 1 - signal( - SOLEDAD_SYNC_RECEIVE_STATUS, - "%d/%d" % - (received, number_of_changes)) - return new_generation, new_transaction_id + def set_success_callback(self, callback): + self._success_callback = callback + + def set_failure_callback(self, callback): + self._failure_callback = callback + + @property + def success_callback(self): + return self._success_callback + + @property + def failure_callback(self): + return self._failure_callback + + def do_request(self): + """ + Actually perform the request. + + :return: The body and headers of the response. + :rtype: tuple + """ + self._ensure_connection() + args = self._args + kwargs = self._kwargs + return self._request_method(*args, **kwargs) def _request(self, method, url_parts, params=None, body=None, content_type=None): @@ -482,6 +490,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type body: str :param content-type: The content-type of the request. :type content-type: str + + :return: The body and headers of the response. + :rtype: tuple + + :raise errors.Unavailable: Raised after a number of unsuccesful + request attempts. + :raise Exception: Raised for any other exception ocurring during the + request. """ self._ensure_connection() @@ -566,14 +582,506 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type entries: list :param dic: The data to be included in this entry. :type dic: dict + + :return: The size of the prepared entry. + :rtype: int """ entry = comma + '\r\n' + json.dumps(dic) entries.append(entry) return len(entry) - def sync_exchange(self, docs_by_generations, source_replica_uid, - last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None): + def _init_post_request(self, action, content_length): + """ + Initiate a syncing POST request. + + :param url: The syncing URL. + :type url: str + :param action: The syncing action, either 'get' or 'receive'. + :type action: str + :param headers: The initial headers to be sent on this request. + :type headers: dict + :param content_length: The content-length of the request. + :type content_length: int + """ + self._conn.putrequest('POST', self._query_string) + self._conn.putheader( + 'content-type', 'application/x-soledad-sync-%s' % action) + for header_name, header_value in self._headers: + self._conn.putheader(header_name, header_value) + self._conn.putheader('accept-encoding', 'gzip') + self._conn.putheader('content-length', str(content_length)) + self._conn.endheaders() + + def _get_doc(self, received, sync_id, last_known_generation, + last_known_trans_id): + """ + Get a sync document from server by means of a POST request. + + :param received: The number of documents already received in the + current sync session. + :type received: int + :param sync_id: The id for the current sync session. + :type sync_id: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :return: The body and headers of the response. + :rtype: tuple + """ + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # inform server of how many documents have already been received + size += self._prepare( + ',', entries, received=received) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request('get', size) + # get document + for entry in entries: + self._conn.send(entry) + return self._response() + + def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, + id, rev, content, gen, trans_id, number_of_docs, doc_idx): + """ + Put a sync document on server by means of a POST request. + + :param sync_id: The id for the current sync session. + :type sync_id: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + :param id: The document id. + :type id: str + :param rev: The document revision. + :type rev: str + :param content: The serialized document content. + :type content: str + :param gen: The generation of the modification of the document. + :type gen: int + :param trans_id: The transaction id of the modification of the + document. + :type trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int + + :return: The body and headers of the response. + :rtype: tuple + """ + # prepare to send the document + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # add the document to the request + size += self._prepare( + ',', entries, + id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, + number_of_docs=number_of_docs, doc_idx=doc_idx) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request('put', size) + # send document + for entry in entries: + self._conn.send(entry) + return self._response() + + def _sign_request(self, method, url_query, params): + """ + Return an authorization header to be included in the HTTP request. + + :param method: The HTTP method. + :type method: str + :param url_query: The URL query string. + :type url_query: str + :param params: A list with encoded query parameters. + :type param: list + + :return: The Authorization header. + :rtype: list of tuple + """ + return TokenBasedAuth._sign_request(self, method, url_query, params) + + def set_token_credentials(self, uuid, token): + """ + Store given credentials so we can sign the request later. + + :param uuid: The user's uuid. + :type uuid: str + :param token: The authentication token. + :type token: str + """ + TokenBasedAuth.set_token_credentials(self, uuid, token) + + +class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ + + # will later keep a reference to the insert-doc callback + # passed to sync_exchange + _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) + + """ + Period of recurrence of the periodic decrypting task, in seconds. + """ + DECRYPT_TASK_PERIOD = 0.5 + + # + # Modified HTTPSyncTarget methods. + # + + def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, + sync_db=None, sync_db_write_lock=None): + """ + Initialize the SoledadSyncTarget. + + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param url: The url of the target replica to sync with. + :type url: str + :param creds: Optional dictionary giving credentials. + to authorize the operation with the server. + :type creds: dict + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad.crypto.SoledadCrypto + :param sync_db: Optional. handler for the db with the symmetric + encryption of the syncing documents. If + None, encryption will be done in-place, + instead of retreiving it from the dedicated + database. + :type sync_db: Sqlite handler + :param sync_db_write_lock: a write lock for controlling concurrent + access to the sync_db + :type sync_db_write_lock: threading.Lock + """ + HTTPSyncTarget.__init__(self, url, creds) + self._raw_url = url + self._raw_creds = creds + self._crypto = crypto + self._stopped = True + self._stop_lock = threading.Lock() + self._sync_exchange_lock = threading.Lock() + self.source_replica_uid = source_replica_uid + self._defer_decryption = False + + # deferred decryption attributes + self._sync_db = None + self._sync_db_write_lock = None + self._decryption_callback = None + self._sync_decr_pool = None + self._sync_watcher = None + if sync_db and sync_db_write_lock is not None: + self._sync_db = sync_db + self._sync_db_write_lock = sync_db_write_lock + + def _setup_sync_decr_pool(self): + """ + Set up the SyncDecrypterPool for deferred decryption. + """ + if self._sync_decr_pool is None: + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, self._sync_db, + self._sync_db_write_lock, + insert_doc_cb=self._insert_doc_cb) + self._sync_decr_pool.set_source_replica_uid( + self.source_replica_uid) + + def _teardown_sync_decr_pool(self): + """ + Tear down the SyncDecrypterPool. + """ + if self._sync_decr_pool is not None: + self._sync_decr_pool.close() + self._sync_decr_pool = None + + def _setup_sync_watcher(self): + """ + Set up the sync watcher for deferred decryption. + """ + if self._sync_watcher is None: + self._sync_watcher = TimerTask( + self._decrypt_syncing_received_docs, + delay=self.DECRYPT_TASK_PERIOD) + + def _teardown_sync_watcher(self): + """ + Tear down the sync watcher. + """ + if self._sync_watcher is not None: + self._sync_watcher.stop() + self._sync_watcher.shutdown() + self._sync_watcher = None + + def _get_replica_uid(self, url): + """ + Return replica uid from the url, or None. + + :param url: the replica url + :type url: str + """ + replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) + return replica_uid_match[0] if len(replica_uid_match) > 0 else None + + @staticmethod + def connect(url, source_replica_uid=None, crypto=None): + return SoledadSyncTarget( + url, source_replica_uid=source_replica_uid, crypto=crypto) + + def _parse_received_doc_response(self, response): + """ + Parse the response from the server containing the received document. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + """ + data, _ = response + # decode incoming stream + parts = data.splitlines() + if not parts or parts[0] != '[' or parts[-1] != ']': + raise errors.BrokenSyncStream + data = parts[1:-1] + # decode metadata + line, comma = utils.check_and_strip_comma(data[0]) + metadata = None + try: + metadata = json.loads(line) + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + number_of_changes = metadata['number_of_changes'] + except (json.JSONDecodeError, KeyError): + raise errors.BrokenSyncStream + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + # parse incoming document info + doc_id = None + rev = None + content = None + gen = None + trans_id = None + if number_of_changes > 0: + try: + entry = json.loads(data[1]) + doc_id = entry['id'] + rev = entry['rev'] + content = entry['content'] + gen = entry['gen'] + trans_id = entry['trans_id'] + except (IndexError, KeyError): + raise errors.BrokenSyncStream + return new_generation, new_transaction_id, number_of_changes, \ + doc_id, rev, content, gen, trans_id + + def _insert_received_doc(self, idx, total, response): + """ + Insert a received document into the local replica. + + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param response: The body and headers of the response. + :type response: tuple(str, dict) + """ + new_generation, new_transaction_id, number_of_changes, doc_id, \ + rev, content, gen, trans_id = \ + self._parse_received_doc_response(response) + if doc_id is not None: + # decrypt incoming document and insert into local database + # ------------------------------------------------------------- + # symmetric decryption of document's contents + # ------------------------------------------------------------- + # If arriving content was symmetrically encrypted, we decrypt it. + # We do it inline if defer_decryption flag is False or no sync_db + # was defined, otherwise we defer it writing it to the received + # docs table. + doc = SoledadDocument(doc_id, rev, content) + if is_symmetrically_encrypted(doc): + if self._queue_for_decrypt: + self._save_encrypted_received_doc( + doc, gen, trans_id, idx, total) + else: + # defer_decryption is False or no-sync-db fallback + doc.set_json(decrypt_doc(self._crypto, doc)) + self._return_doc_cb(doc, gen, trans_id) + else: + # not symmetrically encrypted doc, insert it directly + # or save it in the decrypted stage. + if self._queue_for_decrypt: + self._save_received_doc(doc, gen, trans_id, idx, total) + else: + self._return_doc_cb(doc, gen, trans_id) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + msg = "%d/%d" % (idx + 1, total) + signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg) + logger.debug("Soledad sync receive status: %s" % msg) + return number_of_changes, new_generation, new_transaction_id + + def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, + headers, return_doc_cb, ensure_callback, sync_id, + syncer_pool, defer_decryption=False): + """ + Fetch sync documents from the remote database and insert them in the + local database. + + If an incoming document's encryption scheme is equal to + EncryptionSchemes.SYMKEY, then this method will decrypt it with + Soledad's symmetric key. + + :param url: The syncing URL. + :type url: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + :param headers: The headers of the HTTP request. + :type headers: dict + :param return_doc_cb: A callback to insert docs from target. + :type return_doc_cb: callable + :param ensure_callback: A callback to ensure we have the correct + target_replica_uid, if it was just created. + :type ensure_callback: callable + :param sync_id: The id for the current sync session. + :type sync_id: str + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :raise BrokenSyncStream: If `data` is malformed. + + :return: A dictionary representing the first line of the response got + from remote replica. + :rtype: dict + """ + # we keep a reference to the callback in case we defer the decryption + self._return_doc_cb = return_doc_cb + self._queue_for_decrypt = defer_decryption \ + and self._sync_db is not None + + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + + if self._queue_for_decrypt: + logger.debug( + "Soledad sync: will queue received docs for decrypting.") + + idx = 0 + number_of_changes = 1 + + first_request = True + last_callback_lock = None + threads = [] + + # get incoming documents + while idx < number_of_changes: + # bail out if sync process was interrupted + if self.stopped is True: + break + + # launch a thread to fetch one document from target + t = syncer_pool.new_syncer_thread( + idx, number_of_changes, + last_callback_lock=last_callback_lock) + + # bail out if any thread failed + if t is None: + self.stop() + break + + t.doc_syncer.set_request_method( + 'get', idx, sync_id, last_known_generation, + last_known_trans_id) + t.doc_syncer.set_success_callback(self._insert_received_doc) + + def _failure_callback(idx, total, exception): + _failure_msg = "Soledad sync: error while getting document " \ + "%d/%d: %s" \ + % (idx + 1, total, exception) + logger.warning("%s" % _failure_msg) + logger.warning("Soledad sync: failing gracefully, will " + "recover on next sync.") + + t.doc_syncer.set_failure_callback(_failure_callback) + threads.append(t) + t.start() + last_callback_lock = t.callback_lock + idx += 1 + + # if this is the first request, wait to update the number of + # changes + if first_request is True: + t.join() + if t.success: + number_of_changes, _, _ = t.result + first_request = False + + # make sure all threads finished and we have up-to-date info + last_successful_thread = None + while threads: + # check if there are failures + t = threads.pop(0) + t.join() + if t.success: + last_successful_thread = t + + # get information about last successful thread + if last_successful_thread is not None: + body, _ = last_successful_thread.response + parsed_body = json.loads(body) + # get current target gen and trans id in case no documents were + # transferred + if len(parsed_body) == 1: + metadata = parsed_body[0] + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + # get current target gen and trans id from last transferred + # document + else: + doc_data = parsed_body[1] + new_generation = doc_data['gen'] + new_transaction_id = doc_data['trans_id'] + + return new_generation, new_transaction_id + + def sync_exchange(self, docs_by_generations, + source_replica_uid, last_known_generation, + last_known_trans_id, return_doc_cb, + ensure_callback=None, defer_decryption=True, + sync_id=None): """ Find out which documents the remote database does not know about, encrypt and send them. @@ -586,24 +1094,54 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): the last local generation the remote replica knows about. :type docs_by_generations: list of tuples + :param source_replica_uid: The uid of the source replica. :type source_replica_uid: str + :param last_known_generation: Target's last known generation. :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. :type last_known_trans_id: str + :param return_doc_cb: A callback for inserting received documents from - target. + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. :type return_doc_cb: function + :param ensure_callback: A callback that ensures we know the target - replica uid if the target replica was just created. + replica uid if the target replica was just + created. :type ensure_callback: function + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + :return: The new generation and transaction id of the target replica. :rtype: tuple """ + self._ensure_callback = ensure_callback + + if defer_decryption: + self._sync_exchange_lock.acquire() + self._setup_sync_decr_pool() + self._setup_sync_watcher() + self._defer_decryption = True + self.start() - sync_id = str(uuid4()) + + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + # let the decrypter pool access the passed callback to insert docs + setProxiedObject(self._insert_doc_cb[source_replica_uid], + return_doc_cb) + + if not self.clear_to_sync(): + raise PendingReceivedDocsSyncError self._ensure_connection() if self._trace_hook: # for tests @@ -611,78 +1149,135 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) headers = self._sign_request('POST', url, {}) - def _post_put_doc(headers, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id, sync_id): - """ - Put a sync document on server by means of a POST request. - - :param received: How many documents have already been received in - this sync session. - :type received: int - """ - # prepare to send the document - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=ensure_callback is not None) - # add the document to the request - size += self._prepare( - ',', entries, - id=id, rev=rev, content=content, gen=gen, trans_id=trans_id) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request(url, 'put', headers, size) - # send document - for entry in entries: - self._conn.send(entry) - data, _ = self._response() - data = json.loads(data) - return data[0]['new_generation'], data[0]['new_transaction_id'] - cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id # send docs + msg = "%d/%d" % (0, len(docs_by_generations)) + signal(SOLEDAD_SYNC_SEND_STATUS, msg) + logger.debug("Soledad sync send status: %s" % msg) + + defer_encryption = self._sync_db is not None + syncer_pool = DocumentSyncerPool( + self._raw_url, self._raw_creds, url, headers, ensure_callback, + self.stop) + threads = [] + last_request_lock = None + last_callback_lock = None sent = 0 - signal( - SOLEDAD_SYNC_SEND_STATUS, - "%d/%d" % (0, len(docs_by_generations))) + total = len(docs_by_generations) + + synced = [] + number_of_docs = len(docs_by_generations) + for doc, gen, trans_id in docs_by_generations: # allow for interrupting the sync process if self.stopped is True: break + # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue + # ------------------------------------------------------------- # symmetric encryption of document's contents # ------------------------------------------------------------- doc_json = doc.get_json() if not doc.is_tombstone(): - doc_json = encrypt_doc(self._crypto, doc) + if not defer_encryption: + # fallback case, for tests + doc_json = encrypt_doc(self._crypto, doc) + else: + try: + doc_json = self.get_encrypted_doc_from_db( + doc.doc_id, doc.rev) + except Exception as exc: + logger.error("Error while getting " + "encrypted doc from db") + logger.exception(exc) + continue + if doc_json is None: + # Not marked as tombstone, but we got nothing + # from the sync db. As it is not encrypted yet, we + # force inline encryption. + # TODO: implement a queue to deal with these cases. + doc_json = encrypt_doc(self._crypto, doc) # ------------------------------------------------------------- # end of symmetric encryption # ------------------------------------------------------------- - cur_target_gen, cur_target_trans_id = _post_put_doc( - headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, - rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id, - sync_id=sync_id) + t = syncer_pool.new_syncer_thread( + sent + 1, total, last_request_lock=None, + last_callback_lock=last_callback_lock) + + # bail out if any thread failed + if t is None: + self.stop() + break + + # set the request method + t.doc_syncer.set_request_method( + 'put', sync_id, cur_target_gen, cur_target_trans_id, + id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1) + # set the success calback + + def _success_callback(idx, total, response): + _success_msg = "Soledad sync send status: %d/%d" \ + % (idx, total) + signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) + logger.debug(_success_msg) + + t.doc_syncer.set_success_callback(_success_callback) + + # set the failure callback + def _failure_callback(idx, total, exception): + _failure_msg = "Soledad sync: error while sending document " \ + "%d/%d: %s" % (idx, total, exception) + logger.warning("%s" % _failure_msg) + logger.warning("Soledad sync: failing gracefully, will " + "recover on next sync.") + + t.doc_syncer.set_failure_callback(_failure_callback) + + # save thread and append + t.start() + threads.append((t, doc)) + last_request_lock = t.request_lock + last_callback_lock = t.callback_lock sent += 1 - signal( - SOLEDAD_SYNC_SEND_STATUS, - "%d/%d" % (sent, len(docs_by_generations))) + + # make sure all threads finished and we have up-to-date info + while threads: + # check if there are failures + t, doc = threads.pop(0) + t.join() + if t.success: + synced.append((doc.doc_id, doc.rev)) + + if defer_decryption: + self._sync_watcher.start() # get docs from target - cur_target_gen, cur_target_trans_id = self._get_remote_docs( - url, - last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback, sync_id) + if self.stopped is False: + cur_target_gen, cur_target_trans_id = self._get_remote_docs( + url, + last_known_generation, last_known_trans_id, headers, + return_doc_cb, ensure_callback, sync_id, syncer_pool, + defer_decryption=defer_decryption) + syncer_pool.cleanup() + + # delete documents from the sync database + if defer_encryption: + self.delete_encrypted_docs_from_db(synced) + + # wait for deferred decryption to finish + if defer_decryption: + while self.clear_to_sync() is False: + sleep(self.DECRYPT_TASK_PERIOD) + self._teardown_sync_watcher() + self._teardown_sync_decr_pool() + self._sync_exchange_lock.release() + self.stop() return cur_target_gen, cur_target_trans_id @@ -714,3 +1309,164 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ with self._stop_lock: return self._stopped is True + + def get_encrypted_doc_from_db(self, doc_id, doc_rev): + """ + Retrieve encrypted document from the database of encrypted docs for + sync. + + :param doc_id: The Document id. + :type doc_id: str + + :param doc_rev: The document revision + :type doc_rev: str + """ + encr = SyncEncrypterPool + c = self._sync_db.cursor() + sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) + c.execute(sql, (doc_id, doc_rev)) + res = c.fetchall() + if len(res) != 0: + return res[0][0] + + def delete_encrypted_docs_from_db(self, docs_ids): + """ + Delete several encrypted documents from the database of symmetrically + encrypted docs to sync. + + :param docs_ids: an iterable with (doc_id, doc_rev) for all documents + to be deleted. + :type docs_ids: any iterable of tuples of str + """ + if docs_ids: + encr = SyncEncrypterPool + c = self._sync_db.cursor() + for doc_id, doc_rev in docs_ids: + sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) + c.execute(sql, (doc_id, doc_rev)) + self._sync_db.commit() + + def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): + """ + Save a symmetrically encrypted incoming document into the received + docs table in the sync db. A decryption task will pick it up + from here in turn. + + :param doc: The document to save. + :type doc: SoledadDocument + :param gen: The generation. + :type gen: str + :param trans_id: Transacion id. + :type gen: str + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + """ + logger.debug( + "Enqueueing doc for decryption: %d/%d." + % (idx + 1, total)) + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id) + + def _save_received_doc(self, doc, gen, trans_id, idx, total): + """ + Save any incoming document into the received docs table in the sync db. + + :param doc: The document to save. + :type doc: SoledadDocument + :param gen: The generation. + :type gen: str + :param trans_id: Transacion id. + :type gen: str + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + """ + logger.debug( + "Enqueueing doc, no decryption needed: %d/%d." + % (idx + 1, total)) + self._sync_decr_pool.insert_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id) + + # + # Symmetric decryption of syncing docs + # + + def clear_to_sync(self): + """ + Return True if sync can proceed (ie, the received db table is empty). + :rtype: bool + """ + if self._sync_decr_pool is not None: + return self._sync_decr_pool.count_received_encrypted_docs() == 0 + else: + return True + + def set_decryption_callback(self, cb): + """ + Set callback to be called when the decryption finishes. + + :param cb: The callback to be set. + :type cb: callable + """ + self._decryption_callback = cb + + def has_decryption_callback(self): + """ + Return True if there is a decryption callback set. + :rtype: bool + """ + return self._decryption_callback is not None + + def has_syncdb(self): + """ + Return True if we have an initialized syncdb. + """ + return self._sync_db is not None + + def _decrypt_syncing_received_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + Called periodically from TimerTask self._sync_watcher. + """ + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + return + + decrypter = self._sync_decr_pool + decrypter.decrypt_received_docs() + done = decrypter.process_decrypted() + + def _sign_request(self, method, url_query, params): + """ + Return an authorization header to be included in the HTTP request. + + :param method: The HTTP method. + :type method: str + :param url_query: The URL query string. + :type url_query: str + :param params: A list with encoded query parameters. + :type param: list + + :return: The Authorization header. + :rtype: list of tuple + """ + return TokenBasedAuth._sign_request(self, method, url_query, params) + + def set_token_credentials(self, uuid, token): + """ + Store given credentials so we can sign the request later. + + :param uuid: The user's uuid. + :type uuid: str + :param token: The authentication token. + :type token: str + """ + TokenBasedAuth.set_token_credentials(self, uuid, token) |