diff options
author | Kali Kaneko <kali@leap.se> | 2014-10-07 18:56:31 +0200 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-10-07 18:56:31 +0200 |
commit | 42d9b82af327fee94a8b626e2606b0b854140ce2 (patch) | |
tree | 846eaf6fae05d1c6755f87535b35d44ab022b0b1 | |
parent | a7568bd38bdc63de5f074861862390281f49abf0 (diff) | |
parent | 0a66ad2bbebb366a72de925ab6ebf65e1bd117c0 (diff) |
Merge remote-tracking branch 'remotes/drebs-github/deb-0.6.0' into debian
46 files changed, 5233 insertions, 772 deletions
@@ -1,3 +1,26 @@ +0.6.0 Jul 18, 2014: +Client: + o Close all connections after syncing. Fixes #5518. + o Reset synchronizer state in order to reuse the same synchronizer + object multiple times. + o Use temporal database for encryption/decryption during + sync. Closes #5326. + o Add sync status signals. Closes #5517. + o Allow for interrupting and recovering sync. Closes #5517. + o Parallelize sync requests and reuse HTTP connections. + o Split sync in multiple POST requests in client. Closes #5571. + +Common: + o Use a dedicated HTTP resource for couch multipart PUTs to avoid + bigcouch. Closes #5739. + +Server: + o Pin PyOpenSSL dependency version to <0.14 to avoid yet another + crypto dependency. + o Authenticate in time-insensitive manner. Closes #3399. + o Allow for interrupting and recovering sync. Closes #5517. + o Split sync in multiple POST requests in server. Closes #5571. + 0.5.2 Jun 6, 2014: Client: o Reset synchronizer state in order to reuse the same synchronizer diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip index 7590aee5..b5abd4c7 100644 --- a/client/pkg/requirements.pip +++ b/client/pkg/requirements.pip @@ -3,12 +3,14 @@ simplejson u1db scrypt pycryptopp +cchardet +zope.proxy # # leap deps # -leap.soledad.common>=0.3.0 +leap.soledad.common>=0.6.0 # # XXX things to fix yet: @@ -20,5 +22,3 @@ oauth # pysqlite should not be a dep, see #2945 pysqlite - -chardet diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 0d3a21fd..586e3389 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -223,33 +223,48 @@ class Soledad(object): """ def __init__(self, uuid, passphrase, secrets_path, local_db_path, - server_url, cert_file, auth_token=None, secret_id=None): + server_url, cert_file, + auth_token=None, secret_id=None, defer_encryption=False): """ Initialize configuration, cryptographic keys and dbs. :param uuid: User's uuid. :type uuid: str + :param passphrase: The passphrase for locking and unlocking encryption secrets for local and remote storage. :type passphrase: unicode + :param secrets_path: Path for storing encrypted key used for symmetric encryption. :type secrets_path: str + :param local_db_path: Path for local encrypted storage db. :type local_db_path: str + :param server_url: URL for Soledad server. This is used either to sync - with the user's remote db and to interact with the shared recovery - database. + with the user's remote db and to interact with the + shared recovery database. :type server_url: str + :param cert_file: Path to the certificate of the ca used to validate the SSL certificate used by the remote soledad server. :type cert_file: str + :param auth_token: Authorization token for accessing remote databases. :type auth_token: str + :param secret_id: The id of the storage secret to be used. + :type secret_id: str + + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool + :raise BootstrapSequenceError: Raised when the secret generation and - storage on server sequence has failed for some reason. + storage on server sequence has failed + for some reason. """ # get config params self._uuid = uuid @@ -258,8 +273,10 @@ class Soledad(object): # init crypto variables self._secrets = {} self._secret_id = secret_id - # init config (possibly with default values) + self._defer_encryption = defer_encryption + self._init_config(secrets_path, local_db_path, server_url) + self._set_token(auth_token) self._shared_db_instance = None # configure SSL certificate @@ -390,6 +407,7 @@ class Soledad(object): # release the lock on shared db try: self._shared_db.unlock(token) + self._shared_db.close() except NotLockedError: # for some reason the lock expired. Despite that, secret # loading or generation/storage must have been executed @@ -469,24 +487,20 @@ class Soledad(object): create=True, document_factory=SoledadDocument, crypto=self._crypto, - raw_key=True) + raw_key=True, + defer_encryption=self._defer_encryption) def close(self): """ Close underlying U1DB database. """ + logger.debug("Closing soledad") if hasattr(self, '_db') and isinstance( self._db, SQLCipherDatabase): + self._db.stop_sync() self._db.close() - def __del__(self): - """ - Make sure local database is closed when object is destroyed. - """ - # Watch out! We have no guarantees that this is properly called. - self.close() - # # Management of secret for symmetric encryption. # @@ -520,6 +534,9 @@ class Soledad(object): Define the id of the storage secret to be used. This method will also replace the secret in the crypto object. + + :param secret_id: The id of the storage secret to be used. + :type secret_id: str """ self._secret_id = secret_id @@ -881,7 +898,7 @@ class Soledad(object): :type json: str :param doc_id: An optional identifier specifying the document id. :type doc_id: - :return: The new cocument + :return: The new document :rtype: SoledadDocument """ return self._db.create_doc_from_json(json, doc_id=doc_id) @@ -1041,7 +1058,7 @@ class Soledad(object): if self._db: return self._db.resolve_doc(doc, conflicted_doc_revs) - def sync(self): + def sync(self, defer_decryption=True): """ Synchronize the local encrypted replica with a remote replica. @@ -1051,16 +1068,25 @@ class Soledad(object): :param url: the url of the target replica to sync with :type url: str - :return: the local generation before the synchronisation was - performed. + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :return: The local generation before the synchronisation was + performed. :rtype: str """ if self._db: + try: local_gen = self._db.sync( urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), - creds=self._creds, autocreate=False) + creds=self._creds, autocreate=False, + defer_decryption=defer_decryption) signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) return local_gen + except Exception as e: + logger.error("Soledad exception when syncing: %s" % str(e)) def stop_sync(self): """ @@ -1079,7 +1105,9 @@ class Soledad(object): :return: Whether remote replica and local replica differ. :rtype: bool """ - target = SoledadSyncTarget(url, creds=self._creds, crypto=self._crypto) + target = SoledadSyncTarget( + url, self._db._get_replica_uid(), creds=self._creds, + crypto=self._crypto) info = target.get_sync_info(self._db._get_replica_uid()) # compare source generation with target's last known source generation if self._db._get_generation() != info[4]: @@ -1087,6 +1115,13 @@ class Soledad(object): return True return False + @property + def syncing(self): + """ + Property, True if the syncer is syncing. + """ + return self._db.syncing + def _set_token(self, token): """ Set the authentication token for remote database access. diff --git a/client/src/leap/soledad/client/_version.py b/client/src/leap/soledad/client/_version.py index a3227cde..cf4e6706 100644 --- a/client/src/leap/soledad/client/_version.py +++ b/client/src/leap/soledad/client/_version.py @@ -5,8 +5,8 @@ # unpacked source archive. Distribution tarballs contain a pre-generated copy # of this file. -version_version = '0.5.2' -version_full = 'fd2a9adfc9148e47b85add78d8f3c981e4fad885' +version_version = '0.6.0' +version_full = 'f71da83b530abe5b7e88b812797e235833985b6c' def get_versions(default={}, verbose=False): diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index a6372107..7133f804 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # crypto.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,27 +14,45 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Cryptographic utilities for Soledad. """ - - import os import binascii import hmac import hashlib - +import json +import logging +import multiprocessing +import threading from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 +from zope.proxy import sameProxiedObjects + +from leap.soledad.common import soledad_assert +from leap.soledad.common import soledad_assert_type +from leap.soledad.common.document import SoledadDocument + + +from leap.soledad.common.crypto import ( + EncryptionSchemes, + UnknownEncryptionScheme, + MacMethods, + UnknownMacMethod, + WrongMac, + ENC_JSON_KEY, + ENC_SCHEME_KEY, + ENC_METHOD_KEY, + ENC_IV_KEY, + MAC_KEY, + MAC_METHOD_KEY, +) +logger = logging.getLogger(__name__) -from leap.soledad.common import ( - soledad_assert, - soledad_assert_type, -) + +MAC_KEY_LENGTH = 64 class EncryptionMethods(object): @@ -45,6 +63,17 @@ class EncryptionMethods(object): AES_256_CTR = 'aes-256-ctr' XSALSA20 = 'xsalsa20' +# +# Exceptions +# + + +class DocumentNotEncrypted(Exception): + """ + Raised for failures in document encryption. + """ + pass + class UnknownEncryptionMethod(Exception): """ @@ -59,13 +88,116 @@ class NoSymmetricSecret(Exception): """ -class SoledadCrypto(object): +def encrypt_sym(data, key, method): """ - General cryptographic functionality. + Encrypt C{data} using a {password}. + + Currently, the only encryption methods supported are AES-256 in CTR + mode and XSalsa20. + + :param data: The data to be encrypted. + :type data: str + :param key: The key used to encrypt C{data} (must be 256 bits long). + :type key: str + :param method: The encryption method to use. + :type method: str + + :return: A tuple with the initial value and the encrypted data. + :rtype: (long, str) + """ + soledad_assert_type(key, str) + + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s bits (must be 256 bits long).' % + (len(key) * 8)) + iv = None + # AES-256 in CTR mode + if method == EncryptionMethods.AES_256_CTR: + iv = os.urandom(16) + ciphertext = AES(key=key, iv=iv).process(data) + # XSalsa20 + elif method == EncryptionMethods.XSALSA20: + iv = os.urandom(24) + ciphertext = XSalsa20(key=key, iv=iv).process(data) + else: + # raise if method is unknown + raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + return binascii.b2a_base64(iv), ciphertext + + +def decrypt_sym(data, key, method, **kwargs): """ + Decrypt data using symmetric secret. + + Currently, the only encryption method supported is AES-256 CTR mode. - MAC_KEY_LENGTH = 64 + :param data: The data to be decrypted. + :type data: str + :param key: The key used to decrypt C{data} (must be 256 bits long). + :type key: str + :param method: The encryption method to use. + :type method: str + :param kwargs: Other parameters specific to each encryption method. + :type kwargs: dict + + :return: The decrypted data. + :rtype: str + """ + soledad_assert_type(key, str) + # assert params + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s (must be 256 bits long).' % len(key)) + soledad_assert( + 'iv' in kwargs, + '%s needs an initial value.' % method) + # AES-256 in CTR mode + if method == EncryptionMethods.AES_256_CTR: + return AES( + key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) + elif method == EncryptionMethods.XSALSA20: + return XSalsa20( + key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) + + # raise if method is unknown + raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + + +def doc_mac_key(doc_id, secret): + """ + Generate a key for calculating a MAC for a document whose id is + C{doc_id}. + The key is derived using HMAC having sha256 as underlying hash + function. The key used for HMAC is the first MAC_KEY_LENGTH characters + of Soledad's storage secret. The HMAC message is C{doc_id}. + + :param doc_id: The id of the document. + :type doc_id: str + + :param secret: soledad secret storage + :type secret: Soledad.storage_secret + + :return: The key. + :rtype: str + + :raise NoSymmetricSecret: if no symmetric secret was supplied. + """ + if secret is None: + raise NoSymmetricSecret() + + return hmac.new( + secret[:MAC_KEY_LENGTH], + doc_id, + hashlib.sha256).digest() + + +class SoledadCrypto(object): + """ + General cryptographic functionality encapsulated in a + object that can be passed along. + """ def __init__(self, soledad): """ Initialize the crypto object. @@ -77,78 +209,14 @@ class SoledadCrypto(object): def encrypt_sym(self, data, key, method=EncryptionMethods.AES_256_CTR): - """ - Encrypt C{data} using a {password}. - - Currently, the only encryption method supported is AES-256 CTR mode. - - :param data: The data to be encrypted. - :type data: str - :param key: The key used to encrypt C{data} (must be 256 bits long). - :type key: str - :param method: The encryption method to use. - :type method: str - - :return: A tuple with the initial value and the encrypted data. - :rtype: (long, str) - """ - soledad_assert_type(key, str) - - soledad_assert( - len(key) == 32, # 32 x 8 = 256 bits. - 'Wrong key size: %s bits (must be 256 bits long).' % - (len(key) * 8)) - iv = None - # AES-256 in CTR mode - if method == EncryptionMethods.AES_256_CTR: - iv = os.urandom(16) - ciphertext = AES(key=key, iv=iv).process(data) - # XSalsa20 - elif method == EncryptionMethods.XSALSA20: - iv = os.urandom(24) - ciphertext = XSalsa20(key=key, iv=iv).process(data) - else: - # raise if method is unknown - raise UnknownEncryptionMethod('Unkwnown method: %s' % method) - return binascii.b2a_base64(iv), ciphertext + return encrypt_sym(data, key, method) def decrypt_sym(self, data, key, method=EncryptionMethods.AES_256_CTR, **kwargs): - """ - Decrypt data using symmetric secret. + return decrypt_sym(data, key, method, **kwargs) - Currently, the only encryption method supported is AES-256 CTR mode. - - :param data: The data to be decrypted. - :type data: str - :param key: The key used to decrypt C{data} (must be 256 bits long). - :type key: str - :param method: The encryption method to use. - :type method: str - :param kwargs: Other parameters specific to each encryption method. - :type kwargs: dict - - :return: The decrypted data. - :rtype: str - """ - soledad_assert_type(key, str) - # assert params - soledad_assert( - len(key) == 32, # 32 x 8 = 256 bits. - 'Wrong key size: %s (must be 256 bits long).' % len(key)) - soledad_assert( - 'iv' in kwargs, - '%s needs an initial value.' % method) - # AES-256 in CTR mode - if method == EncryptionMethods.AES_256_CTR: - return AES( - key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - elif method == EncryptionMethods.XSALSA20: - return XSalsa20( - key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - - # raise if method is unknown - raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + def doc_mac_key(self, doc_id, secret): + return doc_mac_key(doc_id, self.secret) def doc_passphrase(self, doc_id): """ @@ -173,41 +241,769 @@ class SoledadCrypto(object): raise NoSymmetricSecret() return hmac.new( self.secret[ - self.MAC_KEY_LENGTH: + MAC_KEY_LENGTH: self._soledad.REMOTE_STORAGE_SECRET_LENGTH], doc_id, hashlib.sha256).digest() - def doc_mac_key(self, doc_id): + # + # secret setters/getters + # + + def _get_secret(self): + return self._soledad.storage_secret + + secret = property( + _get_secret, doc='The secret used for symmetric encryption') + +# +# Crypto utilities for a SoledadDocument. +# + + +def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret): + """ + Calculate a MAC for C{doc} using C{ciphertext}. + + Current MAC method used is HMAC, with the following parameters: + + * key: sha256(storage_secret, doc_id) + * msg: doc_id + doc_rev + ciphertext + * digestmod: sha256 + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + :param ciphertext: The content of the document. + :type ciphertext: str + :param mac_method: The MAC method to use. + :type mac_method: str + :param secret: soledad secret + :type secret: Soledad.secret_storage + + :return: The calculated MAC. + :rtype: str + """ + if mac_method == MacMethods.HMAC: + return hmac.new( + doc_mac_key(doc_id, secret), + str(doc_id) + str(doc_rev) + ciphertext, + hashlib.sha256).digest() + # raise if we do not know how to handle this MAC method + raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) + + +def encrypt_doc(crypto, doc): + """ + Wrapper around encrypt_docstr that accepts a crypto object and the document + as arguments. + + :param crypto: a soledad crypto object. + :type crypto: SoledadCrypto + :param doc: the document. + :type doc: SoledadDocument + """ + key = crypto.doc_passphrase(doc.doc_id) + secret = crypto.secret + + return encrypt_docstr( + doc.get_json(), doc.doc_id, doc.rev, key, secret) + + +def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): + """ + Encrypt C{doc}'s content. + + Encrypt doc's contents using AES-256 CTR mode and return a valid JSON + string representing the following: + + { + ENC_JSON_KEY: '<encrypted doc JSON string>', + ENC_SCHEME_KEY: 'symkey', + ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, + ENC_IV_KEY: '<the initial value used to encrypt>', + MAC_KEY: '<mac>' + MAC_METHOD_KEY: 'hmac' + } + + :param docstr: A representation of the document to be encrypted. + :type docstr: str or unicode. + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: The Soledad secret (used for MAC auth). + :type secret: str + + :return: The JSON serialization of the dict representing the encrypted + content. + :rtype: str + """ + # encrypt content using AES-256 CTR mode + iv, ciphertext = encrypt_sym( + str(docstr), # encryption/decryption routines expect str + key, method=EncryptionMethods.AES_256_CTR) + # Return a representation for the encrypted content. In the following, we + # convert binary data to hexadecimal representation so the JSON + # serialization does not complain about what it tries to serialize. + hex_ciphertext = binascii.b2a_hex(ciphertext) + return json.dumps({ + ENC_JSON_KEY: hex_ciphertext, + ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, + ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, + ENC_IV_KEY: iv, + MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex. + doc_id, doc_rev, ciphertext, + MacMethods.HMAC, secret)), + MAC_METHOD_KEY: MacMethods.HMAC, + }) + + +def decrypt_doc(crypto, doc): + """ + Wrapper around decrypt_doc_dict that accepts a crypto object and the + document as arguments. + + :param crypto: a soledad crypto object. + :type crypto: SoledadCrypto + :param doc: the document. + :type doc: SoledadDocument + + :return: json string with the decrypted document + :rtype: str + """ + key = crypto.doc_passphrase(doc.doc_id) + secret = crypto.secret + return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret) + + +def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): + """ + Decrypt C{doc}'s content. + + Return the JSON string representation of the document's decrypted content. + + The passed doc_dict argument should have the following structure: + + { + ENC_JSON_KEY: '<enc_blob>', + ENC_SCHEME_KEY: '<enc_scheme>', + ENC_METHOD_KEY: '<enc_method>', + ENC_IV_KEY: '<initial value used to encrypt>', # (optional) + MAC_KEY: '<mac>' + MAC_METHOD_KEY: 'hmac' + } + + C{enc_blob} is the encryption of the JSON serialization of the document's + content. For now Soledad just deals with documents whose C{enc_scheme} is + EncryptionSchemes.SYMKEY and C{enc_method} is + EncryptionMethods.AES_256_CTR. + + :param doc_dict: The content of the document to be decrypted. + :type doc_dict: dict + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: + :type secret: + + :return: The JSON serialization of the decrypted content. + :rtype: str + """ + soledad_assert(ENC_JSON_KEY in doc_dict) + soledad_assert(ENC_SCHEME_KEY in doc_dict) + soledad_assert(ENC_METHOD_KEY in doc_dict) + soledad_assert(MAC_KEY in doc_dict) + soledad_assert(MAC_METHOD_KEY in doc_dict) + + # verify MAC + ciphertext = binascii.a2b_hex( # content is stored as hex. + doc_dict[ENC_JSON_KEY]) + mac = mac_doc( + doc_id, doc_rev, + ciphertext, + doc_dict[MAC_METHOD_KEY], secret) + # we compare mac's hashes to avoid possible timing attacks that might + # exploit python's builtin comparison operator behaviour, which fails + # immediatelly when non-matching bytes are found. + doc_mac_hash = hashlib.sha256( + binascii.a2b_hex( # the mac is stored as hex + doc_dict[MAC_KEY])).digest() + calculated_mac_hash = hashlib.sha256(mac).digest() + + if doc_mac_hash != calculated_mac_hash: + logger.warning("Wrong MAC while decrypting doc...") + raise WrongMac('Could not authenticate document\'s contents.') + # decrypt doc's content + enc_scheme = doc_dict[ENC_SCHEME_KEY] + plainjson = None + if enc_scheme == EncryptionSchemes.SYMKEY: + enc_method = doc_dict[ENC_METHOD_KEY] + if enc_method == EncryptionMethods.AES_256_CTR: + soledad_assert(ENC_IV_KEY in doc_dict) + plainjson = decrypt_sym( + ciphertext, key, + method=enc_method, + iv=doc_dict[ENC_IV_KEY]) + else: + raise UnknownEncryptionMethod(enc_method) + else: + raise UnknownEncryptionScheme(enc_scheme) + + return plainjson + + +def is_symmetrically_encrypted(doc): + """ + Return True if the document was symmetrically encrypted. + + :param doc: The document to check. + :type doc: SoledadDocument + + :rtype: bool + """ + if doc.content and ENC_SCHEME_KEY in doc.content: + if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY: + return True + return False + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): + """ + Base class for encrypter/decrypter pools. + """ + WORKERS = 5 + + def __init__(self, crypto, sync_db, write_lock): + """ + Initialize the pool of encryption-workers. + + :param crypto: A SoledadCryto instance to perform the encryption. + :type crypto: leap.soledad.crypto.SoledadCrypto + + :param sync_db: a database connection handle + :type sync_db: handle + + :param write_lock: a write lock for controlling concurrent access + to the sync_db + :type write_lock: threading.Lock + """ + self._pool = multiprocessing.Pool(self.WORKERS) + self._crypto = crypto + self._sync_db = sync_db + self._sync_db_write_lock = write_lock + + def close(self): + """ + Cleanly close the pool of workers. + """ + logger.debug("Closing %s" % (self.__class__.__name__,)) + self._pool.close() + try: + self._pool.join() + except Exception: + pass + + def terminate(self): + """ + Terminate the pool of workers. + """ + logger.debug("Terminating %s" % (self.__class__.__name__,)) + self._pool.terminate() + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): + """ + Encrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad secret (used for MAC auth). + :type secret: str + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + encrypted_content = encrypt_docstr( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, encrypted_content + + +class SyncEncrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric encryption + of documents to be synced. + """ + # TODO implement throttling to reduce cpu usage?? + WORKERS = 5 + TABLE_NAME = "docs_tosync" + FIELD_NAMES = "doc_id, rev, content" + + def encrypt_doc(self, doc, workers=True): + """ + Symmetrically encrypt a document. + + :param doc: The document with contents to be encrypted. + :type doc: SoledadDocument + + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + """ + soledad_assert(self._crypto is not None, "need a crypto object") + docstr = doc.get_json() + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + args = doc.doc_id, doc.rev, docstr, key, secret + + try: + if workers: + res = self._pool.apply_async( + encrypt_doc_task, args, + callback=self.encrypt_doc_cb) + else: + # encrypt inline + res = encrypt_doc_task(*args) + self.encrypt_doc_cb(res) + + except Exception as exc: + logger.exception(exc) + + def encrypt_doc_cb(self, result): """ - Generate a key for calculating a MAC for a document whose id is - C{doc_id}. + Insert results of encryption routine into the local sync database. - The key is derived using HMAC having sha256 as underlying hash - function. The key used for HMAC is the first MAC_KEY_LENGTH characters - of Soledad's storage secret. The HMAC message is C{doc_id}. + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, doc_rev, content = result + self.insert_encrypted_local_doc(doc_id, doc_rev, content) - :param doc_id: The id of the document. + def insert_encrypted_local_doc(self, doc_id, doc_rev, content): + """ + Insert the contents of the encrypted doc into the local sync + database. + + :param doc_id: The document id. :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param content: The encrypted document. + :type content: str + """ + sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - :return: The key. - :rtype: str + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_del, (doc_id, )) + con.execute(sql_ins, (doc_id, doc_rev, content)) - :raise NoSymmetricSecret: if no symmetric secret was supplied. + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): + """ + Decrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The encrypted content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification of + that document. + :type trans_id: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad secret (used for MAC auth). + :type secret: str + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + decrypted_content = decrypt_doc_dict( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, decrypted_content, gen, trans_id + + +def get_insertable_docs_by_gen(expected, got): + """ + Return a list of documents ready to be inserted. This list is computed + by aligning the expected list with the already gotten docs, and returning + the maximum number of docs that can be processed in the expected order + before finding a gap. + + :param expected: A list of generations to be inserted. + :type expected: list + + :param got: A dictionary whose values are the docs to be inserted. + :type got: dict + """ + ordered = [got.get(i) for i in expected] + if None in ordered: + return ordered[:ordered.index(None)] + else: + return ordered + + +class SyncDecrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric decryption + of documents that were received. + + The decryption of the received documents is done in two steps: + + 1. All the encrypted docs are collected, together with their generation + and transaction-id + 2. The docs are enqueued for decryption. When completed, they are + inserted following the generation order. + """ + # TODO implement throttling to reduce cpu usage?? + TABLE_NAME = "docs_received" + FIELD_NAMES = "doc_id, rev, content, gen, trans_id" + + write_encrypted_lock = threading.Lock() + + def __init__(self, *args, **kwargs): """ - if self.secret is None: - raise NoSymmetricSecret() - return hmac.new( - self.secret[:self.MAC_KEY_LENGTH], - doc_id, - hashlib.sha256).digest() + Initialize the decrypter pool, and setup a dict for putting the + results of the decrypted docs until they are picked by the insert + routine that gets them in order. + """ + self._insert_doc_cb = kwargs.pop("insert_doc_cb") + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + self.decrypted_docs = {} + self.source_replica_uid = None - # - # secret setters/getters - # + def set_source_replica_uid(self, source_replica_uid): + """ + Set the source replica uid for this decrypter pool instance. - def _get_secret(self): - return self._soledad.storage_secret + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + """ + self.source_replica_uid = source_replica_uid - secret = property( - _get_secret, doc='The secret used for symmetric encryption') + def insert_encrypted_received_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert a received message with encrypted content, to be decrypted later + on. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + """ + docstr = json.dumps(content) + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( + self.TABLE_NAME,) + + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id)) + + def insert_marker_for_received_doc(self, doc_id, doc_rev, gen): + """ + Insert a marker with the document id, revision and generation on the + sync db. This document does not have an encrypted payload, so the + content has already been inserted into the decrypted_docs dictionary + from where it can be picked following generation order. + We need to leave here the marker to be able to calculate the expected + insertion order for a synchronization batch. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param gen: the Document Generation + :type gen: int + """ + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( + self.TABLE_NAME,) + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_ins, (doc_id, doc_rev, '', gen, '')) + + def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): + """ + Insert a document that is not symmetrically encrypted. + We store it in the staging area (the decrypted_docs dictionary) to be + picked up in order as the preceding documents are decrypted. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + """ + # XXX this need a deeper review / testing. + # I believe that what I'm doing here is prone to problems + # if the sync is interrupted (ie, client crash) in the worst possible + # moment. We would need a recover strategy in that case + # (or, insert the document in the table all the same, but with a flag + # saying if the document is sym-encrypted or not), + content = json.dumps(content) + result = doc_id, doc_rev, content, gen, trans_id + self.decrypted_docs[gen] = result + self.insert_marker_for_received_doc(doc_id, doc_rev, gen) + + def delete_encrypted_received_doc(self, doc_id, doc_rev): + """ + Delete a encrypted received doc after it was inserted into the local + db. + + :param doc_id: Document ID. + :type doc_id: str + :param doc_rev: Document revision. + :type doc_rev: str + """ + sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + con = self._sync_db + with self._sync_db_write_lock: + with con: + con.execute(sql_del, (doc_id, doc_rev)) + + def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True): + """ + Symmetrically decrypt a document. + + :param doc_id: The ID for the document with contents to be encrypted. + :type doc: str + :param rev: The revision of the document. + :type rev: str + :param source_replica_uid: + :type source_replica_uid: str + + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + """ + self.source_replica_uid = source_replica_uid + + # insert_doc_cb is a proxy object that gets updated with the right + # insert function only when the sync_target invokes the sync_exchange + # method. so, if we don't still have a non-empty callback, we refuse + # to proceed. + if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), + None): + logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") + return + + # XXX move to get_doc function... + c = self._sync_db.cursor() + sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + try: + c.execute(sql, (doc_id, rev)) + res = c.fetchone() + except Exception as exc: + logger.warning("Error getting docs from syncdb: %r" % (exc,)) + return + if res is None: + logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev)) + return + + soledad_assert(self._crypto is not None, "need a crypto object") + try: + doc_id, rev, docstr, gen, trans_id = res + except ValueError: + logger.warning("Wrong entry in sync db") + return + + if len(docstr) == 0: + # not encrypted payload + return + + try: + content = json.loads(docstr) + except TypeError: + logger.warning("Wrong type while decoding json: %s" % repr(docstr)) + return + + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, rev, content, gen, trans_id, key, secret + + try: + if workers: + # Ouch. This is sent to the workers asynchronously, so + # we have no way of logging errors. We'd have to inspect + # lingering results by querying successful / get() over them... + # Or move the heck out of it to twisted. + res = self._pool.apply_async( + decrypt_doc_task, args, + callback=self.decrypt_doc_cb) + else: + # decrypt inline + res = decrypt_doc_task(*args) + self.decrypt_doc_cb(res) + + except Exception as exc: + logger.exception(exc) + + def decrypt_doc_cb(self, result): + """ + Temporarily store the decryption result in a dictionary where it will + be picked by process_decrypted. + + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, rev, content, gen, trans_id = result + logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen)) + self.decrypted_docs[gen] = result + + def get_docs_by_generation(self): + """ + Get all documents in the received table from the sync db, + ordered by generation. + + :return: list of doc_id, rev, generation + """ + c = self._sync_db.cursor() + sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % ( + self.TABLE_NAME,) + c.execute(sql) + return c.fetchall() + + def count_received_encrypted_docs(self): + """ + Count how many documents we have in the table for received and + encrypted docs. + + :return: The count of documents. + :rtype: int + """ + if self._sync_db is None: + logger.warning("cannot return count with null sync_db") + return + c = self._sync_db.cursor() + sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) + c.execute(sql) + res = c.fetchone() + if res is not None: + return res[0] + else: + return 0 + + def decrypt_received_docs(self): + """ + Get all the encrypted documents from the sync database and dispatch a + decrypt worker to decrypt each one of them. + """ + docs_by_generation = self.get_docs_by_generation() + logger.debug("Sync decrypter pool: There are %d documents to " \ + "decrypt." % len(docs_by_generation)) + for doc_id, rev, gen in filter(None, docs_by_generation): + self.decrypt_doc(doc_id, rev, self.source_replica_uid) + + def process_decrypted(self): + """ + Process the already decrypted documents, and insert as many documents + as can be taken from the expected order without finding a gap. + + :return: Whether we have processed all the pending docs. + :rtype: bool + """ + # Acquire the lock to avoid processing while we're still + # getting data from the syncing stream, to avoid InvalidGeneration + # problems. + with self.write_encrypted_lock: + already_decrypted = self.decrypted_docs + docs = self.get_docs_by_generation() + docs = filter(lambda entry: len(entry) > 0, docs) + expected = [gen for doc_id, rev, gen in docs] + docs_to_insert = get_insertable_docs_by_gen( + expected, already_decrypted) + for doc_fields in docs_to_insert: + self.insert_decrypted_local_doc(*doc_fields) + remaining = self.count_received_encrypted_docs() + return remaining == 0 + + def insert_decrypted_local_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert the decrypted document into the local sqlcipher database. + Makes use of the passed callback `return_doc_cb` passed to the caller + by u1db sync. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + """ + # could pass source_replica in params for callback chain + insert_fun = self._insert_doc_cb[self.source_replica_uid] + logger.debug("Sync decrypter pool: inserting doc in local db: " \ + "%s:%s %s" % (doc_id, doc_rev, gen)) + try: + # convert deleted documents to avoid error on document creation + if content == 'null': + content = None + doc = SoledadDocument(doc_id, doc_rev, content) + insert_fun(doc, int(gen), trans_id) + except Exception as exc: + logger.error("Sync decrypter pool: error while inserting " + "decrypted doc into local db.") + logger.exception(exc) + + else: + # If no errors found, remove it from the local temporary dict + # and from the received database. + self.decrypted_docs.pop(gen) + self.delete_encrypted_received_doc(doc_id, doc_rev) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 5ffa9c7e..2df9606e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # sqlcipher.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ A U1DB backend that uses SQLCipher as its persistence layer. @@ -40,11 +38,13 @@ cipher_default_use_hmac can be used to globally alter the default use of HMAC when opening a database. So, as the statements above were introduced for backwards compatibility with -SLCipher 1.1 databases, we do not implement them as all SQLCipher databases +SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. """ import logging +import multiprocessing import os +import sqlite3 import string import threading import time @@ -57,9 +57,12 @@ from collections import defaultdict from pysqlcipher import dbapi2 from u1db.backends import sqlite_backend from u1db import errors as u1db_errors +from taskthread import TimerTask -from leap.soledad.client.sync import Synchronizer +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.client.target import PendingReceivedDocsSyncError +from leap.soledad.client.sync import SoledadSynchronizer from leap.soledad.common.document import SoledadDocument @@ -88,7 +91,7 @@ SQLITE_ISOLATION_LEVEL = None def open(path, password, create=True, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, - cipher_page_size=1024): + cipher_page_size=1024, defer_encryption=False): """Open a database at the given location. Will raise u1db.errors.DatabaseDoesNotExist if create=False and the @@ -114,6 +117,9 @@ def open(path, password, create=True, document_factory=None, crypto=None, :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool :return: An instance of Database. :rtype SQLCipherDatabase @@ -121,7 +127,7 @@ def open(path, password, create=True, document_factory=None, crypto=None, return SQLCipherDatabase.open_database( path, password, create=create, document_factory=document_factory, crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, - cipher_page_size=cipher_page_size) + cipher_page_size=cipher_page_size, defer_encryption=defer_encryption) # @@ -147,19 +153,40 @@ class NotAnHexString(Exception): # class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): - """A U1DB implementation that uses SQLCipher as its persistence layer.""" + """ + A U1DB implementation that uses SQLCipher as its persistence layer. + """ + defer_encryption = False _index_storage_value = 'expand referenced encrypted' k_lock = threading.Lock() create_doc_lock = threading.Lock() update_indexes_lock = threading.Lock() + _sync_watcher = None + _sync_enc_pool = None + + """ + The name of the local symmetrically encrypted documents to + sync database file. + """ + LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' - syncing_lock = defaultdict(threading.Lock) """ A dictionary that hold locks which avoid multiple sync attempts from the same database replica. """ + encrypting_lock = threading.Lock() + """ + Period or recurrence of the periodic encrypting task, in seconds. + """ + ENCRYPT_TASK_PERIOD = 1 + + syncing_lock = defaultdict(threading.Lock) + """ + A dictionary that hold locks which avoid multiple sync attempts from the + same database replica. + """ def __init__(self, sqlcipher_file, password, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', @@ -194,7 +221,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self.assert_db_is_encrypted( sqlcipher_file, password, raw_key, cipher, kdf_iter, cipher_page_size) - # connect to the database + + # connect to the sqlcipher database with self.k_lock: self._db_handle = dbapi2.connect( sqlcipher_file, @@ -215,18 +243,45 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self._ensure_schema() self._crypto = crypto + self._sync_db = None + self._sync_db_write_lock = None + self._sync_enc_pool = None + + if self.defer_encryption: + if sqlcipher_file != ":memory:": + self._sync_db_path = "%s-sync" % sqlcipher_file + else: + self._sync_db_path = ":memory:" + + # initialize sync db + self._init_sync_db() + + # initialize syncing queue encryption pool + self._sync_enc_pool = SyncEncrypterPool( + self._crypto, self._sync_db, self._sync_db_write_lock) + self._sync_watcher = TimerTask(self._encrypt_syncing_docs, + self.ENCRYPT_TASK_PERIOD) + self._sync_watcher.start() + def factory(doc_id=None, rev=None, json='{}', has_conflicts=False, syncable=True): return SoledadDocument(doc_id=doc_id, rev=rev, json=json, has_conflicts=has_conflicts, syncable=syncable) self.set_document_factory(factory) + # we store syncers in a dictionary indexed by the target URL. We also + # store a hash of the auth info in case auth info expires and we need + # to rebuild the syncer for that target. The final self._syncers + # format is the following: + # + # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} self._syncers = {} @classmethod def _open_database(cls, sqlcipher_file, password, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', - kdf_iter=4000, cipher_page_size=1024): + kdf_iter=4000, cipher_page_size=1024, + defer_encryption=False): """ Open a SQLCipher database. @@ -249,10 +304,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool :return: The database object. :rtype: SQLCipherDatabase """ + cls.defer_encryption = defer_encryption if not os.path.isfile(sqlcipher_file): raise u1db_errors.DatabaseDoesNotExist() @@ -298,43 +357,59 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): def open_database(cls, sqlcipher_file, password, create, backend_cls=None, document_factory=None, crypto=None, raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, - cipher_page_size=1024): + cipher_page_size=1024, defer_encryption=False): """ Open a SQLCipher database. :param sqlcipher_file: The path for the SQLCipher file. :type sqlcipher_file: str + :param password: The password that protects the SQLCipher db. :type password: str + :param create: Should the datbase be created if it does not already - exist? - :type: bool + exist? + :type create: bool + :param backend_cls: A class to use as backend. :type backend_cls: type + :param document_factory: A function that will be called with the same - parameters as Document.__init__. + parameters as Document.__init__. :type document_factory: callable + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. + document contents when syncing. :type crypto: soledad.crypto.SoledadCrypto + :param raw_key: Whether C{password} is a raw 64-char hex string or a - passphrase that should be hashed to obtain the encyrption key. + passphrase that should be hashed to obtain the + encyrption key. :type raw_key: bool + :param cipher: The cipher and mode to use. :type cipher: str + :param kdf_iter: The number of iterations to use. :type kdf_iter: int + :param cipher_page_size: The page size. :type cipher_page_size: int + :param defer_encryption: Whether to defer encryption/decryption of + documents, or do it inline while syncing. + :type defer_encryption: bool + :return: The database object. :rtype: SQLCipherDatabase """ + cls.defer_encryption = defer_encryption try: return cls._open_database( sqlcipher_file, password, document_factory=document_factory, crypto=crypto, raw_key=raw_key, cipher=cipher, - kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) + kdf_iter=kdf_iter, cipher_page_size=cipher_page_size, + defer_encryption=defer_encryption) except u1db_errors.DatabaseDoesNotExist: if not create: raise @@ -347,7 +422,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) - def sync(self, url, creds=None, autocreate=True): + def sync(self, url, creds=None, autocreate=True, defer_decryption=True): """ Synchronize documents with remote replica exposed at url. @@ -362,6 +437,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :type creds: dict :param autocreate: Ask the target to create the db if non-existent. :type autocreate: bool + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool :return: The local generation before the synchronisation was performed. :rtype: int @@ -370,7 +449,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): # the following context manager blocks until the syncing lock can be # acquired. with self.syncer(url, creds=creds) as syncer: - res = syncer.sync(autocreate=autocreate) + + # XXX could mark the critical section here... + try: + res = syncer.sync(autocreate=autocreate, + defer_decryption=defer_decryption) + + except PendingReceivedDocsSyncError: + logger.warning("Local sync db is not clear, skipping sync...") + return + return res def stop_sync(self): @@ -394,7 +482,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]: syncer = self._get_syncer(url, creds=creds) yield syncer - syncer.sync_target.close() + + @property + def syncing(self): + lock = SQLCipherDatabase.syncing_lock[self._get_replica_uid()] + acquired_lock = lock.acquire(False) + if acquired_lock is False: + return True + lock.release() + return False def _get_syncer(self, url, creds=None): """ @@ -415,11 +511,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): h = sha256(json.dumps([url, creds])).hexdigest() cur_h, syncer = self._syncers.get(url, (None, None)) if syncer is None or h != cur_h: - syncer = Synchronizer( + wlock = self._sync_db_write_lock + syncer = SoledadSynchronizer( self, SoledadSyncTarget(url, + self._replica_uid, creds=creds, - crypto=self._crypto)) + crypto=self._crypto, + sync_db=self._sync_db, + sync_db_write_lock=wlock)) self._syncers[url] = (h, syncer) # in order to reuse the same synchronizer multiple times we have to # reset its state (i.e. the number of documents received from target @@ -442,21 +542,85 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): 'ALTER TABLE document ' 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') - def create_doc(self, content, doc_id=None): + def _init_sync_db(self): + """ + Initialize the Symmetrically-Encrypted document to be synced database, + and the queue to communicate with subprocess workers. """ - Create a new document in the local encrypted database. + self._sync_db = sqlite3.connect(self._sync_db_path, + check_same_thread=False) - :param content: the contents of the new document - :type content: dict - :param doc_id: an optional identifier specifying the document id - :type doc_id: str + self._sync_db_write_lock = threading.Lock() + self._create_sync_db_tables() + self.sync_queue = multiprocessing.Queue() + + def _create_sync_db_tables(self): + """ + Create tables for the local sync documents db if needed. + """ + encr = SyncEncrypterPool + decr = SyncDecrypterPool + sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + encr.TABLE_NAME, encr.FIELD_NAMES)) + sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + decr.TABLE_NAME, decr.FIELD_NAMES)) + + with self._sync_db_write_lock: + with self._sync_db: + self._sync_db.execute(sql_encr) + self._sync_db.execute(sql_decr) + + # + # Symmetric encryption of syncing docs + # + + def _encrypt_syncing_docs(self): + """ + Process the syncing queue and send the documents there + to be encrypted in the sync db. They will be read by the + SoledadSyncTarget during the sync_exchange. + + Called periodical from the TimerTask self._sync_watcher. + """ + lock = self.encrypting_lock + # optional wait flag used to avoid blocking + if not lock.acquire(False): + return + else: + queue = self.sync_queue + try: + while not queue.empty(): + doc = queue.get_nowait() + self._sync_enc_pool.encrypt_doc(doc) + + except Exception as exc: + logger.error("Error while encrypting docs to sync") + logger.exception(exc) + finally: + lock.release() + + # + # Document operations + # - :return: the new document - :rtype: SoledadDocument + def put_doc(self, doc): """ - with self.create_doc_lock: - return sqlite_backend.SQLitePartialExpandDatabase.create_doc( - self, content, doc_id=doc_id) + Overwrite the put_doc method, to enqueue the modified document for + encryption before sync. + + :param doc: The document to be put. + :type doc: u1db.Document + + :return: The new document revision. + :rtype: str + """ + doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc( + self, doc) + if self.defer_encryption: + self.sync_queue.put_nowait(doc) + return doc_rev + + # indexes def _put_and_update_indexes(self, old_doc, doc): """ @@ -906,12 +1070,25 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): res = c.fetchall() return res[0][0] - def __del__(self): + def close(self): """ - Closes db_handle upon object destruction. + Close db_handle and close syncer. """ + logger.debug("Sqlcipher backend: closing") + if self._sync_watcher is not None: + self._sync_watcher.stop() + self._sync_watcher.shutdown() + for url in self._syncers: + _, syncer = self._syncers[url] + syncer.close() + if self._sync_enc_pool is not None: + self._sync_enc_pool.close() if self._db_handle is not None: self._db_handle.close() + @property + def replica_uid(self): + return self._get_replica_uid() + sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 56e63416..5d545a77 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -17,35 +17,85 @@ """ -Sync infrastructure that can be interrupted and recovered. +Soledad synchronization utilities. + + +Extend u1db Synchronizer with the ability to: + + * Defer the update of the known replica uid until all the decryption of + the incoming messages has been processed. + + * Be interrupted and recovered. """ + import json +import logging +import traceback +from threading import Lock from u1db import errors -from u1db.sync import Synchronizer as U1DBSynchronizer +from u1db.sync import Synchronizer -class Synchronizer(U1DBSynchronizer): +logger = logging.getLogger(__name__) + + +class SoledadSynchronizer(Synchronizer): """ Collect the state around synchronizing 2 U1DB replicas. - Modified to allow for interrupting the synchronization process. + Synchronization is bi-directional, in that new items in the source are sent + to the target, and new items in the target are returned to the source. + However, it still recognizes that one side is initiating the request. Also, + at the moment, conflicts are only created in the source. + + Also modified to allow for interrupting the synchronization process. """ + syncing_lock = Lock() + def stop(self): """ Stop the current sync in progress. """ self.sync_target.stop() - def sync(self, autocreate=False): + def sync(self, autocreate=False, defer_decryption=True): """ Synchronize documents between source and target. + Differently from u1db `Synchronizer.sync` method, this one allows to + pass a `defer_decryption` flag that will postpone the last + step in the synchronization dance, namely, the setting of the last + known generation and transaction id for a given remote replica. + + This is done to allow the ongoing parallel decryption of the incoming + docs to proceed without `InvalidGeneration` conflicts. + :param autocreate: Whether the target replica should be created or not. :type autocreate: bool + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + """ + self.syncing_lock.acquire() + try: + return self._sync(autocreate=autocreate, + defer_decryption=defer_decryption) + except Exception: + # re-raising the exceptions to let syqlcipher.sync catch them + # (and re-create the syncer instance if needed) + raise + finally: + self.release_syncing_lock() + + def _sync(self, autocreate=False, defer_decryption=True): + """ + Helper function, called from the main `sync` method. + See `sync` docstring. """ sync_target = self.sync_target @@ -64,6 +114,16 @@ class Synchronizer(U1DBSynchronizer): target_gen, target_trans_id = (0, '') target_my_gen, target_my_trans_id = (0, '') + logger.debug( + "Soledad target sync info:\n" + " target replica uid: %s\n" + " target generation: %d\n" + " target trans id: %s\n" + " target my gen: %d\n" + " target my trans_id: %s" + % (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id)) + # make sure we'll have access to target replica uid once it exists if self.target_replica_uid is None: @@ -80,6 +140,8 @@ class Synchronizer(U1DBSynchronizer): # what's changed since that generation and this current gen my_gen, _, changes = self.source.whats_changed(target_my_gen) + logger.debug("Soledad sync: there are %d documents to send." \ + % len(changes)) # get source last-seen database generation for the target if self.target_replica_uid is None: @@ -88,6 +150,11 @@ class Synchronizer(U1DBSynchronizer): target_last_known_gen, target_last_known_trans_id = \ self.source._get_replica_gen_and_trans_id( self.target_replica_uid) + logger.debug( + "Soledad source sync info:\n" + " source target gen: %d\n" + " source target trans_id: %s" + % (target_last_known_gen, target_last_known_trans_id)) # validate transaction ids if not changes and target_last_known_gen == target_gen: @@ -111,15 +178,85 @@ class Synchronizer(U1DBSynchronizer): # # The sync_exchange method may be interrupted, in which case it will # return a tuple of Nones. - new_gen, new_trans_id = sync_target.sync_exchange( - docs_by_generation, self.source._replica_uid, - target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback) + try: + new_gen, new_trans_id = sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback, + defer_decryption=defer_decryption) + logger.debug( + "Soledad source sync info after sync exchange:\n" + " source target gen: %d\n" + " source target trans_id: %s" + % (new_gen, new_trans_id)) + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + if defer_decryption and not sync_target.has_syncdb(): + logger.debug("Sync target has no valid sync db, " + "aborting defer_decryption") + defer_decryption = False + self.complete_sync() + except Exception as e: + logger.error("Soledad sync error: %s" % str(e)) + logger.error(traceback.format_exc()) + sync_target.stop() + finally: + sync_target.close() - # record target synced-up-to generation including applying what we sent + return my_gen + + def complete_sync(self): + """ + Last stage of the synchronization: + (a) record last known generation and transaction uid for the remote + replica, and + (b) make target aware of our current reached generation. + """ + logger.debug("Completing deferred last step in SYNC...") + + # record target synced-up-to generation including applying what we + # sent + info = self._syncing_info self.source._set_replica_gen_and_trans_id( - self.target_replica_uid, new_gen, new_trans_id) + info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) + # if gapless record current reached generation with target - self._record_sync_info_with_the_target(my_gen) + self._record_sync_info_with_the_target(info["my_gen"]) - return my_gen + @property + def syncing(self): + """ + Return True if a sync is ongoing, False otherwise. + :rtype: bool + """ + # XXX FIXME we need some mechanism for timeout: should cleanup and + # release if something in the syncdb-decrypt goes wrong. we could keep + # track of the release date and cleanup unrealistic sync entries after + # some time. + locked = self.syncing_lock.locked() + return locked + + def release_syncing_lock(self): + """ + Release syncing lock if it's locked. + """ + if self.syncing_lock.locked(): + self.syncing_lock.release() + + def close(self): + """ + Close sync target pool of workers. + """ + self.release_syncing_lock() + self.sync_target.close() + + def __del__(self): + """ + Cleanup: release lock. + """ + self.release_syncing_lock() diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 968545b6..70e4d3a2 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -14,458 +14,466 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. + + """ A U1DB backend for encrypting data before sending to server and decrypting after receiving. """ -import binascii + + import cStringIO import gzip -import hashlib -import hmac import logging +import re import urllib import threading +import urlparse -import simplejson as json +from collections import defaultdict from time import sleep from uuid import uuid4 +from contextlib import contextmanager -from u1db.remote import utils, http_errors -from u1db.errors import BrokenSyncStream +import simplejson as json +from taskthread import TimerTask from u1db import errors +from u1db.remote import utils, http_errors from u1db.remote.http_target import HTTPSyncTarget -from u1db.remote.http_client import _encode_query_parameter - +from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase +from zope.proxy import ProxyBase +from zope.proxy import sameProxiedObjects, setProxiedObject from leap.soledad.common import soledad_assert -from leap.soledad.common.crypto import ( - EncryptionSchemes, - UnknownEncryptionScheme, - MacMethods, - UnknownMacMethod, - WrongMac, - ENC_JSON_KEY, - ENC_SCHEME_KEY, - ENC_METHOD_KEY, - ENC_IV_KEY, - MAC_KEY, - MAC_METHOD_KEY, -) from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import ( - EncryptionMethods, - UnknownEncryptionMethod, -) -from leap.soledad.client.events import ( - SOLEDAD_SYNC_SEND_STATUS, - SOLEDAD_SYNC_RECEIVE_STATUS, - signal, -) +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_doc, decrypt_doc +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import signal -logger = logging.getLogger(__name__) -# -# Exceptions -# +logger = logging.getLogger(__name__) -class DocumentNotEncrypted(Exception): +def _gunzip(data): """ - Raised for failures in document encryption. + Uncompress data that is gzipped. + + :param data: gzipped data + :type data: basestring """ - pass + buffer = cStringIO.StringIO() + buffer.write(data) + buffer.seek(0) + try: + data = gzip.GzipFile(mode='r', fileobj=buffer).read() + except Exception: + logger.warning("Error while decrypting gzipped data") + buffer.close() + return data -# -# Crypto utilities for a SoledadDocument. -# +class PendingReceivedDocsSyncError(Exception): + pass -def mac_doc(crypto, doc_id, doc_rev, ciphertext, mac_method): +class DocumentSyncerThread(threading.Thread): """ - Calculate a MAC for C{doc} using C{ciphertext}. - - Current MAC method used is HMAC, with the following parameters: - - * key: sha256(storage_secret, doc_id) - * msg: doc_id + doc_rev + ciphertext - * digestmod: sha256 - - :param crypto: A SoledadCryto instance used to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc_id: The id of the document. - :type doc_id: str - :param doc_rev: The revision of the document. - :type doc_rev: str - :param ciphertext: The content of the document. - :type ciphertext: str - :param mac_method: The MAC method to use. - :type mac_method: str - - :return: The calculated MAC. - :rtype: str + A thread that knowns how to either send or receive a document during the + sync process. """ - if mac_method == MacMethods.HMAC: - return hmac.new( - crypto.doc_mac_key(doc_id), - str(doc_id) + str(doc_rev) + ciphertext, - hashlib.sha256).digest() - # raise if we do not know how to handle this MAC method - raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) + def __init__(self, doc_syncer, release_method, failed_method, + idx, total, last_request_lock=None, last_callback_lock=None): + """ + Initialize a new syncer thread. + + :param doc_syncer: A document syncer. + :type doc_syncer: HTTPDocumentSyncer + :param release_method: A method to be called when finished running. + :type release_method: callable(DocumentSyncerThread) + :param failed_method: A method to be called when we failed. + :type failed_method: callable(DocumentSyncerThread) + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param last_request_lock: A lock to wait for before actually performing + the request. + :type last_request_lock: threading.Lock + :param last_callback_lock: A lock to wait for before actually running + the success callback. + :type last_callback_lock: threading.Lock + """ + threading.Thread.__init__(self) + self._doc_syncer = doc_syncer + self._release_method = release_method + self._failed_method = failed_method + self._idx = idx + self._total = total + self._last_request_lock = last_request_lock + self._last_callback_lock = last_callback_lock + self._response = None + self._exception = None + self._result = None + self._success = False + # a lock so we can signal when we're finished + self._request_lock = threading.Lock() + self._request_lock.acquire() + self._callback_lock = threading.Lock() + self._callback_lock.acquire() + # make thread interruptable + self._stopped = None + self._stop_lock = threading.Lock() -def encrypt_doc(crypto, doc): - """ - Encrypt C{doc}'s content. - - Encrypt doc's contents using AES-256 CTR mode and return a valid JSON - string representing the following: - - { - ENC_JSON_KEY: '<encrypted doc JSON string>', - ENC_SCHEME_KEY: 'symkey', - ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, - ENC_IV_KEY: '<the initial value used to encrypt>', - MAC_KEY: '<mac>' - MAC_METHOD_KEY: 'hmac' - } - - :param crypto: A SoledadCryto instance used to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :return: The JSON serialization of the dict representing the encrypted - content. - :rtype: str - """ - soledad_assert(doc.is_tombstone() is False) - # encrypt content using AES-256 CTR mode - iv, ciphertext = crypto.encrypt_sym( - str(doc.get_json()), # encryption/decryption routines expect str - crypto.doc_passphrase(doc.doc_id), - method=EncryptionMethods.AES_256_CTR) - # Return a representation for the encrypted content. In the following, we - # convert binary data to hexadecimal representation so the JSON - # serialization does not complain about what it tries to serialize. - hex_ciphertext = binascii.b2a_hex(ciphertext) - return json.dumps({ - ENC_JSON_KEY: hex_ciphertext, - ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, - ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, - ENC_IV_KEY: iv, - # store the mac as hex. - MAC_KEY: binascii.b2a_hex( - mac_doc( - crypto, doc.doc_id, doc.rev, - ciphertext, - MacMethods.HMAC)), - MAC_METHOD_KEY: MacMethods.HMAC, - }) - - -def decrypt_doc(crypto, doc): - """ - Decrypt C{doc}'s content. + def run(self): + """ + Run the HTTP request and store results. - Return the JSON string representation of the document's decrypted content. + This method will block and wait for an eventual previous operation to + finish before actually performing the request. It also traps any + exception and register any failure with the request. + """ + with self._stop_lock: + if self._stopped is None: + self._stopped = False + else: + return + + # eventually wait for the previous thread to finish + if self._last_request_lock is not None: + self._last_request_lock.acquire() + + # bail out in case we've been interrupted + if self.stopped is True: + return + + try: + self._response = self._doc_syncer.do_request() + self._request_lock.release() + + # run success callback + if self._doc_syncer.success_callback is not None: + + # eventually wait for callback lock release + if self._last_callback_lock is not None: + self._last_callback_lock.acquire() + + # bail out in case we've been interrupted + if self._stopped is True: + return + + self._result = self._doc_syncer.success_callback( + self._idx, self._total, self._response) + self._success = True + doc_syncer = self._doc_syncer + self._release_method(self, doc_syncer) + self._doc_syncer = None + # let next thread executed its callback + self._callback_lock.release() + + # trap any exception and signal failure + except Exception as e: + self._exception = e + self._success = False + # run failure callback + if self._doc_syncer.failure_callback is not None: + + # eventually wait for callback lock release + if self._last_callback_lock is not None: + self._last_callback_lock.acquire() + + # bail out in case we've been interrupted + if self.stopped is True: + return + + self._doc_syncer.failure_callback( + self._idx, self._total, self._exception) + + self._failed_method(self) + # we do not release the callback lock here because we + # failed and so we don't want other threads to succeed. - The content of the document should have the following structure: + @property + def doc_syncer(self): + return self._doc_syncer - { - ENC_JSON_KEY: '<enc_blob>', - ENC_SCHEME_KEY: '<enc_scheme>', - ENC_METHOD_KEY: '<enc_method>', - ENC_IV_KEY: '<initial value used to encrypt>', # (optional) - MAC_KEY: '<mac>' - MAC_METHOD_KEY: 'hmac' - } + @property + def response(self): + return self._response - C{enc_blob} is the encryption of the JSON serialization of the document's - content. For now Soledad just deals with documents whose C{enc_scheme} is - EncryptionSchemes.SYMKEY and C{enc_method} is - EncryptionMethods.AES_256_CTR. + @property + def exception(self): + return self._exception - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc: The document to be decrypted. - :type doc: SoledadDocument + @property + def callback_lock(self): + return self._callback_lock - :return: The JSON serialization of the decrypted content. - :rtype: str - """ - soledad_assert(doc.is_tombstone() is False) - soledad_assert(ENC_JSON_KEY in doc.content) - soledad_assert(ENC_SCHEME_KEY in doc.content) - soledad_assert(ENC_METHOD_KEY in doc.content) - soledad_assert(MAC_KEY in doc.content) - soledad_assert(MAC_METHOD_KEY in doc.content) - # verify MAC - ciphertext = binascii.a2b_hex( # content is stored as hex. - doc.content[ENC_JSON_KEY]) - mac = mac_doc( - crypto, doc.doc_id, doc.rev, - ciphertext, - doc.content[MAC_METHOD_KEY]) - # we compare mac's hashes to avoid possible timing attacks that might - # exploit python's builtin comparison operator behaviour, which fails - # immediatelly when non-matching bytes are found. - doc_mac_hash = hashlib.sha256( - binascii.a2b_hex( # the mac is stored as hex - doc.content[MAC_KEY])).digest() - calculated_mac_hash = hashlib.sha256(mac).digest() - if doc_mac_hash != calculated_mac_hash: - raise WrongMac('Could not authenticate document\'s contents.') - # decrypt doc's content - enc_scheme = doc.content[ENC_SCHEME_KEY] - plainjson = None - if enc_scheme == EncryptionSchemes.SYMKEY: - enc_method = doc.content[ENC_METHOD_KEY] - if enc_method == EncryptionMethods.AES_256_CTR: - soledad_assert(ENC_IV_KEY in doc.content) - plainjson = crypto.decrypt_sym( - ciphertext, - crypto.doc_passphrase(doc.doc_id), - method=enc_method, - iv=doc.content[ENC_IV_KEY]) - else: - raise UnknownEncryptionMethod(enc_method) - else: - raise UnknownEncryptionScheme(enc_scheme) - return plainjson + @property + def request_lock(self): + return self._request_lock + @property + def success(self): + return self._success -def _gunzip(data): - """ - Uncompress data that is gzipped. + def stop(self): + with self._stop_lock: + self._stopped = True - :param data: gzipped data - :type data: basestring - """ - buffer = cStringIO.StringIO() - buffer.write(data) - buffer.seek(0) - try: - data = gzip.GzipFile(mode='r', fileobj=buffer).read() - except Exception: - logger.warning("Error while decrypting gzipped data") - buffer.close() - return data + @property + def stopped(self): + with self._stop_lock: + return self._stopped + @property + def result(self): + return self._result -# -# SoledadSyncTarget -# -class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): +class DocumentSyncerPool(object): """ - A SyncTarget that encrypts data before sending and decrypts data after - receiving. + A pool of reusable document syncers. """ - # - # Token auth methods. - # + POOL_SIZE = 10 + """ + The maximum amount of syncer threads running at the same time. + """ - def set_token_credentials(self, uuid, token): + def __init__(self, raw_url, raw_creds, query_string, headers, + ensure_callback, stop_method): """ - Store given credentials so we can sign the request later. + Initialize the document syncer pool. + + :param raw_url: The complete raw URL for the HTTP request. + :type raw_url: str + :param raw_creds: The credentials for the HTTP request. + :type raw_creds: dict + :param query_string: The query string for the HTTP request. + :type query_string: str + :param headers: The headers for the HTTP request. + :type headers: dict + :param ensure_callback: A callback to ensure we have the correct + target_replica_uid, if it was just created. + :type ensure_callback: callable - :param uuid: The user's uuid. - :type uuid: str - :param token: The authentication token. - :type token: str """ - TokenBasedAuth.set_token_credentials(self, uuid, token) - - def _sign_request(self, method, url_query, params): + # save syncer params + self._raw_url = raw_url + self._raw_creds = raw_creds + self._query_string = query_string + self._headers = headers + self._ensure_callback = ensure_callback + self._stop_method = stop_method + # pool attributes + self._failures = False + self._semaphore_pool = threading.BoundedSemaphore( + DocumentSyncerPool.POOL_SIZE) + self._pool_access_lock = threading.Lock() + self._doc_syncers = [] + self._threads = [] + + def new_syncer_thread(self, idx, total, last_request_lock=None, + last_callback_lock=None): """ - Return an authorization header to be included in the HTTP request. + Yield a new document syncer thread. + + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param last_request_lock: A lock to wait for before actually performing + the request. + :type last_request_lock: threading.Lock + :param last_callback_lock: A lock to wait for before actually running + the success callback. + :type last_callback_lock: threading.Lock + """ + t = None + # wait for available threads + self._semaphore_pool.acquire() + with self._pool_access_lock: + if self._failures is True: + return None + # get a syncer + doc_syncer = self._get_syncer() + # we rely on DocumentSyncerThread.run() to release the lock using + # self.release_syncer so we can launch a new thread. + t = DocumentSyncerThread( + doc_syncer, self.release_syncer, self.cancel_threads, + idx, total, + last_request_lock=last_request_lock, + last_callback_lock=last_callback_lock) + self._threads.append(t) + return t + + def _failed(self): + with self._pool_access_lock: + self._failures = True - :param method: The HTTP method. - :type method: str - :param url_query: The URL query string. - :type url_query: str - :param params: A list with encoded query parameters. - :type param: list + @property + def failures(self): + return self._failures - :return: The Authorization header. - :rtype: list of tuple + def _get_syncer(self): """ - return TokenBasedAuth._sign_request(self, method, url_query, params) - - # - # Modified HTTPSyncTarget methods. - # + Get a document syncer from the pool. - @staticmethod - def connect(url, crypto=None): - return SoledadSyncTarget(url, crypto=crypto) + This method will create a new syncer whenever there is no syncer + available in the pool. - def __init__(self, url, creds=None, crypto=None): + :return: A syncer. + :rtype: HTTPDocumentSyncer """ - Initialize the SoledadSyncTarget. - - :param url: The url of the target replica to sync with. - :type url: str - :param creds: optional dictionary giving credentials. - to authorize the operation with the server. - :type creds: dict - :param soledad: An instance of Soledad so we can encrypt/decrypt - document contents when syncing. - :type soledad: soledad.Soledad + syncer = None + # get an available syncer or create a new one + try: + syncer = self._doc_syncers.pop() + except IndexError: + syncer = HTTPDocumentSyncer( + self._raw_url, self._raw_creds, self._query_string, + self._headers, self._ensure_callback) + return syncer + + def release_syncer(self, syncer_thread, doc_syncer): """ - HTTPSyncTarget.__init__(self, url, creds) - self._crypto = crypto - self._stopped = True - self._stop_lock = threading.Lock() + Return a syncer to the pool after use and check for any failures. - def _init_post_request(self, url, action, headers, content_length): + :param syncer: The syncer to be returned to the pool. + :type syncer: HTTPDocumentSyncer """ - Initiate a syncing POST request. + with self._pool_access_lock: + self._doc_syncers.append(doc_syncer) + if syncer_thread.success is True: + self._threads.remove(syncer_thread) + self._semaphore_pool.release() - :param url: The syncing URL. - :type url: str - :param action: The syncing action, either 'get' or 'receive'. - :type action: str - :param headers: The initial headers to be sent on this request. - :type headers: dict - :param content_length: The content-length of the request. - :type content_length: int + def cancel_threads(self, calling_thread): """ - self._conn.putrequest('POST', url) - self._conn.putheader( - 'content-type', 'application/x-soledad-sync-%s' % action) - for header_name, header_value in headers: - self._conn.putheader(header_name, header_value) - self._conn.putheader('accept-encoding', 'gzip') - self._conn.putheader('content-length', str(content_length)) - self._conn.endheaders() - - def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, - headers, return_doc_cb, ensure_callback, sync_id): + Stop all threads in the pool. """ - Fetch sync documents from the remote database and insert them in the - local database. + # stop sync + self._stop_method() + stopped = [] + # stop all threads + logger.warning("Soledad sync: cancelling sync threads...") + with self._pool_access_lock: + self._failures = True + while self._threads: + t = self._threads.pop(0) + t.stop() + self._doc_syncers.append(t.doc_syncer) + stopped.append(t) + # release locks and join + while stopped: + t = stopped.pop(0) + t.request_lock.acquire(False) # just in case + t.request_lock.release() + t.callback_lock.acquire(False) # just in case + t.callback_lock.release() + logger.warning("Soledad sync: cancelled sync threads.") + + def cleanup(self): + """ + Close and remove any syncers from the pool. + """ + with self._pool_access_lock: + while self._doc_syncers: + syncer = self._doc_syncers.pop() + syncer.close() + del syncer - If an incoming document's encryption scheme is equal to - EncryptionSchemes.SYMKEY, then this method will decrypt it with - Soledad's symmetric key. - :param url: The syncing URL. - :type url: str - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - :param headers: The headers of the HTTP request. +class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): + + def __init__(self, raw_url, creds, query_string, headers, ensure_callback): + """ + Initialize the client. + + :param raw_url: The raw URL of the target HTTP server. + :type raw_url: str + :param creds: Authentication credentials. + :type creds: dict + :param query_string: The query string for the HTTP request. + :type query_string: str + :param headers: The headers for the HTTP request. :type headers: dict - :param return_doc_cb: A callback to insert docs from target. - :type return_doc_cb: callable :param ensure_callback: A callback to ensure we have the correct target_replica_uid, if it was just created. :type ensure_callback: callable + """ + HTTPClientBase.__init__(self, raw_url, creds=creds) + # info needed to perform the request + self._query_string = query_string + self._headers = headers + self._ensure_callback = ensure_callback + # the actual request method + self._request_method = None + self._success_callback = None + self._failure_callback = None + + def _reset(self): + """ + Reset this document syncer so we can reuse it. + """ + self._request_method = None + self._success_callback = None + self._failure_callback = None + self._request_method = None - :raise BrokenSyncStream: If C{data} is malformed. + def set_request_method(self, method, *args, **kwargs): + """ + Set the actual method to perform the request. - :return: A dictionary representing the first line of the response got - from remote replica. - :rtype: list of str - """ - - def _post_get_doc(received): - """ - Get a sync document from server by means of a POST request. - - :param received: The number of documents already received in the - current sync session. - :type received: int - """ - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=ensure_callback is not None) - # inform server of how many documents have already been received - size += self._prepare( - ',', entries, received=received) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request(url, 'get', headers, size) - # get document - for entry in entries: - self._conn.send(entry) - return self._response() - - number_of_changes = None - received = 0 + :param method: Either 'get' or 'put'. + :type method: str + :param args: Arguments for the request method. + :type args: list + :param kwargs: Keyworded arguments for the request method. + :type kwargs: dict + """ + self._reset() + # resolve request method + if method is 'get': + self._request_method = self._get_doc + elif method is 'put': + self._request_method = self._put_doc + else: + raise Exception + # store request method args + self._args = args + self._kwargs = kwargs - new_generation = last_known_generation - new_transaction_id = last_known_trans_id - while number_of_changes is None or received < number_of_changes: - # bail out if sync process was interrupted - if self.stopped is True: - return last_known_generation, last_known_trans_id - # try to fetch one document from target - data, _ = _post_get_doc(received) - # decode incoming stream - parts = data.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise BrokenSyncStream - data = parts[1:-1] - # decode metadata - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - try: - metadata = json.loads(line) - soledad_assert('number_of_changes' in metadata) - soledad_assert('new_generation' in metadata) - soledad_assert('new_transaction_id' in metadata) - number_of_changes = metadata['number_of_changes'] - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - except json.JSONDecodeError, AssertionError: - raise BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if ensure_callback and 'replica_uid' in metadata: - ensure_callback(metadata['replica_uid']) - # bail out if there are no documents to be received - if number_of_changes == 0: - break - # decrypt incoming document and insert into local database - entry = None - try: - entry = json.loads(data[1]) - except IndexError: - raise BrokenSyncStream - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # if arriving content was symmetrically encrypted, we decrypt - # it. - doc = SoledadDocument( - entry['id'], entry['rev'], entry['content']) - if doc.content and ENC_SCHEME_KEY in doc.content: - if doc.content[ENC_SCHEME_KEY] == \ - EncryptionSchemes.SYMKEY: - doc.set_json(decrypt_doc(self._crypto, doc)) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- - return_doc_cb(doc, entry['gen'], entry['trans_id']) - received += 1 - signal( - SOLEDAD_SYNC_RECEIVE_STATUS, - "%d/%d" % - (received, number_of_changes)) - return new_generation, new_transaction_id + def set_success_callback(self, callback): + self._success_callback = callback + + def set_failure_callback(self, callback): + self._failure_callback = callback + + @property + def success_callback(self): + return self._success_callback + + @property + def failure_callback(self): + return self._failure_callback + + def do_request(self): + """ + Actually perform the request. + + :return: The body and headers of the response. + :rtype: tuple + """ + self._ensure_connection() + args = self._args + kwargs = self._kwargs + return self._request_method(*args, **kwargs) def _request(self, method, url_parts, params=None, body=None, content_type=None): @@ -482,6 +490,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type body: str :param content-type: The content-type of the request. :type content-type: str + + :return: The body and headers of the response. + :rtype: tuple + + :raise errors.Unavailable: Raised after a number of unsuccesful + request attempts. + :raise Exception: Raised for any other exception ocurring during the + request. """ self._ensure_connection() @@ -566,14 +582,506 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type entries: list :param dic: The data to be included in this entry. :type dic: dict + + :return: The size of the prepared entry. + :rtype: int """ entry = comma + '\r\n' + json.dumps(dic) entries.append(entry) return len(entry) - def sync_exchange(self, docs_by_generations, source_replica_uid, - last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None): + def _init_post_request(self, action, content_length): + """ + Initiate a syncing POST request. + + :param url: The syncing URL. + :type url: str + :param action: The syncing action, either 'get' or 'receive'. + :type action: str + :param headers: The initial headers to be sent on this request. + :type headers: dict + :param content_length: The content-length of the request. + :type content_length: int + """ + self._conn.putrequest('POST', self._query_string) + self._conn.putheader( + 'content-type', 'application/x-soledad-sync-%s' % action) + for header_name, header_value in self._headers: + self._conn.putheader(header_name, header_value) + self._conn.putheader('accept-encoding', 'gzip') + self._conn.putheader('content-length', str(content_length)) + self._conn.endheaders() + + def _get_doc(self, received, sync_id, last_known_generation, + last_known_trans_id): + """ + Get a sync document from server by means of a POST request. + + :param received: The number of documents already received in the + current sync session. + :type received: int + :param sync_id: The id for the current sync session. + :type sync_id: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :return: The body and headers of the response. + :rtype: tuple + """ + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # inform server of how many documents have already been received + size += self._prepare( + ',', entries, received=received) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request('get', size) + # get document + for entry in entries: + self._conn.send(entry) + return self._response() + + def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, + id, rev, content, gen, trans_id, number_of_docs, doc_idx): + """ + Put a sync document on server by means of a POST request. + + :param sync_id: The id for the current sync session. + :type sync_id: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + :param id: The document id. + :type id: str + :param rev: The document revision. + :type rev: str + :param content: The serialized document content. + :type content: str + :param gen: The generation of the modification of the document. + :type gen: int + :param trans_id: The transaction id of the modification of the + document. + :type trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int + + :return: The body and headers of the response. + :rtype: tuple + """ + # prepare to send the document + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # add the document to the request + size += self._prepare( + ',', entries, + id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, + number_of_docs=number_of_docs, doc_idx=doc_idx) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request('put', size) + # send document + for entry in entries: + self._conn.send(entry) + return self._response() + + def _sign_request(self, method, url_query, params): + """ + Return an authorization header to be included in the HTTP request. + + :param method: The HTTP method. + :type method: str + :param url_query: The URL query string. + :type url_query: str + :param params: A list with encoded query parameters. + :type param: list + + :return: The Authorization header. + :rtype: list of tuple + """ + return TokenBasedAuth._sign_request(self, method, url_query, params) + + def set_token_credentials(self, uuid, token): + """ + Store given credentials so we can sign the request later. + + :param uuid: The user's uuid. + :type uuid: str + :param token: The authentication token. + :type token: str + """ + TokenBasedAuth.set_token_credentials(self, uuid, token) + + +class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ + + # will later keep a reference to the insert-doc callback + # passed to sync_exchange + _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) + + """ + Period of recurrence of the periodic decrypting task, in seconds. + """ + DECRYPT_TASK_PERIOD = 0.5 + + # + # Modified HTTPSyncTarget methods. + # + + def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, + sync_db=None, sync_db_write_lock=None): + """ + Initialize the SoledadSyncTarget. + + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param url: The url of the target replica to sync with. + :type url: str + :param creds: Optional dictionary giving credentials. + to authorize the operation with the server. + :type creds: dict + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad.crypto.SoledadCrypto + :param sync_db: Optional. handler for the db with the symmetric + encryption of the syncing documents. If + None, encryption will be done in-place, + instead of retreiving it from the dedicated + database. + :type sync_db: Sqlite handler + :param sync_db_write_lock: a write lock for controlling concurrent + access to the sync_db + :type sync_db_write_lock: threading.Lock + """ + HTTPSyncTarget.__init__(self, url, creds) + self._raw_url = url + self._raw_creds = creds + self._crypto = crypto + self._stopped = True + self._stop_lock = threading.Lock() + self._sync_exchange_lock = threading.Lock() + self.source_replica_uid = source_replica_uid + self._defer_decryption = False + + # deferred decryption attributes + self._sync_db = None + self._sync_db_write_lock = None + self._decryption_callback = None + self._sync_decr_pool = None + self._sync_watcher = None + if sync_db and sync_db_write_lock is not None: + self._sync_db = sync_db + self._sync_db_write_lock = sync_db_write_lock + + def _setup_sync_decr_pool(self): + """ + Set up the SyncDecrypterPool for deferred decryption. + """ + if self._sync_decr_pool is None: + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, self._sync_db, + self._sync_db_write_lock, + insert_doc_cb=self._insert_doc_cb) + self._sync_decr_pool.set_source_replica_uid( + self.source_replica_uid) + + def _teardown_sync_decr_pool(self): + """ + Tear down the SyncDecrypterPool. + """ + if self._sync_decr_pool is not None: + self._sync_decr_pool.close() + self._sync_decr_pool = None + + def _setup_sync_watcher(self): + """ + Set up the sync watcher for deferred decryption. + """ + if self._sync_watcher is None: + self._sync_watcher = TimerTask( + self._decrypt_syncing_received_docs, + delay=self.DECRYPT_TASK_PERIOD) + + def _teardown_sync_watcher(self): + """ + Tear down the sync watcher. + """ + if self._sync_watcher is not None: + self._sync_watcher.stop() + self._sync_watcher.shutdown() + self._sync_watcher = None + + def _get_replica_uid(self, url): + """ + Return replica uid from the url, or None. + + :param url: the replica url + :type url: str + """ + replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) + return replica_uid_match[0] if len(replica_uid_match) > 0 else None + + @staticmethod + def connect(url, source_replica_uid=None, crypto=None): + return SoledadSyncTarget( + url, source_replica_uid=source_replica_uid, crypto=crypto) + + def _parse_received_doc_response(self, response): + """ + Parse the response from the server containing the received document. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + """ + data, _ = response + # decode incoming stream + parts = data.splitlines() + if not parts or parts[0] != '[' or parts[-1] != ']': + raise errors.BrokenSyncStream + data = parts[1:-1] + # decode metadata + line, comma = utils.check_and_strip_comma(data[0]) + metadata = None + try: + metadata = json.loads(line) + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + number_of_changes = metadata['number_of_changes'] + except (json.JSONDecodeError, KeyError): + raise errors.BrokenSyncStream + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + # parse incoming document info + doc_id = None + rev = None + content = None + gen = None + trans_id = None + if number_of_changes > 0: + try: + entry = json.loads(data[1]) + doc_id = entry['id'] + rev = entry['rev'] + content = entry['content'] + gen = entry['gen'] + trans_id = entry['trans_id'] + except (IndexError, KeyError): + raise errors.BrokenSyncStream + return new_generation, new_transaction_id, number_of_changes, \ + doc_id, rev, content, gen, trans_id + + def _insert_received_doc(self, idx, total, response): + """ + Insert a received document into the local replica. + + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + :param response: The body and headers of the response. + :type response: tuple(str, dict) + """ + new_generation, new_transaction_id, number_of_changes, doc_id, \ + rev, content, gen, trans_id = \ + self._parse_received_doc_response(response) + if doc_id is not None: + # decrypt incoming document and insert into local database + # ------------------------------------------------------------- + # symmetric decryption of document's contents + # ------------------------------------------------------------- + # If arriving content was symmetrically encrypted, we decrypt it. + # We do it inline if defer_decryption flag is False or no sync_db + # was defined, otherwise we defer it writing it to the received + # docs table. + doc = SoledadDocument(doc_id, rev, content) + if is_symmetrically_encrypted(doc): + if self._queue_for_decrypt: + self._save_encrypted_received_doc( + doc, gen, trans_id, idx, total) + else: + # defer_decryption is False or no-sync-db fallback + doc.set_json(decrypt_doc(self._crypto, doc)) + self._return_doc_cb(doc, gen, trans_id) + else: + # not symmetrically encrypted doc, insert it directly + # or save it in the decrypted stage. + if self._queue_for_decrypt: + self._save_received_doc(doc, gen, trans_id, idx, total) + else: + self._return_doc_cb(doc, gen, trans_id) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + msg = "%d/%d" % (idx + 1, total) + signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg) + logger.debug("Soledad sync receive status: %s" % msg) + return number_of_changes, new_generation, new_transaction_id + + def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, + headers, return_doc_cb, ensure_callback, sync_id, + syncer_pool, defer_decryption=False): + """ + Fetch sync documents from the remote database and insert them in the + local database. + + If an incoming document's encryption scheme is equal to + EncryptionSchemes.SYMKEY, then this method will decrypt it with + Soledad's symmetric key. + + :param url: The syncing URL. + :type url: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + :param headers: The headers of the HTTP request. + :type headers: dict + :param return_doc_cb: A callback to insert docs from target. + :type return_doc_cb: callable + :param ensure_callback: A callback to ensure we have the correct + target_replica_uid, if it was just created. + :type ensure_callback: callable + :param sync_id: The id for the current sync session. + :type sync_id: str + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :raise BrokenSyncStream: If `data` is malformed. + + :return: A dictionary representing the first line of the response got + from remote replica. + :rtype: dict + """ + # we keep a reference to the callback in case we defer the decryption + self._return_doc_cb = return_doc_cb + self._queue_for_decrypt = defer_decryption \ + and self._sync_db is not None + + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + + if self._queue_for_decrypt: + logger.debug( + "Soledad sync: will queue received docs for decrypting.") + + idx = 0 + number_of_changes = 1 + + first_request = True + last_callback_lock = None + threads = [] + + # get incoming documents + while idx < number_of_changes: + # bail out if sync process was interrupted + if self.stopped is True: + break + + # launch a thread to fetch one document from target + t = syncer_pool.new_syncer_thread( + idx, number_of_changes, + last_callback_lock=last_callback_lock) + + # bail out if any thread failed + if t is None: + self.stop() + break + + t.doc_syncer.set_request_method( + 'get', idx, sync_id, last_known_generation, + last_known_trans_id) + t.doc_syncer.set_success_callback(self._insert_received_doc) + + def _failure_callback(idx, total, exception): + _failure_msg = "Soledad sync: error while getting document " \ + "%d/%d: %s" \ + % (idx + 1, total, exception) + logger.warning("%s" % _failure_msg) + logger.warning("Soledad sync: failing gracefully, will " + "recover on next sync.") + + t.doc_syncer.set_failure_callback(_failure_callback) + threads.append(t) + t.start() + last_callback_lock = t.callback_lock + idx += 1 + + # if this is the first request, wait to update the number of + # changes + if first_request is True: + t.join() + if t.success: + number_of_changes, _, _ = t.result + first_request = False + + # make sure all threads finished and we have up-to-date info + last_successful_thread = None + while threads: + # check if there are failures + t = threads.pop(0) + t.join() + if t.success: + last_successful_thread = t + + # get information about last successful thread + if last_successful_thread is not None: + body, _ = last_successful_thread.response + parsed_body = json.loads(body) + # get current target gen and trans id in case no documents were + # transferred + if len(parsed_body) == 1: + metadata = parsed_body[0] + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + # get current target gen and trans id from last transferred + # document + else: + doc_data = parsed_body[1] + new_generation = doc_data['gen'] + new_transaction_id = doc_data['trans_id'] + + return new_generation, new_transaction_id + + def sync_exchange(self, docs_by_generations, + source_replica_uid, last_known_generation, + last_known_trans_id, return_doc_cb, + ensure_callback=None, defer_decryption=True, + sync_id=None): """ Find out which documents the remote database does not know about, encrypt and send them. @@ -586,24 +1094,54 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): the last local generation the remote replica knows about. :type docs_by_generations: list of tuples + :param source_replica_uid: The uid of the source replica. :type source_replica_uid: str + :param last_known_generation: Target's last known generation. :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. :type last_known_trans_id: str + :param return_doc_cb: A callback for inserting received documents from - target. + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. :type return_doc_cb: function + :param ensure_callback: A callback that ensures we know the target - replica uid if the target replica was just created. + replica uid if the target replica was just + created. :type ensure_callback: function + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + :return: The new generation and transaction id of the target replica. :rtype: tuple """ + self._ensure_callback = ensure_callback + + if defer_decryption: + self._sync_exchange_lock.acquire() + self._setup_sync_decr_pool() + self._setup_sync_watcher() + self._defer_decryption = True + self.start() - sync_id = str(uuid4()) + + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + # let the decrypter pool access the passed callback to insert docs + setProxiedObject(self._insert_doc_cb[source_replica_uid], + return_doc_cb) + + if not self.clear_to_sync(): + raise PendingReceivedDocsSyncError self._ensure_connection() if self._trace_hook: # for tests @@ -611,78 +1149,135 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) headers = self._sign_request('POST', url, {}) - def _post_put_doc(headers, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id, sync_id): - """ - Put a sync document on server by means of a POST request. - - :param received: How many documents have already been received in - this sync session. - :type received: int - """ - # prepare to send the document - entries = ['['] - size = 1 - # add remote replica metadata to the request - size += self._prepare( - '', entries, - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=ensure_callback is not None) - # add the document to the request - size += self._prepare( - ',', entries, - id=id, rev=rev, content=content, gen=gen, trans_id=trans_id) - entries.append('\r\n]') - size += len(entries[-1]) - # send headers - self._init_post_request(url, 'put', headers, size) - # send document - for entry in entries: - self._conn.send(entry) - data, _ = self._response() - data = json.loads(data) - return data[0]['new_generation'], data[0]['new_transaction_id'] - cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id # send docs + msg = "%d/%d" % (0, len(docs_by_generations)) + signal(SOLEDAD_SYNC_SEND_STATUS, msg) + logger.debug("Soledad sync send status: %s" % msg) + + defer_encryption = self._sync_db is not None + syncer_pool = DocumentSyncerPool( + self._raw_url, self._raw_creds, url, headers, ensure_callback, + self.stop) + threads = [] + last_request_lock = None + last_callback_lock = None sent = 0 - signal( - SOLEDAD_SYNC_SEND_STATUS, - "%d/%d" % (0, len(docs_by_generations))) + total = len(docs_by_generations) + + synced = [] + number_of_docs = len(docs_by_generations) + for doc, gen, trans_id in docs_by_generations: # allow for interrupting the sync process if self.stopped is True: break + # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue + # ------------------------------------------------------------- # symmetric encryption of document's contents # ------------------------------------------------------------- doc_json = doc.get_json() if not doc.is_tombstone(): - doc_json = encrypt_doc(self._crypto, doc) + if not defer_encryption: + # fallback case, for tests + doc_json = encrypt_doc(self._crypto, doc) + else: + try: + doc_json = self.get_encrypted_doc_from_db( + doc.doc_id, doc.rev) + except Exception as exc: + logger.error("Error while getting " + "encrypted doc from db") + logger.exception(exc) + continue + if doc_json is None: + # Not marked as tombstone, but we got nothing + # from the sync db. As it is not encrypted yet, we + # force inline encryption. + # TODO: implement a queue to deal with these cases. + doc_json = encrypt_doc(self._crypto, doc) # ------------------------------------------------------------- # end of symmetric encryption # ------------------------------------------------------------- - cur_target_gen, cur_target_trans_id = _post_put_doc( - headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, - rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id, - sync_id=sync_id) + t = syncer_pool.new_syncer_thread( + sent + 1, total, last_request_lock=None, + last_callback_lock=last_callback_lock) + + # bail out if any thread failed + if t is None: + self.stop() + break + + # set the request method + t.doc_syncer.set_request_method( + 'put', sync_id, cur_target_gen, cur_target_trans_id, + id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1) + # set the success calback + + def _success_callback(idx, total, response): + _success_msg = "Soledad sync send status: %d/%d" \ + % (idx, total) + signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) + logger.debug(_success_msg) + + t.doc_syncer.set_success_callback(_success_callback) + + # set the failure callback + def _failure_callback(idx, total, exception): + _failure_msg = "Soledad sync: error while sending document " \ + "%d/%d: %s" % (idx, total, exception) + logger.warning("%s" % _failure_msg) + logger.warning("Soledad sync: failing gracefully, will " + "recover on next sync.") + + t.doc_syncer.set_failure_callback(_failure_callback) + + # save thread and append + t.start() + threads.append((t, doc)) + last_request_lock = t.request_lock + last_callback_lock = t.callback_lock sent += 1 - signal( - SOLEDAD_SYNC_SEND_STATUS, - "%d/%d" % (sent, len(docs_by_generations))) + + # make sure all threads finished and we have up-to-date info + while threads: + # check if there are failures + t, doc = threads.pop(0) + t.join() + if t.success: + synced.append((doc.doc_id, doc.rev)) + + if defer_decryption: + self._sync_watcher.start() # get docs from target - cur_target_gen, cur_target_trans_id = self._get_remote_docs( - url, - last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback, sync_id) + if self.stopped is False: + cur_target_gen, cur_target_trans_id = self._get_remote_docs( + url, + last_known_generation, last_known_trans_id, headers, + return_doc_cb, ensure_callback, sync_id, syncer_pool, + defer_decryption=defer_decryption) + syncer_pool.cleanup() + + # delete documents from the sync database + if defer_encryption: + self.delete_encrypted_docs_from_db(synced) + + # wait for deferred decryption to finish + if defer_decryption: + while self.clear_to_sync() is False: + sleep(self.DECRYPT_TASK_PERIOD) + self._teardown_sync_watcher() + self._teardown_sync_decr_pool() + self._sync_exchange_lock.release() + self.stop() return cur_target_gen, cur_target_trans_id @@ -714,3 +1309,164 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ with self._stop_lock: return self._stopped is True + + def get_encrypted_doc_from_db(self, doc_id, doc_rev): + """ + Retrieve encrypted document from the database of encrypted docs for + sync. + + :param doc_id: The Document id. + :type doc_id: str + + :param doc_rev: The document revision + :type doc_rev: str + """ + encr = SyncEncrypterPool + c = self._sync_db.cursor() + sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) + c.execute(sql, (doc_id, doc_rev)) + res = c.fetchall() + if len(res) != 0: + return res[0][0] + + def delete_encrypted_docs_from_db(self, docs_ids): + """ + Delete several encrypted documents from the database of symmetrically + encrypted docs to sync. + + :param docs_ids: an iterable with (doc_id, doc_rev) for all documents + to be deleted. + :type docs_ids: any iterable of tuples of str + """ + if docs_ids: + encr = SyncEncrypterPool + c = self._sync_db.cursor() + for doc_id, doc_rev in docs_ids: + sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) + c.execute(sql, (doc_id, doc_rev)) + self._sync_db.commit() + + def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): + """ + Save a symmetrically encrypted incoming document into the received + docs table in the sync db. A decryption task will pick it up + from here in turn. + + :param doc: The document to save. + :type doc: SoledadDocument + :param gen: The generation. + :type gen: str + :param trans_id: Transacion id. + :type gen: str + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + """ + logger.debug( + "Enqueueing doc for decryption: %d/%d." + % (idx + 1, total)) + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id) + + def _save_received_doc(self, doc, gen, trans_id, idx, total): + """ + Save any incoming document into the received docs table in the sync db. + + :param doc: The document to save. + :type doc: SoledadDocument + :param gen: The generation. + :type gen: str + :param trans_id: Transacion id. + :type gen: str + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + """ + logger.debug( + "Enqueueing doc, no decryption needed: %d/%d." + % (idx + 1, total)) + self._sync_decr_pool.insert_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id) + + # + # Symmetric decryption of syncing docs + # + + def clear_to_sync(self): + """ + Return True if sync can proceed (ie, the received db table is empty). + :rtype: bool + """ + if self._sync_decr_pool is not None: + return self._sync_decr_pool.count_received_encrypted_docs() == 0 + else: + return True + + def set_decryption_callback(self, cb): + """ + Set callback to be called when the decryption finishes. + + :param cb: The callback to be set. + :type cb: callable + """ + self._decryption_callback = cb + + def has_decryption_callback(self): + """ + Return True if there is a decryption callback set. + :rtype: bool + """ + return self._decryption_callback is not None + + def has_syncdb(self): + """ + Return True if we have an initialized syncdb. + """ + return self._sync_db is not None + + def _decrypt_syncing_received_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + Called periodically from TimerTask self._sync_watcher. + """ + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + return + + decrypter = self._sync_decr_pool + decrypter.decrypt_received_docs() + done = decrypter.process_decrypted() + + def _sign_request(self, method, url_query, params): + """ + Return an authorization header to be included in the HTTP request. + + :param method: The HTTP method. + :type method: str + :param url_query: The URL query string. + :type url_query: str + :param params: A list with encoded query parameters. + :type param: list + + :return: The Authorization header. + :rtype: list of tuple + """ + return TokenBasedAuth._sign_request(self, method, url_query, params) + + def set_token_credentials(self, uuid, token): + """ + Store given credentials so we can sign the request later. + + :param uuid: The user's uuid. + :type uuid: str + :param token: The authentication token. + :type token: str + """ + TokenBasedAuth.set_token_credentials(self, uuid, token) diff --git a/client/src/taskthread/__init__.py b/client/src/taskthread/__init__.py new file mode 100755 index 00000000..a734a829 --- /dev/null +++ b/client/src/taskthread/__init__.py @@ -0,0 +1,296 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import logging +import threading + +__version__ = '1.4' + + +logger = logging.getLogger(__name__) + + +class TaskInProcessException(BaseException): + pass + + +class TaskThread(threading.Thread): + """ + A thread object that repeats a task. + + Usage example:: + + from taskthread import TaskThread + + import time + + def my_task(*args, **kwargs): + print args, kwargs + + task_thread = TaskThread(my_task) + task_thread.start() + for i in xrange(10): + task_thread.run_task() + task_thread.join_task() + task_thread.join() + + .. note:: If :py:meth:`~TaskThread.run_task` is + invoked while run_task is in progress, + :py:class:`~.TaskInProcessException` will + be raised. + + :param task: + A ``function``. This param is the task to execute when + run_task is called. + :param event: + A ``threading.Event``. This event will be set when run_task + is called. The default value is a new event, but may be + specified for testing purposes. + """ + + daemon = True + ''' + Threads marked as daemon will be terminated. + ''' + def __init__(self, task, event=threading.Event(), + *args, **kwargs): + super(TaskThread, self).__init__() + self.task = task + self.task_event = event + self.running = True + self.running_lock = threading.Lock() + self.in_task = False + self.task_complete = threading.Event() + self.args = args + self.kwargs = kwargs + + def run(self): + """ + Called by threading.Thread, this runs in the new thread. + """ + while True: + self.task_event.wait() + if not self.running: + logger.debug("TaskThread exiting") + return + logger.debug("TaskThread starting task") + with self.running_lock: + self.task_event.clear() + self.task_complete.clear() + self.task(*self.args, **self.kwargs) + with self.running_lock: + self.in_task = False + self.task_complete.set() + + def run_task(self, *args, **kwargs): + """ + Run an instance of the task. + + :param args: + The arguments to pass to the task. + + :param kwargs: + The keyword arguments to pass to the task. + """ + # Don't allow this call if the thread is currently + # in a task. + with self.running_lock: + if self.in_task: + raise TaskInProcessException() + self.in_task = True + logger.debug("Waking up the thread") + self.args = args + self.kwargs = kwargs + # Wake up the thread to do it's thing + self.task_event.set() + + def join_task(self, time_out): + """ + Wait for the currently running task to complete. + + :param time_out: + An ``int``. The amount of time to wait for the + task to finish. + """ + with self.running_lock: + if not self.in_task: + return + + success = self.task_complete.wait(time_out) + if success: + self.task_complete.clear() + return success + + def join(self, timeout=None): + """ + Wait for the task to finish + """ + self.running = False + self.task_event.set() + super(TaskThread, self).join(timeout=timeout) + + +class TimerTask(object): + """ + An object that executes a commit function at a given interval. + This class is driven by a TaskThread. A new TaskThread will be + created the first time :py:meth:`.~start` is called. All + subsequent calls to start will reuse the same thread. + + Usage example:: + + from taskthread import TimerTask + import time + + count = 0 + def get_count(): + return count + def execute(): + print "Count: %d" % count + + task = TimerTask(execute, + timeout=10, + count_fcn=get_count, + threshold=1) + + task.start() + + for i in xrange(100000): + count += 1 + time.sleep(1) + task.stop() + count = 0 + task.start() + for i in xrange(100000): + count += 1 + time.sleep(1) + task.shutdown() + + :param execute_fcn: + A `function`. This function will be executed on each time interval. + + :param delay: + An `int`. The delay in **seconds** invocations of + `execute_fcn`. Default: `10`. + + :param count_fcn: + A `function`. This function returns a current count. If the count + has not changed more the `threshold` since the last invocation of + `execute_fcn`, `execute_fcn` will not be called again. If not + specified, `execute_fcn` will be called each time the timer fires. + **Optional**. If count_fcn is specified, ``threshold`` is required. + + :param threshold: + An `int`. Specifies the minimum delta in `count_fcn` that must be + met for `execute_fcn` to be invoked. **Optional**. Must be + specified in conjunction with `count_fcn`. + + """ + def __init__(self, execute_fcn, delay=10, count_fcn=None, threshold=None): + self.running = True + self.execute_fcn = execute_fcn + self.last_count = 0 + self.event = threading.Event() + self.delay = delay + self.thread = None + self.running_lock = threading.RLock() + if bool(threshold) != bool(count_fcn): + raise ValueError("Must specify threshold " + "and count_fcn, or neither") + + self.count_fcn = count_fcn + self.threshold = threshold + + def start(self): + """ + Start the task. This starts a :py:class:`.~TaskThread`, and starts + running run_threshold_timer on the thread. + + """ + if not self.thread: + logger.debug('Starting up the taskthread') + self.thread = TaskThread(self._run_threshold_timer) + self.thread.start() + + if self.threshold: + self.last_count = 0 + + logger.debug('Running the task') + self.running = True + self.thread.run_task() + + def stop(self): + """ + Stop the task, leaving the :py:class:`.~TaskThread` running + so start can be called again. + + """ + logger.debug('Stopping the task') + wait = False + with self.running_lock: + if self.running: + wait = True + self.running = False + if wait: + self.event.set() + self.thread.join_task(2) + + def shutdown(self): + """ + Close down the task thread and stop the task if it is running. + + """ + logger.debug('Shutting down the task') + self.stop() + self.thread.join(2) + + def _exec_if_threshold_met(self): + new_count = self.count_fcn() + logger.debug('new_count: %d' % new_count) + if new_count >= self.last_count + self.threshold: + self.execute_fcn() + self.last_count = new_count + + def _exec(self): + if self.count_fcn: + self._exec_if_threshold_met() + else: + self.execute_fcn() + + def _wait(self): + self.event.wait(timeout=self.delay) + self.event.clear() + logger.debug('Task woken up') + + def _exit_loop(self): + """ + If self.running is false, it means the task should shut down. + """ + exit_loop = False + with self.running_lock: + if not self.running: + exit_loop = True + logger.debug('Task shutting down') + return exit_loop + + def _run_threshold_timer(self): + """ + Main loop of the timer task + + """ + logger.debug('In Task') + while True: + self._wait() + if self._exit_loop(): + return + self._exec() diff --git a/client/src/taskthread/tests/__init__.py b/client/src/taskthread/tests/__init__.py new file mode 100755 index 00000000..92bd912f --- /dev/null +++ b/client/src/taskthread/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/client/src/taskthread/tests/unit/__init__.py b/client/src/taskthread/tests/unit/__init__.py new file mode 100755 index 00000000..92bd912f --- /dev/null +++ b/client/src/taskthread/tests/unit/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/client/src/taskthread/tests/unit/test_taskthread.py b/client/src/taskthread/tests/unit/test_taskthread.py new file mode 100755 index 00000000..82565922 --- /dev/null +++ b/client/src/taskthread/tests/unit/test_taskthread.py @@ -0,0 +1,315 @@ +# -*- coding: utf-8 -*- +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License.:w + + +import threading +import unittest2 as unittest + +from mock import Mock, patch + +from taskthread import TaskThread, TaskInProcessException, TimerTask + +forever_event = threading.Event() + + +def forever_function(*args, **kwargs): + forever_event.wait() + forever_event.clear() + + +class TaskThreadTestCase(unittest.TestCase): + """ + Tests for :py:class:`.TaskThread`. + """ + + def test___init__(self): + """ + Test the __init__ method. It doesn't really do much. + """ + task_thread = TaskThread(forever_function) + self.assertEqual(forever_function, task_thread.task) + + def test_run_not_running(self): + """ + Verifies that thread will shut down when running is false + """ + event = Mock() + event.wait = Mock(side_effect=[True]) + event.clear = Mock(side_effect=Exception("Should never be called")) + task_thread = TaskThread(forever_function, + event=event) + task_thread.running = False + task_thread.run() + event.wait.assert_called_once_with() + + def test_run_executes_task(self): + event = Mock() + event.wait = Mock(side_effect=[True, True]) + + def stop_iteration(*args, **kwargs): + args[0].running = False + + task_thread = TaskThread(stop_iteration, + event=event) + + task_thread.args = [task_thread] + task_thread.kwargs = {'a': 2} + task_thread.in_task = True + task_thread.run() + self.assertEqual(False, task_thread.in_task) + + def test_run_task(self): + event = Mock() + task_thread = TaskThread(forever_function, + event=event) + args = [1] + kwargs = {'a': 1} + + task_thread.run_task(*args, **kwargs) + self.assertEqual(tuple(args), task_thread.args) + self.assertEqual(kwargs, task_thread.kwargs) + event.set.assert_called_once_with() + + def test_run_task_task_in_progress(self): + event = Mock() + task_thread = TaskThread(forever_function, + event=event) + task_thread.in_task = True + self.assertRaises(TaskInProcessException, task_thread.run_task) + + def test_join_task(self): + task_thread = TaskThread(forever_function) + task_thread.in_task = True + task_thread.task_complete = Mock() + task_thread.task_complete.wait = Mock(side_effect=[True]) + success = task_thread.join_task(1) + self.assertTrue(success) + + def test_join_task_not_running(self): + task_thread = TaskThread(forever_function) + task_thread.task_complete = Mock() + task_thread.wait =\ + Mock(side_effect=Exception("Should never be called")) + task_thread.join_task(1) + + def test_join(self): + task_thread = TaskThread(forever_function) + task_thread.start() + task_thread.run_task() + # Set the event so the task completes + forever_event.set() + task_thread.join_task(1) + task_thread.join(1) + + def test_execute_multiple_tasks(self): + task_thread = TaskThread(forever_function) + task_thread.start() + task_thread.run_task() + # Set the event so the task completes + forever_event.set() + task_thread.join_task(1) + forever_event.set() + task_thread.join_task(1) + task_thread.join(1) + + +def my_func(): + pass + + +class TimerTaskTestCase(unittest.TestCase): + + def test___int__(self): + + task = TimerTask(my_func, + delay=100) + self.assertEqual(my_func, task.execute_fcn) + self.assertEqual(100, task.delay) + self.assertIsNone(task.count_fcn) + self.assertIsNone(task.threshold) + + def test___int__raises(self): + self.assertRaises(ValueError, TimerTask.__init__, + TimerTask(None), + my_func(), + count_fcn=Mock()) + + self.assertRaises(ValueError, TimerTask.__init__, + TimerTask(None), + my_func(), + threshold=Mock()) + + @patch('taskthread.TaskThread') + def test_start(self, TaskThreadMock): + task = TimerTask(my_func) + thread = TaskThreadMock.return_value + + task.start() + self.assertTrue(task.running) + self.assertEqual(thread, task.thread) + thread.start.assert_called_once_with() + thread.run_task.assert_called_once_with() + + @patch('taskthread.TaskThread') + def test_start_restarts(self, TaskThreadMock): + task = TimerTask(my_func, threshold=1, count_fcn=Mock()) + thread = TaskThreadMock.return_value + task.last_count = 1 + task.thread = thread + + task.start() + self.assertEqual(0, task.last_count) + self.assertEqual(0, thread.start.called) + thread.run_task.assert_called_once_with() + + @patch('taskthread.TaskThread') + def test_stop(self, TaskThreadMock): + running_lock = Mock() + running_lock.__enter__ = Mock() + running_lock.__exit__ = Mock() + task = TimerTask(my_func) + task.thread = TaskThreadMock.return_value + task.running = True + task.event = Mock() + task.running_lock = running_lock + + task.stop() + + self.assertEqual(False, task.running) + self.assertEqual(1, task.event.set.called) + running_lock.__enter__.assert_called_once_with() + running_lock.__exit__.assert_called_once_with(None, None, None) + task.thread.join_task.assert_called_once_with(2) + + @patch('taskthread.TaskThread') + def test_stop_not_running(self, TaskThreadMock): + task = TimerTask(my_func) + task.thread = TaskThreadMock.return_value + task.running = False + task.event = Mock() + + task.stop() + + self.assertEqual(False, task.running) + self.assertEqual(0, task.event.set.called) + self.assertEqual(0, task.thread.join_task.called) + + @patch('taskthread.TaskThread') + def test_shutdown(self, TaskThreadMock): + task = TimerTask(my_func) + task.thread = TaskThreadMock.return_value + task.running = False + task.shutdown() + task.thread.join.assert_called_once_with(2) + + def test__exec_if_threshold_met(self): + self.called = False + + def exec_fcn(): + self.called = True + + def count_fcn(): + return 10 + + task = TimerTask(exec_fcn, count_fcn=count_fcn, threshold=1) + task.last_count = 9 + task._exec_if_threshold_met() + self.assertTrue(self.called) + self.assertEqual(10, task.last_count) + + def test__exec_if_threshold_met_not_met(self): + + def exec_fcn(): + raise Exception("This shouldn't happen!!") + + def count_fcn(): + return 10 + + task = TimerTask(exec_fcn, count_fcn=count_fcn, threshold=10) + task.last_count = 9 + task._exec_if_threshold_met() + self.assertEqual(9, task.last_count) + + def test__exec(self): + self.called = False + + def exec_fcn(): + self.called = True + + task = TimerTask(exec_fcn) + task._exec() + self.assertTrue(self.called) + + def test__exec_threshold(self): + self.called = False + + def exec_fcn(): + self.called = True + + def count_fcn(): + return 1 + + task = TimerTask(exec_fcn, count_fcn=count_fcn, threshold=1) + task._exec() + self.assertTrue(self.called) + + @patch('threading.Event') + def test__wait(self, event_mock): + task = TimerTask(my_func) + event = event_mock.return_value + + task._wait() + event.wait.assert_called_once_with(timeout=task.delay) + self.assertEqual(1, event.clear.called) + + @patch('threading.RLock') + def test__exit_loop(self, mock_rlock): + task = TimerTask(my_func) + task.running = False + lock = mock_rlock.return_value + lock.__enter__ = Mock() + lock.__exit__ = Mock() + self.assertTrue(task._exit_loop()) + self.assertEqual(1, lock.__enter__.called) + lock.__exit__.assert_called_once_with(None, None, None) + + @patch('threading.RLock') + def test__exit_loop_running(self, mock_rlock): + lock = mock_rlock.return_value + lock.__enter__ = Mock() + lock.__exit__ = Mock() + task = TimerTask(my_func) + task.running = True + self.assertFalse(task._exit_loop()) + self.assertEqual(1, lock.__enter__.called) + lock.__exit__.assert_called_once_with(None, None, None) + + @patch('threading.RLock') + @patch('threading.Event') + def test__run_threshold_timer(self, event_mock, rlock_mock): + self.task = None + event = event_mock.return_value + lock = rlock_mock.return_value + lock.__enter__ = Mock() + lock.__exit__ = Mock() + + def exec_fcn(): + self.task.running = False + + self.task = TimerTask(exec_fcn) + self.task._run_threshold_timer() + + self.assertFalse(self.task.running) + self.assertEqual(2, event.wait.call_count) diff --git a/common/src/leap/soledad/common/_version.py b/common/src/leap/soledad/common/_version.py index a3227cde..cf4e6706 100644 --- a/common/src/leap/soledad/common/_version.py +++ b/common/src/leap/soledad/common/_version.py @@ -5,8 +5,8 @@ # unpacked source archive. Distribution tarballs contain a pre-generated copy # of this file. -version_version = '0.5.2' -version_full = 'fd2a9adfc9148e47b85add78d8f3c981e4fad885' +version_version = '0.6.0' +version_full = 'f71da83b530abe5b7e88b812797e235833985b6c' def get_versions(default={}, verbose=False): diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index b51b32f3..5658f4ce 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1106,7 +1106,9 @@ class CouchDatabase(CommonBackend): ) def _set_replica_gen_and_trans_id(self, other_replica_uid, - other_generation, other_transaction_id): + other_generation, other_transaction_id, + number_of_docs=None, doc_idx=None, + sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1122,12 +1124,21 @@ class CouchDatabase(CommonBackend): :param other_transaction_id: The transaction id associated with the generation. :type other_transaction_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int + :param sync_id: The id of the current sync session. + :type sync_id: str """ self._do_set_replica_gen_and_trans_id( - other_replica_uid, other_generation, other_transaction_id) + other_replica_uid, other_generation, other_transaction_id, + number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id) def _do_set_replica_gen_and_trans_id( - self, other_replica_uid, other_generation, other_transaction_id): + self, other_replica_uid, other_generation, other_transaction_id, + number_of_docs=None, doc_idx=None, sync_id=None): """ Set the last-known generation and transaction id for the other database replica. @@ -1143,6 +1154,13 @@ class CouchDatabase(CommonBackend): :param other_transaction_id: The transaction id associated with the generation. :type other_transaction_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int + :param sync_id: The id of the current sync session. + :type sync_id: str :raise MissingDesignDocError: Raised when tried to access a missing design document. @@ -1163,12 +1181,19 @@ class CouchDatabase(CommonBackend): res = self._database.resource(*ddoc_path) try: with CouchDatabase.update_handler_lock[self._get_replica_uid()]: + body={ + 'other_replica_uid': other_replica_uid, + 'other_generation': other_generation, + 'other_transaction_id': other_transaction_id, + } + if number_of_docs is not None: + body['number_of_docs'] = number_of_docs + if doc_idx is not None: + body['doc_idx'] = doc_idx + if sync_id is not None: + body['sync_id'] = sync_id res.put_json( - body={ - 'other_replica_uid': other_replica_uid, - 'other_generation': other_generation, - 'other_transaction_id': other_transaction_id, - }, + body=body, headers={'content-type': 'application/json'}) except ResourceNotFound as e: raise_missing_design_doc_error(e, ddoc_path) @@ -1306,7 +1331,8 @@ class CouchDatabase(CommonBackend): doc.set_conflicts(cur_doc.get_conflicts()) def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, - replica_trans_id=''): + replica_trans_id='', number_of_docs=None, + doc_idx=None, sync_id=None): """ Insert/update document into the database with a given revision. @@ -1339,6 +1365,13 @@ class CouchDatabase(CommonBackend): :param replica_trans_id: The transaction_id associated with the generation. :type replica_trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document being sent. + :type doc_idx: int + :param sync_id: The id of the current sync session. + :type sync_id: str :return: (state, at_gen) - If we don't have doc_id already, or if doc_rev supersedes the existing document revision, then the @@ -1398,7 +1431,9 @@ class CouchDatabase(CommonBackend): self._force_doc_sync_conflict(doc) if replica_uid is not None and replica_gen is not None: self._set_replica_gen_and_trans_id( - replica_uid, replica_gen, replica_trans_id) + replica_uid, replica_gen, replica_trans_id, + number_of_docs=number_of_docs, doc_idx=doc_idx, + sync_id=sync_id) # update info old_doc.rev = doc.rev if doc.is_tombstone(): diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js index 722f695a..b0ae2de6 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -1,22 +1,151 @@ +/** + * The u1db_sync_log document stores both the actual sync log and a list of + * pending updates to the log, in case we receive incoming documents out of + * the correct order (i.e. if there are parallel PUTs during the sync + * process). + * + * The structure of the document is the following: + * + * { + * 'syncs': [ + * ['<replica_uid>', <gen>, '<trans_id>'], + * ... + * ], + * 'pending': { + * 'other_replica_uid': { + * 'sync_id': '<sync_id>', + * 'log': [[<gen>, '<trans_id>'], ...] + * }, + * ... + * } + * } + * + * The update function below does the following: + * + * 0. If we do not receive a sync_id, we just update the 'syncs' list with + * the incoming info about the source replica state. + * + * 1. Otherwise, if the incoming sync_id differs from current stored + * sync_id, then we assume that the previous sync session for that source + * replica was interrupted and discard all pending data. + * + * 2. Then we append incoming info as pending data for that source replica + * and current sync_id, and sort the pending data by generation. + * + * 3. Then we go through pending data and find the most recent generation + * that we can use to update the actual sync log. + * + * 4. Finally, we insert the most up to date information into the sync log. + */ function(doc, req){ + + // create the document if it doesn't exist if (!doc) { doc = {} doc['_id'] = 'u1db_sync_log'; doc['syncs'] = []; } - body = JSON.parse(req.body); + + // get and validate incoming info + var body = JSON.parse(req.body); + var other_replica_uid = body['other_replica_uid']; + var other_generation = parseInt(body['other_generation']); + var other_transaction_id = body['other_transaction_id'] + var sync_id = body['sync_id']; + var number_of_docs = body['number_of_docs']; + var doc_idx = body['doc_idx']; + + // parse integers + if (number_of_docs != null) + number_of_docs = parseInt(number_of_docs); + if (doc_idx != null) + doc_idx = parseInt(doc_idx); + + if (other_replica_uid == null + || other_generation == null + || other_transaction_id == null) + return [null, 'invalid data']; + + // create slot for pending logs + if (doc['pending'] == null) + doc['pending'] = {}; + + // these are the values that will be actually inserted + var current_gen = other_generation; + var current_trans_id = other_transaction_id; + + /*------------- Wait for sequential values before storing -------------*/ + + // we just try to obtain pending log if we received a sync_id + if (sync_id != null) { + + // create slot for current source and sync_id pending log + if (doc['pending'][other_replica_uid] == null + || doc['pending'][other_replica_uid]['sync_id'] != sync_id) { + doc['pending'][other_replica_uid] = { + 'sync_id': sync_id, + 'log': [], + 'last_doc_idx': 0, + } + } + + // append incoming data to pending log + doc['pending'][other_replica_uid]['log'].push([ + other_generation, + other_transaction_id, + doc_idx, + ]) + + // sort pending log according to generation + doc['pending'][other_replica_uid]['log'].sort(function(a, b) { + return a[0] - b[0]; + }); + + // get most up-to-date information from pending log + var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx']; + var pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; + + current_gen = null; + current_trans_id = null; + + while (last_doc_idx + 1 == pending_idx) { + pending = doc['pending'][other_replica_uid]['log'].shift() + current_gen = pending[0]; + current_trans_id = pending[1]; + last_doc_idx = pending[2] + if (doc['pending'][other_replica_uid]['log'].length == 0) + break; + pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; + } + + // leave the sync log untouched if we still did not receive enough docs + if (current_gen == null) + return [doc, 'ok']; + + // update last index of received doc + doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx; + + // eventually remove all pending data from that replica + if (last_doc_idx == number_of_docs) + delete doc['pending'][other_replica_uid] + } + + /*--------------- Store source replica info on sync log ---------------*/ + // remove outdated info doc['syncs'] = doc['syncs'].filter( function (entry) { - return entry[0] != body['other_replica_uid']; + return entry[0] != other_replica_uid; } ); - // store u1db rev + + // store in log doc['syncs'].push([ - body['other_replica_uid'], - body['other_generation'], - body['other_transaction_id'] + other_replica_uid, + current_gen, + current_trans_id ]); + return [doc, 'ok']; } diff --git a/common/src/leap/soledad/common/tests/__init__.py b/common/src/leap/soledad/common/tests/__init__.py index a38bdaed..3081683b 100644 --- a/common/src/leap/soledad/common/tests/__init__.py +++ b/common/src/leap/soledad/common/tests/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # __init__.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,7 +19,6 @@ """ Tests to make sure Soledad provides U1DB functionality and more. """ - import os import random import string @@ -29,11 +28,8 @@ from mock import Mock from leap.soledad.common.document import SoledadDocument from leap.soledad.client import Soledad -from leap.soledad.client.crypto import SoledadCrypto -from leap.soledad.client.target import ( - decrypt_doc, - ENC_SCHEME_KEY, -) +from leap.soledad.client.crypto import decrypt_doc_dict +from leap.soledad.client.crypto import ENC_SCHEME_KEY from leap.common.testing.basetest import BaseLeapTest @@ -49,6 +45,7 @@ class BaseSoledadTest(BaseLeapTest): """ Instantiates Soledad for usage in tests. """ + defer_sync_encryption = False def setUp(self): # config info @@ -73,11 +70,26 @@ class BaseSoledadTest(BaseLeapTest): self._db1.close() self._db2.close() self._soledad.close() + # XXX should not access "private" attrs for f in [self._soledad._local_db_path, self._soledad._secrets_path]: if os.path.isfile(f): os.unlink(f) + def get_default_shared_mock(self, put_doc_side_effect): + """ + Get a default class for mocking the shared DB + """ + class defaultMockSharedDB(object): + get_doc = Mock(return_value=None) + put_doc = Mock(side_effect=put_doc_side_effect) + lock = Mock(return_value=('atoken', 300)) + unlock = Mock(return_value=True) + + def __call__(self): + return self + return defaultMockSharedDB + def _soledad_instance(self, user=ADDRESS, passphrase=u'123', prefix='', secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME, @@ -88,18 +100,11 @@ class BaseSoledadTest(BaseLeapTest): def _put_doc_side_effect(doc): self._doc_put = doc - class MockSharedDB(object): - - get_doc = Mock(return_value=None) - put_doc = Mock(side_effect=_put_doc_side_effect) - lock = Mock(return_value=('atoken', 300)) - unlock = Mock(return_value=True) - - def __call__(self): - return self - if shared_db_class is not None: MockSharedDB = shared_db_class + else: + MockSharedDB = self.get_default_shared_mock( + _put_doc_side_effect) Soledad._shared_db = MockSharedDB() return Soledad( @@ -111,7 +116,8 @@ class BaseSoledadTest(BaseLeapTest): self.tempdir, prefix, local_db_path), server_url=server_url, # Soledad will fail if not given an url. cert_file=cert_file, - secret_id=secret_id) + secret_id=secret_id, + defer_encryption=self.defer_sync_encryption) def assertGetEncryptedDoc( self, db, doc_id, doc_rev, content, has_conflicts): @@ -121,8 +127,15 @@ class BaseSoledadTest(BaseLeapTest): exp_doc = self.make_document(doc_id, doc_rev, content, has_conflicts=has_conflicts) doc = db.get_doc(doc_id) + if ENC_SCHEME_KEY in doc.content: - doc.set_json(decrypt_doc(self._soledad._crypto, doc)) + # XXX check for SYM_KEY too + key = self._soledad._crypto.doc_passphrase(doc.doc_id) + secret = self._soledad._crypto.secret + decrypted = decrypt_doc_dict( + doc.content, doc.doc_id, doc.rev, + key, secret) + doc.set_json(decrypted) self.assertEqual(exp_doc.doc_id, doc.doc_id) self.assertEqual(exp_doc.rev, doc.rev) self.assertEqual(exp_doc.has_conflicts, doc.has_conflicts) diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 3b1e5a06..10d6c136 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -91,14 +91,19 @@ class CouchDBWrapper(object): logPath = os.path.join(self.tempdir, 'log', 'couch.log') while not os.path.exists(logPath): if self.process.poll() is not None: + got_stdout, got_stderr = "", "" + if self.process.stdout is not None: + got_stdout = self.process.stdout.read() + + if self.process.stderr is not None: + got_stderr = self.process.stderr.read() raise Exception(""" couchdb exited with code %d. stdout: %s stderr: %s""" % ( - self.process.returncode, self.process.stdout.read(), - self.process.stderr.read())) + self.process.returncode, got_stdout, got_stderr)) time.sleep(0.01) while os.stat(logPath).st_size == 0: time.sleep(0.01) diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index b03f79e7..6465eb80 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# test_soledad.py -# Copyright (C) 2013 LEAP +# test_couch_operations_atomicity.py +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,11 +14,9 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ +Test atomocity for couch operations. """ - import os import mock import tempfile @@ -32,7 +30,7 @@ from leap.soledad.client import Soledad from leap.soledad.common.couch import CouchDatabase, CouchServerState from leap.soledad.common.tests.test_couch import CouchDBTestCase from leap.soledad.common.tests.u1db_tests import TestCaseWithServer -from leap.soledad.common.tests.test_target import ( +from leap.soledad.common.tests.test_sync_target import ( make_token_soledad_app, make_leap_document_for_test, token_leap_sync_target, @@ -224,9 +222,9 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): # # Concurrency tests # - + class _WorkerThread(threading.Thread): - + def __init__(self, params, run_method): threading.Thread.__init__(self) self._params = params @@ -260,7 +258,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): for thread in threads: thread.join() - + # assert length of transaction_log transaction_log = self.db._get_transaction_log() self.assertEqual( @@ -341,7 +339,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): # wait for threads to finish for thread in threads: thread.join() - + # do the sync! sol.sync() diff --git a/common/src/leap/soledad/common/tests/test_crypto.py b/common/src/leap/soledad/common/tests/test_crypto.py index 4b2470ba..1071af14 100644 --- a/common/src/leap/soledad/common/tests/test_crypto.py +++ b/common/src/leap/soledad/common/tests/test_crypto.py @@ -14,37 +14,17 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Tests for cryptographic related stuff. """ - import os -import shutil -import tempfile -import simplejson as json import hashlib import binascii - -from leap.common.testing.basetest import BaseLeapTest -from leap.soledad.client import ( - Soledad, - crypto, - target, -) +from leap.soledad.client import crypto from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.tests import ( - BaseSoledadTest, - KEY_FINGERPRINT, - PRIVATE_KEY, -) +from leap.soledad.common.tests import BaseSoledadTest from leap.soledad.common.crypto import WrongMac, UnknownMacMethod -from leap.soledad.common.tests.u1db_tests import ( - simple_doc, - nested_doc, -) class EncryptedSyncTestCase(BaseSoledadTest): @@ -59,16 +39,17 @@ class EncryptedSyncTestCase(BaseSoledadTest): simpledoc = {'key': 'val'} doc1 = SoledadDocument(doc_id='id') doc1.content = simpledoc + # encrypt doc - doc1.set_json(target.encrypt_doc(self._soledad._crypto, doc1)) + doc1.set_json(crypto.encrypt_doc(self._soledad._crypto, doc1)) # assert content is different and includes keys self.assertNotEqual( simpledoc, doc1.content, 'incorrect document encryption') - self.assertTrue(target.ENC_JSON_KEY in doc1.content) - self.assertTrue(target.ENC_SCHEME_KEY in doc1.content) + self.assertTrue(crypto.ENC_JSON_KEY in doc1.content) + self.assertTrue(crypto.ENC_SCHEME_KEY in doc1.content) # decrypt doc - doc1.set_json(target.decrypt_doc(self._soledad._crypto, doc1)) + doc1.set_json(crypto.decrypt_doc(self._soledad._crypto, doc1)) self.assertEqual( simpledoc, doc1.content, 'incorrect document encryption') @@ -159,15 +140,15 @@ class MacAuthTestCase(BaseSoledadTest): doc = SoledadDocument(doc_id='id') doc.content = simpledoc # encrypt doc - doc.set_json(target.encrypt_doc(self._soledad._crypto, doc)) - self.assertTrue(target.MAC_KEY in doc.content) - self.assertTrue(target.MAC_METHOD_KEY in doc.content) + doc.set_json(crypto.encrypt_doc(self._soledad._crypto, doc)) + self.assertTrue(crypto.MAC_KEY in doc.content) + self.assertTrue(crypto.MAC_METHOD_KEY in doc.content) # mess with MAC - doc.content[target.MAC_KEY] = '1234567890ABCDEF' + doc.content[crypto.MAC_KEY] = '1234567890ABCDEF' # try to decrypt doc self.assertRaises( WrongMac, - target.decrypt_doc, self._soledad._crypto, doc) + crypto.decrypt_doc, self._soledad._crypto, doc) def test_decrypt_with_unknown_mac_method_raises(self): """ @@ -177,15 +158,15 @@ class MacAuthTestCase(BaseSoledadTest): doc = SoledadDocument(doc_id='id') doc.content = simpledoc # encrypt doc - doc.set_json(target.encrypt_doc(self._soledad._crypto, doc)) - self.assertTrue(target.MAC_KEY in doc.content) - self.assertTrue(target.MAC_METHOD_KEY in doc.content) + doc.set_json(crypto.encrypt_doc(self._soledad._crypto, doc)) + self.assertTrue(crypto.MAC_KEY in doc.content) + self.assertTrue(crypto.MAC_METHOD_KEY in doc.content) # mess with MAC method - doc.content[target.MAC_METHOD_KEY] = 'mymac' + doc.content[crypto.MAC_METHOD_KEY] = 'mymac' # try to decrypt doc self.assertRaises( UnknownMacMethod, - target.decrypt_doc, self._soledad._crypto, doc) + crypto.decrypt_doc, self._soledad._crypto, doc) class SoledadCryptoAESTestCase(BaseSoledadTest): diff --git a/common/src/leap/soledad/common/tests/test_http.py b/common/src/leap/soledad/common/tests/test_http.py new file mode 100644 index 00000000..d21470e0 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_http.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# test_http.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: test http database +""" +from u1db.remote import http_database + +from leap.soledad.client import auth + +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests.u1db_tests import test_http_database + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_http_database`. +#----------------------------------------------------------------------------- + +class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth): + """ + Wraps our token auth implementation. + """ + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + +class TestHTTPDatabaseWithCreds( + test_http_database.TestHTTPDatabaseCtrWithCreds): + + def test_get_sync_target_inherits_token_credentials(self): + # this test was from TestDatabaseSimpleOperations but we put it here + # for convenience. + self.db = _HTTPDatabase('dbase') + self.db.set_token_credentials('user-uuid', 'auth-token') + st = self.db.get_sync_target() + self.assertEqual(self.db._creds, st._creds) + + def test_ctr_with_creds(self): + db1 = _HTTPDatabase('http://dbs/db', creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + self.assertIn('token', db1._creds) + + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_http_client.py b/common/src/leap/soledad/common/tests/test_http_client.py new file mode 100644 index 00000000..3169398b --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_http_client.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# test_http_client.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: sync target +""" +import json + +from u1db.remote import http_client + +from leap.soledad.client import auth +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests.u1db_tests import test_http_client +from leap.soledad.server.auth import SoledadTokenAuthMiddleware + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_http_client`. +#----------------------------------------------------------------------------- + +class TestSoledadClientBase(test_http_client.TestHTTPClientBase): + """ + This class should be used to test Token auth. + """ + + def getClientWithToken(self, **kwds): + self.startServer() + + class _HTTPClientWithToken( + http_client.HTTPClientBase, auth.TokenBasedAuth): + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + return _HTTPClientWithToken(self.getURL('dbase'), **kwds) + + def test_oauth(self): + """ + Suppress oauth test (we test for token auth here). + """ + pass + + def test_oauth_ctr_creds(self): + """ + Suppress oauth test (we test for token auth here). + """ + pass + + def test_oauth_Unauthorized(self): + """ + Suppress oauth test (we test for token auth here). + """ + pass + + def app(self, environ, start_response): + res = test_http_client.TestHTTPClientBase.app( + self, environ, start_response) + if res is not None: + return res + # mime solead application here. + if '/token' in environ['PATH_INFO']: + auth = environ.get(SoledadTokenAuthMiddleware.HTTP_AUTH_KEY) + if not auth: + start_response("401 Unauthorized", + [('Content-Type', 'application/json')]) + return [json.dumps({"error": "unauthorized", + "message": e.message})] + scheme, encoded = auth.split(None, 1) + if scheme.lower() != 'token': + start_response("401 Unauthorized", + [('Content-Type', 'application/json')]) + return [json.dumps({"error": "unauthorized", + "message": e.message})] + uuid, token = encoded.decode('base64').split(':', 1) + if uuid != 'user-uuid' and token != 'auth-token': + return unauth_err("Incorrect address or token.") + start_response("200 OK", [('Content-Type', 'application/json')]) + return [json.dumps([environ['PATH_INFO'], uuid, token])] + + def test_token(self): + """ + Test if token is sent correctly. + """ + cli = self.getClientWithToken() + cli.set_token_credentials('user-uuid', 'auth-token') + res, headers = cli._request('GET', ['doc', 'token']) + self.assertEqual( + ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) + + def test_token_ctr_creds(self): + cli = self.getClientWithToken(creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + res, headers = cli._request('GET', ['doc', 'token']) + self.assertEqual( + ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_https.py b/common/src/leap/soledad/common/tests/test_https.py new file mode 100644 index 00000000..b6288188 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_https.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# test_sync_target.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: https +""" +from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests import test_sync_target as test_st +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests.u1db_tests import test_backends +from leap.soledad.common.tests.u1db_tests import test_https + +from leap.soledad import client +from leap.soledad.server import SoledadApp + +from u1db.remote import http_client + + +def make_soledad_app(state): + return SoledadApp(state) + +LEAP_SCENARIOS = [ + ('http', { + 'make_database_for_test': test_backends.make_http_database_for_test, + 'copy_database_for_test': test_backends.copy_http_database_for_test, + 'make_document_for_test': test_st.make_leap_document_for_test, + 'make_app_with_state': test_st.make_soledad_app}), +] + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_https`. +#----------------------------------------------------------------------------- + +def token_leap_https_sync_target(test, host, path): + _, port = test.server.server_address + st = client.target.SoledadSyncTarget( + 'https://%s:%d/%s' % (host, port, path), + crypto=test._soledad._crypto) + st.set_token_credentials('user-uuid', 'auth-token') + return st + + +class TestSoledadSyncTargetHttpsSupport( + test_https.TestHttpSyncTargetHttpsSupport, + BaseSoledadTest): + + scenarios = [ + ('token_soledad_https', + {'server_def': test_https.https_server_def, + 'make_app_with_state': test_st.make_token_soledad_app, + 'make_document_for_test': test_st.make_leap_document_for_test, + 'sync_target': token_leap_https_sync_target}), + ] + + def setUp(self): + # the parent constructor undoes our SSL monkey patch to ensure tests + # run smoothly with standard u1db. + test_https.TestHttpSyncTargetHttpsSupport.setUp(self) + # so here monkey patch again to test our functionality. + http_client._VerifiedHTTPSConnection = client.VerifiedHTTPSConnection + client.SOLEDAD_CERT = http_client.CA_CERTS + + def test_working(self): + """ + Test that SSL connections work well. + + This test was adapted to patch Soledad's HTTPS connection custom class + with the intended CA certificates. + """ + self.startServer() + db = self.request_state._create_database('test') + self.patch(client, 'SOLEDAD_CERT', self.cacert_pem) + remote_target = self.getSyncTarget('localhost', 'test') + remote_target.record_sync_info('other-id', 2, 'T-id') + self.assertEqual( + (2, 'T-id'), db._get_replica_gen_and_trans_id('other-id')) + + def test_host_mismatch(self): + """ + Test that SSL connections to a hostname different than the one in the + certificate raise CertificateError. + + This test was adapted to patch Soledad's HTTPS connection custom class + with the intended CA certificates. + """ + self.startServer() + self.request_state._create_database('test') + self.patch(client, 'SOLEDAD_CERT', self.cacert_pem) + remote_target = self.getSyncTarget('127.0.0.1', 'test') + self.assertRaises( + http_client.CertificateError, remote_target.record_sync_info, + 'other-id', 2, 'T-id') + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 1c5a7407..cb5348b4 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -14,12 +14,9 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Tests for server-related functionality. """ - import os import tempfile import simplejson as json @@ -39,16 +36,13 @@ from leap.soledad.common.tests.u1db_tests import ( simple_doc, ) from leap.soledad.common.tests.test_couch import CouchDBTestCase -from leap.soledad.common.tests.test_target import ( +from leap.soledad.common.tests.test_target_soledad import ( make_token_soledad_app, make_leap_document_for_test, - token_leap_sync_target, ) -from leap.soledad.client import ( - Soledad, - target, -) -from leap.soledad.server import SoledadApp, LockResource +from leap.soledad.common.tests.test_sync_target import token_leap_sync_target +from leap.soledad.client import Soledad, crypto +from leap.soledad.server import LockResource from leap.soledad.server.auth import URLToAuthorization @@ -369,12 +363,12 @@ class EncryptedSyncTestCase( self.assertEqual(doc1.doc_id, couchdoc.doc_id) self.assertEqual(doc1.rev, couchdoc.rev) self.assertEqual(6, len(couchdoc.content)) - self.assertTrue(target.ENC_JSON_KEY in couchdoc.content) - self.assertTrue(target.ENC_SCHEME_KEY in couchdoc.content) - self.assertTrue(target.ENC_METHOD_KEY in couchdoc.content) - self.assertTrue(target.ENC_IV_KEY in couchdoc.content) - self.assertTrue(target.MAC_KEY in couchdoc.content) - self.assertTrue(target.MAC_METHOD_KEY in couchdoc.content) + self.assertTrue(crypto.ENC_JSON_KEY in couchdoc.content) + self.assertTrue(crypto.ENC_SCHEME_KEY in couchdoc.content) + self.assertTrue(crypto.ENC_METHOD_KEY in couchdoc.content) + self.assertTrue(crypto.ENC_IV_KEY in couchdoc.content) + self.assertTrue(crypto.MAC_KEY in couchdoc.content) + self.assertTrue(crypto.MAC_METHOD_KEY in couchdoc.content) # instantiate soledad with empty db, but with same secrets path sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') _, doclist = sol2.get_all_docs() @@ -427,12 +421,12 @@ class EncryptedSyncTestCase( self.assertEqual(doc1.doc_id, couchdoc.doc_id) self.assertEqual(doc1.rev, couchdoc.rev) self.assertEqual(6, len(couchdoc.content)) - self.assertTrue(target.ENC_JSON_KEY in couchdoc.content) - self.assertTrue(target.ENC_SCHEME_KEY in couchdoc.content) - self.assertTrue(target.ENC_METHOD_KEY in couchdoc.content) - self.assertTrue(target.ENC_IV_KEY in couchdoc.content) - self.assertTrue(target.MAC_KEY in couchdoc.content) - self.assertTrue(target.MAC_METHOD_KEY in couchdoc.content) + self.assertTrue(crypto.ENC_JSON_KEY in couchdoc.content) + self.assertTrue(crypto.ENC_SCHEME_KEY in couchdoc.content) + self.assertTrue(crypto.ENC_METHOD_KEY in couchdoc.content) + self.assertTrue(crypto.ENC_IV_KEY in couchdoc.content) + self.assertTrue(crypto.MAC_KEY in couchdoc.content) + self.assertTrue(crypto.MAC_METHOD_KEY in couchdoc.content) # instantiate soledad with empty db, but with same secrets path sol2 = self._soledad_instance( prefix='x', @@ -502,7 +496,6 @@ class EncryptedSyncTestCase( sol1.close() sol2.close() - def test_sync_many_small_files(self): """ Test if Soledad can sync many smallfiles. @@ -548,6 +541,7 @@ class EncryptedSyncTestCase( sol1.close() sol2.close() + class LockResourceTestCase( CouchDBTestCase, TestCaseWithServer): """ diff --git a/common/src/leap/soledad/common/tests/test_soledad.py b/common/src/leap/soledad/common/tests/test_soledad.py index 5a3bf2b0..11e43423 100644 --- a/common/src/leap/soledad/common/tests/test_soledad.py +++ b/common/src/leap/soledad/common/tests/test_soledad.py @@ -14,18 +14,13 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Tests for general Soledad functionality. """ - - import os from mock import Mock -from pysqlcipher.dbapi2 import DatabaseError from leap.common.events import events_pb2 as proto from leap.soledad.common.tests import ( BaseSoledadTest, diff --git a/common/src/leap/soledad/common/tests/test_soledad_doc.py b/common/src/leap/soledad/common/tests/test_soledad_doc.py new file mode 100644 index 00000000..0952de6d --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_soledad_doc.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# test_soledad_doc.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: soledad docs +""" +from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests.u1db_tests import test_document +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests import test_sync_target as st + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_document`. +#----------------------------------------------------------------------------- + + +class TestSoledadDocument(test_document.TestDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': st.make_leap_document_for_test})]) + + +class TestSoledadPyDocument(test_document.TestPyDocument, BaseSoledadTest): + + scenarios = ([( + 'leap', { + 'make_document_for_test': st.make_leap_document_for_test})]) + + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_sqlcipher.py b/common/src/leap/soledad/common/tests/test_sqlcipher.py index 891aca0f..595966ec 100644 --- a/common/src/leap/soledad/common/tests/test_sqlcipher.py +++ b/common/src/leap/soledad/common/tests/test_sqlcipher.py @@ -14,16 +14,11 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ Test sqlcipher backend internals. """ - - import os import time -import unittest import simplejson as json import threading @@ -50,15 +45,9 @@ from leap.soledad.client.sqlcipher import ( DatabaseIsNotEncrypted, open as u1db_open, ) -from leap.soledad.common.crypto import ( - EncryptionSchemes, - ENC_JSON_KEY, - ENC_SCHEME_KEY, -) -from leap.soledad.client.target import ( - decrypt_doc, - SoledadSyncTarget, -) +from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.common.crypto import ENC_SCHEME_KEY +from leap.soledad.client.crypto import decrypt_doc_dict # u1db tests stuff. @@ -269,6 +258,7 @@ class TestSQLCipherPartialExpandDatabase( db = SQLCipherDatabase.__new__( SQLCipherDatabase) db._db_handle = dbapi2.connect(path) # db is there but not yet init-ed + db._syncers = {} c = db._db_handle.cursor() c.execute('PRAGMA key="%s"' % PASSWORD) self.addCleanup(db.close) @@ -614,7 +604,12 @@ class SQLCipherDatabaseSyncTests( self.sync(self.db2, db3) doc3 = db3.get_doc('the-doc') if ENC_SCHEME_KEY in doc3.content: - doc3.set_json(decrypt_doc(self._soledad._crypto, doc3)) + _crypto = self._soledad._crypto + key = _crypto.doc_passphrase(doc3.doc_id) + secret = _crypto.secret + doc3.set_json(decrypt_doc_dict( + doc3.content, + doc3.doc_id, doc3.rev, key, secret)) self.assertEqual(doc4.get_json(), doc3.get_json()) self.assertFalse(doc3.has_conflicts) @@ -796,7 +791,7 @@ class SQLCipherEncryptionTest(BaseLeapTest): # trying to open the a non-encrypted database with sqlcipher # backend should raise a DatabaseIsNotEncrypted exception. SQLCipherDatabase(self.DB_FILE, PASSWORD) - raise db1pi2.DatabaseError( + raise dbapi2.DatabaseError( "SQLCipher backend should not be able to open non-encrypted " "dbs.") except DatabaseIsNotEncrypted: diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py index fd4a2797..0433fac9 100644 --- a/common/src/leap/soledad/common/tests/test_sync.py +++ b/common/src/leap/soledad/common/tests/test_sync.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # test_sync.py -# Copyright (C) 2014 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -24,26 +24,31 @@ import threading import time from urlparse import urljoin -from leap.soledad.common.couch import ( - CouchServerState, - CouchDatabase, -) +from leap.soledad.common import couch +from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests import test_sync_target +from leap.soledad.common.tests import u1db_tests as tests from leap.soledad.common.tests.u1db_tests import ( TestCaseWithServer, simple_doc, + test_backends, + test_sync ) from leap.soledad.common.tests.test_couch import CouchDBTestCase -from leap.soledad.common.tests.test_target import ( +from leap.soledad.common.tests.test_target_soledad import ( make_token_soledad_app, make_leap_document_for_test, - token_leap_sync_target, ) - +from leap.soledad.common.tests.test_sync_target import token_leap_sync_target from leap.soledad.client import ( Soledad, target, ) +from leap.soledad.common.tests.util import SoledadWithCouchServerMixin +from leap.soledad.client.sync import SoledadSynchronizer +from leap.soledad.server import SoledadApp + class InterruptableSyncTestCase( @@ -99,8 +104,8 @@ class InterruptableSyncTestCase( secret_id=secret_id) def make_app(self): - self.request_state = CouchServerState(self._couch_url, 'shared', - 'tokens') + self.request_state = couch.CouchServerState( + self._couch_url, 'shared', 'tokens') return self.make_app_with_state(self.request_state) def setUp(self): @@ -150,7 +155,7 @@ class InterruptableSyncTestCase( sol.create_doc(json.loads(simple_doc)) # ensure remote db exists before syncing - db = CouchDatabase.open_database( + db = couch.CouchDatabase.open_database( urljoin(self._couch_url, 'user-user-uuid'), create=True, ensure_ddocs=True) @@ -174,3 +179,114 @@ class InterruptableSyncTestCase( db.delete_database() db.close() sol.close() + + +def make_soledad_app(state): + return SoledadApp(state) + + +class TestSoledadDbSync( + SoledadWithCouchServerMixin, + test_sync.TestDbSync): + """ + Test db.sync remote sync shortcut + """ + + scenarios = [ + ('py-http', { + 'make_app_with_state': make_soledad_app, + 'make_database_for_test': tests.make_memory_database_for_test, + }), + ('py-token-http', { + 'make_app_with_state': test_sync_target.make_token_soledad_app, + 'make_database_for_test': tests.make_memory_database_for_test, + 'token': True + }), + ] + + oauth = False + token = False + + def setUp(self): + """ + Need to explicitely invoke inicialization on all bases. + """ + tests.TestCaseWithServer.setUp(self) + self.main_test_class = test_sync.TestDbSync + SoledadWithCouchServerMixin.setUp(self) + self.startServer() + self.db2 = couch.CouchDatabase.open_database( + urljoin( + 'http://localhost:' + str(self.wrapper.port), 'test'), + create=True, + ensure_ddocs=True) + + def tearDown(self): + """ + Need to explicitely invoke destruction on all bases. + """ + self.db2.delete_database() + SoledadWithCouchServerMixin.tearDown(self) + tests.TestCaseWithServer.tearDown(self) + + def do_sync(self, target_name): + """ + Perform sync using SoledadSynchronizer, SoledadSyncTarget + and Token auth. + """ + extra = {} + extra = dict(creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + target_url = self.getURL(target_name) + return SoledadSynchronizer( + self.db, + target.SoledadSyncTarget( + target_url, + crypto=self._soledad._crypto, + **extra)).sync(autocreate=True, + defer_decryption=False) + + def test_db_sync(self): + """ + Test sync. + + Adapted to check for encrypted content. + """ + doc1 = self.db.create_doc_from_json(tests.simple_doc) + doc2 = self.db2.create_doc_from_json(tests.nested_doc) + local_gen_before_sync = self.do_sync('test') + gen, _, changes = self.db.whats_changed(local_gen_before_sync) + self.assertEqual(1, len(changes)) + self.assertEqual(doc2.doc_id, changes[0][0]) + self.assertEqual(1, gen - local_gen_before_sync) + self.assertGetEncryptedDoc( + self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False) + self.assertGetEncryptedDoc( + self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False) + + def test_db_sync_autocreate(self): + """ + Test sync. + + Adapted to check for encrypted content. + """ + doc1 = self.db.create_doc_from_json(tests.simple_doc) + local_gen_before_sync = self.do_sync('test') + gen, _, changes = self.db.whats_changed(local_gen_before_sync) + self.assertEqual(0, gen - local_gen_before_sync) + db3 = self.request_state.open_database('test') + gen, _, changes = db3.whats_changed() + self.assertEqual(1, len(changes)) + self.assertEqual(doc1.doc_id, changes[0][0]) + self.assertGetEncryptedDoc( + db3, doc1.doc_id, doc1.rev, tests.simple_doc, False) + t_gen, _ = self.db._get_replica_gen_and_trans_id( + db3.replica_uid) + s_gen, _ = db3._get_replica_gen_and_trans_id('test1') + self.assertEqual(1, t_gen) + self.assertEqual(1, s_gen) + + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_sync_deferred.py b/common/src/leap/soledad/common/tests/test_sync_deferred.py new file mode 100644 index 00000000..48e3150f --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_sync_deferred.py @@ -0,0 +1,227 @@ +# -*- coding: utf-8 -*- +# test_sync_deferred.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: sync with deferred encryption/decryption. +""" +import time +import os +import random +import string +from urlparse import urljoin + +from leap.soledad.common.tests import u1db_tests as tests, ADDRESS +from leap.soledad.common.tests.u1db_tests import test_sync + +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common import couch +from leap.soledad.client import target +from leap.soledad.client.sync import SoledadSynchronizer + +# Just to make clear how this test is different... :) +DEFER_DECRYPTION = True + +WAIT_STEP = 1 +MAX_WAIT = 10 + +from leap.soledad.common.tests import test_sqlcipher as ts +from leap.soledad.server import SoledadApp + + +from leap.soledad.client.sqlcipher import open as open_sqlcipher +from leap.soledad.common.tests.util import SoledadWithCouchServerMixin +from leap.soledad.common.tests.util import make_soledad_app + + +DBPASS = "pass" + + +class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin): + """ + Another base class for testing the deferred encryption/decryption during + the syncs, using the intermediate database. + """ + defer_sync_encryption = True + + def setUp(self): + # config info + self.db1_file = os.path.join(self.tempdir, "db1.u1db") + self.db_pass = DBPASS + self.email = ADDRESS + + # get a random prefix for each test, so we do not mess with + # concurrency during initialization and shutting down of + # each local db. + self.rand_prefix = ''.join( + map(lambda x: random.choice(string.ascii_letters), range(6))) + # initialize soledad by hand so we can control keys + self._soledad = self._soledad_instance( + prefix=self.rand_prefix, user=self.email) + + # open test dbs: db1 will be the local sqlcipher db + # (which instantiates a syncdb) + self.db1 = open_sqlcipher(self.db1_file, DBPASS, create=True, + document_factory=SoledadDocument, + crypto=self._soledad._crypto, + defer_encryption=True) + self.db2 = couch.CouchDatabase.open_database( + urljoin( + 'http://localhost:' + str(self.wrapper.port), 'test'), + create=True, + ensure_ddocs=True) + + def tearDown(self): + self.db1.close() + self.db2.close() + self._soledad.close() + + # XXX should not access "private" attrs + for f in [self._soledad._local_db_path, + self._soledad._secrets_path, + self.db1._sync_db_path]: + if os.path.isfile(f): + os.unlink(f) + + +#SQLCIPHER_SCENARIOS = [ +# ('http', { +# #'make_app_with_state': test_sync_target.make_token_soledad_app, +# 'make_app_with_state': make_soledad_app, +# 'make_database_for_test': ts.make_sqlcipher_database_for_test, +# 'copy_database_for_test': ts.copy_sqlcipher_database_for_test, +# 'make_document_for_test': ts.make_document_for_test, +# 'token': True +# }), +#] + + +class SyncTimeoutError(Exception): + """ + Dummy exception to notify timeout during sync. + """ + pass + + +class TestSoledadDbSyncDeferredEncDecr( + BaseSoledadDeferredEncTest, + test_sync.TestDbSync): + """ + Test db.sync remote sync shortcut. + Case with deferred encryption and decryption: using the intermediate + syncdb. + """ + + scenarios = [ + ('http', { + 'make_app_with_state': make_soledad_app, + 'make_database_for_test': tests.make_memory_database_for_test, + }), + ] + + oauth = False + token = True + + def setUp(self): + """ + Need to explicitely invoke inicialization on all bases. + """ + tests.TestCaseWithServer.setUp(self) + self.main_test_class = test_sync.TestDbSync + BaseSoledadDeferredEncTest.setUp(self) + self.startServer() + self.syncer = None + + def tearDown(self): + """ + Need to explicitely invoke destruction on all bases. + """ + BaseSoledadDeferredEncTest.tearDown(self) + tests.TestCaseWithServer.tearDown(self) + + def do_sync(self, target_name): + """ + Perform sync using SoledadSynchronizer, SoledadSyncTarget + and Token auth. + """ + if self.token: + extra = dict(creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + target_url = self.getURL(target_name) + syncdb = getattr(self.db1, "_sync_db", None) + + syncer = SoledadSynchronizer( + self.db1, + target.SoledadSyncTarget( + target_url, + crypto=self._soledad._crypto, + sync_db=syncdb, + **extra)) + # Keep a reference to be able to know when the sync + # has finished. + self.syncer = syncer + return syncer.sync( + autocreate=True, defer_decryption=DEFER_DECRYPTION) + else: + return test_sync.TestDbSync.do_sync(self, target_name) + + def wait_for_sync(self): + """ + Wait for sync to finish. + """ + wait = 0 + syncer = self.syncer + if syncer is not None: + while syncer.syncing: + time.sleep(WAIT_STEP) + wait += WAIT_STEP + if wait >= MAX_WAIT: + raise SyncTimeoutError + + def test_db_sync(self): + """ + Test sync. + + Adapted to check for encrypted content. + """ + doc1 = self.db1.create_doc_from_json(tests.simple_doc) + doc2 = self.db2.create_doc_from_json(tests.nested_doc) + + import time + # need to give time to the encryption to proceed + # TODO should implement a defer list to subscribe to the all-decrypted + # event + time.sleep(2) + + local_gen_before_sync = self.do_sync('test') + self.wait_for_sync() + + gen, _, changes = self.db1.whats_changed(local_gen_before_sync) + self.assertEqual(1, len(changes)) + + self.assertEqual(doc2.doc_id, changes[0][0]) + self.assertEqual(1, gen - local_gen_before_sync) + + self.assertGetEncryptedDoc( + self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False) + self.assertGetEncryptedDoc( + self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False) + + def test_db_sync_autocreate(self): + pass + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_sync_target.py b/common/src/leap/soledad/common/tests/test_sync_target.py new file mode 100644 index 00000000..edc4589b --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_sync_target.py @@ -0,0 +1,589 @@ +# -*- coding: utf-8 -*- +# test_sync_target.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: sync target +""" +import cStringIO +import os + +import simplejson as json +import u1db + +from uuid import uuid4 + +from u1db.remote import http_database + +from u1db import SyncTarget +from u1db.sync import Synchronizer +from u1db.remote import ( + http_client, + http_database, + http_target, +) + +from leap.soledad import client +from leap.soledad.client import ( + target, + auth, + crypto, + VerifiedHTTPSConnection, + sync, +) +from leap.soledad.common.document import SoledadDocument +from leap.soledad.server.auth import SoledadTokenAuthMiddleware + + +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests.util import ( + make_sqlcipher_database_for_test, + make_soledad_app, + make_token_soledad_app, + SoledadWithCouchServerMixin, +) +from leap.soledad.common.tests.u1db_tests import test_backends +from leap.soledad.common.tests.u1db_tests import test_remote_sync_target +from leap.soledad.common.tests.u1db_tests import test_sync +from leap.soledad.common.tests.test_couch import ( + CouchDBTestCase, + CouchDBWrapper, +) + +from leap.soledad.server import SoledadApp +from leap.soledad.server.auth import SoledadTokenAuthMiddleware + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_backends`. +#----------------------------------------------------------------------------- + +def make_leap_document_for_test(test, doc_id, rev, content, + has_conflicts=False): + return SoledadDocument( + doc_id, rev, content, has_conflicts=has_conflicts) + + +LEAP_SCENARIOS = [ + ('http', { + 'make_database_for_test': test_backends.make_http_database_for_test, + 'copy_database_for_test': test_backends.copy_http_database_for_test, + 'make_document_for_test': make_leap_document_for_test, + 'make_app_with_state': make_soledad_app}), +] + + +def make_token_http_database_for_test(test, replica_uid): + test.startServer() + test.request_state._create_database(replica_uid) + + class _HTTPDatabaseWithToken( + http_database.HTTPDatabase, auth.TokenBasedAuth): + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + http_db = _HTTPDatabaseWithToken(test.getURL('test')) + http_db.set_token_credentials('user-uuid', 'auth-token') + return http_db + + +def copy_token_http_database_for_test(test, db): + # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS + # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE + # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN + # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR + # HOUSE. + http_db = test.request_state._copy_database(db) + http_db.set_token_credentials(http_db, 'user-uuid', 'auth-token') + return http_db + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_remote_sync_target`. +#----------------------------------------------------------------------------- + +class TestSoledadSyncTargetBasics( + test_remote_sync_target.TestHTTPSyncTargetBasics): + """ + Some tests had to be copied to this class so we can instantiate our own + target. + """ + + def test_parse_url(self): + remote_target = target.SoledadSyncTarget('http://127.0.0.1:12345/') + self.assertEqual('http', remote_target._url.scheme) + self.assertEqual('127.0.0.1', remote_target._url.hostname) + self.assertEqual(12345, remote_target._url.port) + self.assertEqual('/', remote_target._url.path) + + +class TestSoledadParsingSyncStream( + test_remote_sync_target.TestParsingSyncStream, + BaseSoledadTest): + """ + Some tests had to be copied to this class so we can instantiate our own + target. + """ + + def setUp(self): + test_remote_sync_target.TestParsingSyncStream.setUp(self) + + def tearDown(self): + test_remote_sync_target.TestParsingSyncStream.tearDown(self) + + def test_extra_comma(self): + """ + Test adapted to use encrypted content. + """ + doc = SoledadDocument('i', rev='r') + doc.content = {} + _crypto = self._soledad._crypto + key = _crypto.doc_passphrase(doc.doc_id) + secret = _crypto.secret + + enc_json = crypto.encrypt_docstr( + doc.get_json(), doc.doc_id, doc.rev, + key, secret) + tgt = target.SoledadSyncTarget( + "http://foo/foo", crypto=self._soledad._crypto) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "[\r\n{},\r\n]", None) + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, + '[\r\n{},\r\n{"id": "i", "rev": "r", ' + '"content": %s, "gen": 3, "trans_id": "T-sid"}' + ',\r\n]' % json.dumps(enc_json), + lambda doc, gen, trans_id: None) + + def test_wrong_start(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "{}\r\n]", None) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "\r\n{}\r\n]", None) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "", None) + + def test_wrong_end(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "[\r\n{}", None) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "[\r\n", None) + + def test_missing_comma(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, + '[\r\n{}\r\n{"id": "i", "rev": "r", ' + '"content": "c", "gen": 3}\r\n]', None) + + def test_no_entries(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, "[\r\n]", None) + + def test_error_in_stream(self): + tgt = target.SoledadSyncTarget("http://foo/foo") + + self.assertRaises(u1db.errors.Unavailable, + tgt._parse_sync_stream, + '[\r\n{"new_generation": 0},' + '\r\n{"error": "unavailable"}\r\n', None) + + self.assertRaises(u1db.errors.Unavailable, + tgt._parse_sync_stream, + '[\r\n{"error": "unavailable"}\r\n', None) + + self.assertRaises(u1db.errors.BrokenSyncStream, + tgt._parse_sync_stream, + '[\r\n{"error": "?"}\r\n', None) + + +# +# functions for TestRemoteSyncTargets +# + +def leap_sync_target(test, path): + return target.SoledadSyncTarget( + test.getURL(path), crypto=test._soledad._crypto) + + +def token_leap_sync_target(test, path): + st = leap_sync_target(test, path) + st.set_token_credentials('user-uuid', 'auth-token') + return st + + +def make_local_db_and_soledad_target(test, path='test'): + test.startServer() + db = test.request_state._create_database(os.path.basename(path)) + st = target.SoledadSyncTarget.connect( + test.getURL(path), crypto=test._soledad._crypto) + return db, st + + +def make_local_db_and_token_soledad_target(test): + db, st = make_local_db_and_soledad_target(test, 'test') + st.set_token_credentials('user-uuid', 'auth-token') + return db, st + + +class TestSoledadSyncTarget( + SoledadWithCouchServerMixin, + test_remote_sync_target.TestRemoteSyncTargets): + + scenarios = [ + ('token_soledad', + {'make_app_with_state': make_token_soledad_app, + 'make_document_for_test': make_leap_document_for_test, + 'create_db_and_target': make_local_db_and_token_soledad_target, + 'make_database_for_test': make_sqlcipher_database_for_test, + 'sync_target': token_leap_sync_target}), + ] + + def setUp(self): + tests.TestCaseWithServer.setUp(self) + self.main_test_class = test_remote_sync_target.TestRemoteSyncTargets + SoledadWithCouchServerMixin.setUp(self) + self.startServer() + self.db1 = make_sqlcipher_database_for_test(self, 'test1') + self.db2 = self.request_state._create_database('test2') + + def tearDown(self): + SoledadWithCouchServerMixin.tearDown(self) + tests.TestCaseWithServer.tearDown(self) + db, _ = self.request_state.ensure_database('test2') + db.delete_database() + + def test_sync_exchange_send(self): + """ + Test for sync exchanging send of document. + + This test was adapted to decrypt remote content before assert. + """ + self.startServer() + db = self.request_state._create_database('test') + remote_target = self.getSyncTarget('test') + other_docs = [] + + def receive_doc(doc, gen, trans_id): + other_docs.append((doc.doc_id, doc.rev, doc.get_json())) + + doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + new_gen, trans_id = remote_target.sync_exchange( + [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=receive_doc, + defer_decryption=False) + self.assertEqual(1, new_gen) + self.assertGetEncryptedDoc( + db, 'doc-here', 'replica:1', '{"value": "here"}', False) + + def test_sync_exchange_send_failure_and_retry_scenario(self): + """ + Test for sync exchange failure and retry. + + This test was adapted to: + - decrypt remote content before assert. + - not expect a bounced document because soledad has stateful + recoverable sync. + """ + + self.startServer() + + def blackhole_getstderr(inst): + return cStringIO.StringIO() + + self.patch(self.server.RequestHandlerClass, 'get_stderr', + blackhole_getstderr) + db = self.request_state._create_database('test') + _put_doc_if_newer = db._put_doc_if_newer + trigger_ids = ['doc-here2'] + + def bomb_put_doc_if_newer(self, doc, save_conflict, + replica_uid=None, replica_gen=None, + replica_trans_id=None, number_of_docs=None, + doc_idx=None, sync_id=None): + if doc.doc_id in trigger_ids: + raise Exception + return _put_doc_if_newer(doc, save_conflict=save_conflict, + replica_uid=replica_uid, + replica_gen=replica_gen, + replica_trans_id=replica_trans_id, + number_of_docs=number_of_docs, + doc_idx=doc_idx, sync_id=sync_id) + from leap.soledad.common.tests.test_couch import IndexedCouchDatabase + self.patch( + IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer) + remote_target = self.getSyncTarget('test') + other_changes = [] + + def receive_doc(doc, gen, trans_id): + other_changes.append( + (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) + + doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + doc2 = self.make_document('doc-here2', 'replica:1', + '{"value": "here2"}') + + # we do not expect an HTTPError because soledad sync fails gracefully + remote_target.sync_exchange( + [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')], + 'replica', last_known_generation=0, last_known_trans_id=None, + return_doc_cb=receive_doc) + self.assertGetEncryptedDoc( + db, 'doc-here', 'replica:1', '{"value": "here"}', + False) + self.assertEqual( + (10, 'T-sid'), db._get_replica_gen_and_trans_id('replica')) + self.assertEqual([], other_changes) + # retry + trigger_ids = [] + new_gen, trans_id = remote_target.sync_exchange( + [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=receive_doc) + self.assertGetEncryptedDoc( + db, 'doc-here2', 'replica:1', '{"value": "here2"}', + False) + self.assertEqual( + (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica')) + self.assertEqual(2, new_gen) + self.assertEqual( + ('doc-here', 'replica:1', '{"value": "here"}', 1), + other_changes[0][:-1]) + + def test_sync_exchange_send_ensure_callback(self): + """ + Test for sync exchange failure and retry. + + This test was adapted to decrypt remote content before assert. + """ + self.startServer() + remote_target = self.getSyncTarget('test') + other_docs = [] + replica_uid_box = [] + + def receive_doc(doc, gen, trans_id): + other_docs.append((doc.doc_id, doc.rev, doc.get_json())) + + def ensure_cb(replica_uid): + replica_uid_box.append(replica_uid) + + doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + new_gen, trans_id = remote_target.sync_exchange( + [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=receive_doc, + ensure_callback=ensure_cb, defer_decryption=False) + self.assertEqual(1, new_gen) + db = self.request_state.open_database('test') + self.assertEqual(1, len(replica_uid_box)) + self.assertEqual(db._replica_uid, replica_uid_box[0]) + self.assertGetEncryptedDoc( + db, 'doc-here', 'replica:1', '{"value": "here"}', False) + + def test_sync_exchange_in_stream_error(self): + # we bypass this test because our sync_exchange process does not + # return u1db error 503 "unavailable" for now. + pass + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_sync`. +#----------------------------------------------------------------------------- + +target_scenarios = [ + ('token_leap', {'create_db_and_target': + make_local_db_and_token_soledad_target, + 'make_app_with_state': make_soledad_app}), +] + + +class SoledadDatabaseSyncTargetTests( + SoledadWithCouchServerMixin, test_sync.DatabaseSyncTargetTests): + + scenarios = ( + tests.multiply_scenarios( + tests.DatabaseBaseTests.scenarios, + target_scenarios)) + + whitebox = False + + def setUp(self): + self.main_test_class = test_sync.DatabaseSyncTargetTests + SoledadWithCouchServerMixin.setUp(self) + + def test_sync_exchange(self): + """ + Test sync exchange. + + This test was adapted to decrypt remote content before assert. + """ + sol, _ = make_local_db_and_soledad_target(self) + docs_by_gen = [ + (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, + 'T-sid')] + new_gen, trans_id = self.st.sync_exchange( + docs_by_gen, 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc, + defer_decryption=False) + self.assertGetEncryptedDoc( + self.db, 'doc-id', 'replica:1', tests.simple_doc, False) + self.assertTransactionLog(['doc-id'], self.db) + last_trans_id = self.getLastTransId(self.db) + self.assertEqual(([], 1, last_trans_id), + (self.other_changes, new_gen, last_trans_id)) + self.assertEqual(10, self.st.get_sync_info('replica')[3]) + sol.close() + + def test_sync_exchange_push_many(self): + """ + Test sync exchange. + + This test was adapted to decrypt remote content before assert. + """ + docs_by_gen = [ + (self.make_document( + 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'), + (self.make_document( + 'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')] + new_gen, trans_id = self.st.sync_exchange( + docs_by_gen, 'replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc, + defer_decryption=False) + self.assertGetEncryptedDoc( + self.db, 'doc-id', 'replica:1', tests.simple_doc, False) + self.assertGetEncryptedDoc( + self.db, 'doc-id2', 'replica:1', tests.nested_doc, False) + self.assertTransactionLog(['doc-id', 'doc-id2'], self.db) + last_trans_id = self.getLastTransId(self.db) + self.assertEqual(([], 2, last_trans_id), + (self.other_changes, new_gen, trans_id)) + self.assertEqual(11, self.st.get_sync_info('replica')[3]) + + def test_sync_exchange_returns_many_new_docs(self): + """ + Test sync exchange. + + This test was adapted to avoid JSON serialization comparison as local + and remote representations might differ. It looks directly at the + doc's contents instead. + """ + doc = self.db.create_doc_from_json(tests.simple_doc) + doc2 = self.db.create_doc_from_json(tests.nested_doc) + self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) + new_gen, _ = self.st.sync_exchange( + [], 'other-replica', last_known_generation=0, + last_known_trans_id=None, return_doc_cb=self.receive_doc, + defer_decryption=False) + self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) + self.assertEqual(2, new_gen) + self.assertEqual( + [(doc.doc_id, doc.rev, 1), + (doc2.doc_id, doc2.rev, 2)], + [c[:-3] + c[-2:-1] for c in self.other_changes]) + self.assertEqual( + json.loads(tests.simple_doc), + json.loads(self.other_changes[0][2])) + self.assertEqual( + json.loads(tests.nested_doc), + json.loads(self.other_changes[1][2])) + if self.whitebox: + self.assertEqual( + self.db._last_exchange_log['return'], + {'last_gen': 2, 'docs': + [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) + + +class TestSoledadDbSync( + SoledadWithCouchServerMixin, test_sync.TestDbSync): + """Test db.sync remote sync shortcut""" + + scenarios = [ + ('py-token-http', { + 'create_db_and_target': make_local_db_and_token_soledad_target, + 'make_app_with_state': make_token_soledad_app, + 'make_database_for_test': make_sqlcipher_database_for_test, + 'token': True + }), + ] + + oauth = False + token = False + + def setUp(self): + self.main_test_class = test_sync.TestDbSync + SoledadWithCouchServerMixin.setUp(self) + + def do_sync(self, target_name): + """ + Perform sync using SoledadSynchronizer, SoledadSyncTarget + and Token auth. + """ + if self.token: + extra = dict(creds={'token': { + 'uuid': 'user-uuid', + 'token': 'auth-token', + }}) + target_url = self.getURL(target_name) + return sync.SoledadSynchronizer( + self.db, + target.SoledadSyncTarget( + target_url, + crypto=self._soledad._crypto, + **extra)).sync(autocreate=True, + defer_decryption=False) + else: + return test_sync.TestDbSync.do_sync(self, target_name) + + def test_db_sync(self): + """ + Test sync. + + Adapted to check for encrypted content. + """ + doc1 = self.db.create_doc_from_json(tests.simple_doc) + doc2 = self.db2.create_doc_from_json(tests.nested_doc) + local_gen_before_sync = self.do_sync('test2') + gen, _, changes = self.db.whats_changed(local_gen_before_sync) + self.assertEqual(1, len(changes)) + self.assertEqual(doc2.doc_id, changes[0][0]) + self.assertEqual(1, gen - local_gen_before_sync) + self.assertGetEncryptedDoc( + self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False) + self.assertGetEncryptedDoc( + self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False) + + def test_db_sync_autocreate(self): + """ + We bypass this test because we never need to autocreate databases. + """ + pass + + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_target.py b/common/src/leap/soledad/common/tests/test_target.py index 3457a3e1..6242099d 100644 --- a/common/src/leap/soledad/common/tests/test_target.py +++ b/common/src/leap/soledad/common/tests/test_target.py @@ -437,13 +437,17 @@ class TestSoledadSyncTarget( def bomb_put_doc_if_newer(self, doc, save_conflict, replica_uid=None, replica_gen=None, - replica_trans_id=None): + replica_trans_id=None, number_of_docs=None, + doc_idx=None, sync_id=None): if doc.doc_id in trigger_ids: raise Exception return _put_doc_if_newer(doc, save_conflict=save_conflict, replica_uid=replica_uid, replica_gen=replica_gen, - replica_trans_id=replica_trans_id) + replica_trans_id=replica_trans_id, + number_of_docs=number_of_docs, + doc_idx=doc_idx, + sync_id=sync_id) from leap.soledad.common.tests.test_couch import IndexedCouchDatabase self.patch( IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer) @@ -457,9 +461,8 @@ class TestSoledadSyncTarget( doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') doc2 = self.make_document('doc-here2', 'replica:1', '{"value": "here2"}') - self.assertRaises( - u1db.errors.HTTPError, - remote_target.sync_exchange, + # We do not expect an exception here because the sync fails gracefully + remote_target.sync_exchange( [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')], 'replica', last_known_generation=0, last_known_trans_id=None, return_doc_cb=receive_doc) @@ -480,11 +483,9 @@ class TestSoledadSyncTarget( self.assertEqual( (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica')) self.assertEqual(2, new_gen) - # we do not expect the document to be bounced back because soledad has - # stateful sync - #self.assertEqual( - # ('doc-here', 'replica:1', '{"value": "here"}', 1), - # other_changes[0][:-1]) + self.assertEqual( + ('doc-here', 'replica:1', '{"value": "here"}', 1), + other_changes[0][:-1]) def test_sync_exchange_send_ensure_callback(self): """ diff --git a/common/src/leap/soledad/common/tests/test_target_soledad.py b/common/src/leap/soledad/common/tests/test_target_soledad.py new file mode 100644 index 00000000..899203b8 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_target_soledad.py @@ -0,0 +1,102 @@ +from u1db.remote import ( + http_database, +) + +from leap.soledad.client import ( + auth, + VerifiedHTTPSConnection, +) +from leap.soledad.common.document import SoledadDocument +from leap.soledad.server import SoledadApp +from leap.soledad.server.auth import SoledadTokenAuthMiddleware + + +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests.u1db_tests import test_backends + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_backends`. +#----------------------------------------------------------------------------- + +def make_leap_document_for_test(test, doc_id, rev, content, + has_conflicts=False): + return SoledadDocument( + doc_id, rev, content, has_conflicts=has_conflicts) + + +def make_soledad_app(state): + return SoledadApp(state) + + +def make_token_soledad_app(state): + app = SoledadApp(state) + + def _verify_authentication_data(uuid, auth_data): + if uuid == 'user-uuid' and auth_data == 'auth-token': + return True + return False + + # we test for action authorization in leap.soledad.common.tests.test_server + def _verify_authorization(uuid, environ): + return True + + application = SoledadTokenAuthMiddleware(app) + application._verify_authentication_data = _verify_authentication_data + application._verify_authorization = _verify_authorization + return application + + +LEAP_SCENARIOS = [ + ('http', { + 'make_database_for_test': test_backends.make_http_database_for_test, + 'copy_database_for_test': test_backends.copy_http_database_for_test, + 'make_document_for_test': make_leap_document_for_test, + 'make_app_with_state': make_soledad_app}), +] + + +def make_token_http_database_for_test(test, replica_uid): + test.startServer() + test.request_state._create_database(replica_uid) + + class _HTTPDatabaseWithToken( + http_database.HTTPDatabase, auth.TokenBasedAuth): + + def set_token_credentials(self, uuid, token): + auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + return auth.TokenBasedAuth._sign_request( + self, method, url_query, params) + + http_db = _HTTPDatabaseWithToken(test.getURL('test')) + http_db.set_token_credentials('user-uuid', 'auth-token') + return http_db + + +def copy_token_http_database_for_test(test, db): + # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS + # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE + # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN + # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR + # HOUSE. + http_db = test.request_state._copy_database(db) + http_db.set_token_credentials(http_db, 'user-uuid', 'auth-token') + return http_db + + +class SoledadTests(test_backends.AllDatabaseTests, BaseSoledadTest): + + scenarios = LEAP_SCENARIOS + [ + ('token_http', {'make_database_for_test': + make_token_http_database_for_test, + 'copy_database_for_test': + copy_token_http_database_for_test, + 'make_document_for_test': make_leap_document_for_test, + 'make_app_with_state': make_token_soledad_app, + }) + ] + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/u1db_tests/__init__.py b/common/src/leap/soledad/common/tests/u1db_tests/__init__.py index 99ff77b4..ad66fb06 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/__init__.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/__init__.py @@ -13,8 +13,9 @@ # # You should have received a copy of the GNU Lesser General Public License # along with u1db. If not, see <http://www.gnu.org/licenses/>. - -"""Test infrastructure for U1DB""" +""" +Test infrastructure for U1DB +""" import copy import shutil diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py b/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py index c0a7e1f7..86e76fad 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py @@ -41,7 +41,7 @@ from u1db.remote import ( ) -def make_http_database_for_test(test, replica_uid, path='test'): +def make_http_database_for_test(test, replica_uid, path='test', *args): test.startServer() test.request_state._create_database(replica_uid) return http_database.HTTPDatabase(test.getURL(path)) diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py index 633fd8dd..5e2bec86 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py @@ -1151,6 +1151,9 @@ class TestDbSync(tests.TestCaseWithServer): target_url = self.getURL(path) return self.db.sync(target_url, **extra) + def sync(self, callback=None, autocreate=False, defer_decryption=False): + return super(TestDbSync, self).sync(callback, autocreate) + def setUp(self): super(TestDbSync, self).setUp() self.startServer() diff --git a/debian/changelog b/debian/changelog index 212f4309..f2f5411c 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +soledad (0.6.0) unstable; urgency=low + + * Update to 0.6.0 release. + + -- db <db@leap.se> Wed, 01 Oct 2014 15:39:38 -0300 + soledad (0.5.2.1) unstable; urgency=medium * Update soledad-client version dependency on python-pycryptopp diff --git a/scripts/profiling/doc_put_memory_usage/get-soledad-and-couch-mem.py b/scripts/profiling/doc_put_memory_usage/get-soledad-and-couch-mem.py new file mode 100755 index 00000000..b2b35d30 --- /dev/null +++ b/scripts/profiling/doc_put_memory_usage/get-soledad-and-couch-mem.py @@ -0,0 +1,46 @@ +#!/usr/bin/python + + +import logging +import argparse +import psutil +import time + + +def find_procs(procs): + result = [] + for name, executable in procs: + found = filter( + lambda p: executable == p.name, + psutil.process_iter()) + if len(found) == 1: + result.append(found[0]) + return result + + +def log_memory(soledad, bigcouch): + while True: + print "%f %f" % \ + (soledad.get_memory_percent(), bigcouch.get_memory_percent()) + time.sleep(1) + + +if __name__ == '__main__': + + # configure logger + logger = logging.getLogger(__name__) + LOG_FORMAT = '%(asctime)s %(message)s' + logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + + + # parse command line + parser = argparse.ArgumentParser() + parser.add_argument( + '-l', dest='logfile', + help='log output to file') + args = parser.parse_args() + + log_memory(*find_procs([ + ('Soledad', 'twistd'), + ('Bigcouch', 'beam.smp')])) + diff --git a/scripts/profiling/doc_put_memory_usage/profile-procs.py b/scripts/profiling/doc_put_memory_usage/profile-procs.py new file mode 100755 index 00000000..53f5977b --- /dev/null +++ b/scripts/profiling/doc_put_memory_usage/profile-procs.py @@ -0,0 +1,54 @@ +#!/usr/bin/python + + +import logging +import argparse +import psutil +import time + + +def find_procs(procs): + result = [] + for name, executable in procs: + found = filter( + lambda p: executable == p.name, + psutil.process_iter()) + if len(found) == 1: + result.append(found[0]) + return result + + +def log_usage(procs, logger): + names = [proc.name for proc in procs] + logger.info("Logging cpu and memory for: %s" % names) + while True: + s = '%f %f' %\ + (psutil.cpu_percent(), psutil.phymem_usage().percent) + for proc in procs: + s += ' %f %f' % \ + (proc.get_cpu_percent(), proc.get_memory_percent()) + logger.info(s) + time.sleep(1) + + +if __name__ == '__main__': + # parse command line + parser = argparse.ArgumentParser() + parser.add_argument( + '-l', dest='logfile', + help='log output to file') + args = parser.parse_args() + + # configure logger + logger = logging.getLogger(__name__) + LOG_FORMAT = '%(asctime)s %(message)s' + logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + + if args.logfile is not None: + handler = logging.FileHandler(args.logfile, mode='a') + handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT)) + logger.addHandler(handler) + + log_usage(find_procs([ + ('Soledad', 'twistd'), + ('Bigcouch', 'beam.smp')]), logger) diff --git a/scripts/profiling/spam.sh b/scripts/profiling/spam.sh new file mode 100755 index 00000000..a4f2b8ef --- /dev/null +++ b/scripts/profiling/spam.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +if [ $# -ne 2 ]; then + echo "Usage: ${0} <target_address> <number_of_messages>" + exit 1 +fi + +target_address=${1} +missing=${2} +echo "Will send ${missing} messages to ${target_address}..." + +while [[ ${success} -eq 0 && ${missing} -gt 0 ]]; do + echo " missing: ${missing}" + swaks -S \ + -f ${target_address} \ + -t ${target_address} \ + -s chipmonk.cdev.bitmask.net \ + -tlsc + if [ $? -eq 0 ]; then + missing=`expr ${missing} - 1` + else + echo " error, retrying..." + fi +done diff --git a/scripts/profiling/sync/client_side_db.py b/scripts/profiling/sync/client_side_db.py new file mode 120000 index 00000000..9e49a7f0 --- /dev/null +++ b/scripts/profiling/sync/client_side_db.py @@ -0,0 +1 @@ +../../db_access/client_side_db.py
\ No newline at end of file diff --git a/scripts/profiling/sync/plot.py b/scripts/profiling/sync/plot.py new file mode 100755 index 00000000..8e3e1c15 --- /dev/null +++ b/scripts/profiling/sync/plot.py @@ -0,0 +1,93 @@ +#!/usr/bin/python + + +import argparse +from matplotlib import pyplot as plt +from movingaverage import movingaverage +from scipy.interpolate import interp1d +from numpy import linspace + + +def smooth(l): + return movingaverage(l, 3, data_is_list=True, avoid_fp_drift=False) + + +def plot(filename, subtitle=''): + + # config the plot + plt.xlabel('time (s)') + plt.ylabel('usage (%)') + title = 'soledad sync' + if subtitle != '': + title += '- %s' % subtitle + plt.title(title) + + x = [] + ycpu = [] + ymem = [] + ypcpu = [] + ypmem = [] + + ys = [ + (ycpu, 'total cpu', 'r'), + (ymem, 'total mem', 'b'), + (ypcpu, 'proc cpu', 'm'), + (ypmem, 'proc mem', 'g'), + ] + + # read data from file + with open(filename, 'r') as f: + line = f.readline() + while True: + line = f.readline() + if line.startswith('#'): + continue + if line == '' or line is None: + break + time, cpu, mem, pcpu, pmem = tuple(line.strip().split(' ')) + x.append(float(time)) + ycpu.append(float(cpu)) + ymem.append(float(mem)) + ypcpu.append(float(pcpu)) + ypmem.append(float(pmem)) + + smoothx = [n for n in smooth(x)] + #xnew = linspace(0.01, 19, 100) + + for y in ys: + kwargs = { + 'linewidth': 1.0, + 'linestyle': '-', + # 'marker': '.', + 'color': y[2], + } + #f = interp1d(x, y[0], kind='cubic') + plt.plot( + smoothx, + [n for n in smooth(y[0])], + #xnew, + #f(xnew), + label=y[1], **kwargs) + + #plt.axes().get_xaxis().set_ticks(x) + #plt.axes().get_xaxis().set_ticklabels(x) + + # annotate max and min values + plt.xlim(0, 20) + plt.ylim(0, 100) + plt.grid() + plt.legend() + plt.show() + + +if __name__ == '__main__': + # parse command line + parser = argparse.ArgumentParser() + parser.add_argument( + '-d', dest='datafile', required=False, default='/tmp/profile.log', + help='the data file to plot') + parser.add_argument( + '-s', dest='subtitle', required=False, default='', + help='a subtitle for the plot') + args = parser.parse_args() + plot(args.datafile, args.subtitle) diff --git a/scripts/profiling/sync/profile-sync.py b/scripts/profiling/sync/profile-sync.py new file mode 100644 index 00000000..fdd5b5a6 --- /dev/null +++ b/scripts/profiling/sync/profile-sync.py @@ -0,0 +1,62 @@ +#!/usr/bin/python + + +import argparse +import logging + + +from util import StatsLogger, ValidateUserHandle +from client_side_db import get_soledad_instance +#from plot import plot + + +# create a logger +logger = logging.getLogger(__name__) +LOG_FORMAT = '%(asctime)s %(message)s' +logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + + +# main program + +if __name__ == '__main__': + + # parse command line + parser = argparse.ArgumentParser() + parser.add_argument( + 'user@provider', action=ValidateUserHandle, help='the user handle') + parser.add_argument( + '-b', dest='basedir', required=False, default=None, + help='soledad base directory') + parser.add_argument( + '-p', dest='passphrase', required=False, default=None, + help='the user passphrase') + parser.add_argument( + '-l', dest='logfile', required=False, default='/tmp/profile.log', + help='the file to which write the log') + args = parser.parse_args() + + # get the password + passphrase = args.passphrase + if passphrase is None: + passphrase = getpass.getpass( + 'Password for %s@%s: ' % (args.username, args.provider)) + + # get the basedir + basedir = args.basedir + if basedir is None: + basedir = tempfile.mkdtemp() + logger.info('Using %s as base directory.' % basedir) + + # get the soledad instance + s = get_soledad_instance( + args.username, args.provider, passphrase, basedir) + for i in xrange(10): + s.create_doc({}) + + sl = StatsLogger( + "soledad-sync", args.logfile, procs=["python"], interval=0.001) + sl.start() + s.sync() + sl.stop() + + #plot(args.logfile) diff --git a/scripts/profiling/sync/util.py b/scripts/profiling/sync/util.py new file mode 120000 index 00000000..7f16d684 --- /dev/null +++ b/scripts/profiling/sync/util.py @@ -0,0 +1 @@ +../util.py
\ No newline at end of file diff --git a/scripts/profiling/util.py b/scripts/profiling/util.py new file mode 100644 index 00000000..adf1de8c --- /dev/null +++ b/scripts/profiling/util.py @@ -0,0 +1,75 @@ +import re +import psutil +import time +import threading +import argparse +import pytz +import datetime + + +class ValidateUserHandle(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + m = re.compile('^([^@]+)@([^@]+\.[^@]+)$') + res = m.match(values) + if res == None: + parser.error('User handle should have the form user@provider.') + setattr(namespace, 'username', res.groups()[0]) + setattr(namespace, 'provider', res.groups()[1]) + + +class StatsLogger(threading.Thread): + + def __init__(self, name, fname, procs=[], interval=0.01): + threading.Thread.__init__(self) + self._stopped = True + self._name = name + self._fname = fname + self._procs = self._find_procs(procs) + self._interval = interval + + def _find_procs(self, procs): + return filter(lambda p: p.name in procs, psutil.process_iter()) + + def run(self): + self._stopped = False + with open(self._fname, 'w') as f: + self._start = time.time() + f.write(self._make_header()) + while self._stopped is False: + f.write('%s %s\n' % + (self._make_general_stats(), self._make_proc_stats())) + time.sleep(self._interval) + f.write(self._make_footer()) + + def _make_general_stats(self): + now = time.time() + stats = [] + stats.append("%f" % (now - self._start)) # elapsed time + stats.append("%f" % psutil.cpu_percent()) # total cpu + stats.append("%f" % psutil.phymem_usage().percent) # total memory + return ' '.join(stats) + + def _make_proc_stats(self): + stats = [] + for p in self._procs: + stats.append('%f' % p.get_cpu_percent()) # proc cpu + stats.append('%f' % p.get_memory_percent()) # proc memory + return ' '.join(stats) + + def _make_header(self): + header = [] + header.append('# test_name: %s' % self._name) + header.append('# start_time: %s' % datetime.datetime.now(pytz.utc)) + header.append( + '# elapsed_time total_cpu total_memory proc_cpu proc_memory ') + return '\n'.join(header) + '\n' + + def _make_footer(self): + footer = [] + footer.append('# end_time: %s' % datetime.datetime.now(pytz.utc)) + return '\n'.join(footer) + + def stop(self): + self._stopped = True + + diff --git a/server/pkg/requirements.pip b/server/pkg/requirements.pip index 7cbca401..28717664 100644 --- a/server/pkg/requirements.pip +++ b/server/pkg/requirements.pip @@ -3,13 +3,13 @@ couchdb simplejson u1db routes -PyOpenSSL +PyOpenSSL<0.14 # TODO: maybe we just want twisted-web? twisted>=12.0.0 # leap deps -- bump me! -leap.soledad.common>=0.3.0 +leap.soledad.common>=0.6.0 # # Things yet to fix: diff --git a/server/src/leap/soledad/server/_version.py b/server/src/leap/soledad/server/_version.py index a3227cde..cf4e6706 100644 --- a/server/src/leap/soledad/server/_version.py +++ b/server/src/leap/soledad/server/_version.py @@ -5,8 +5,8 @@ # unpacked source archive. Distribution tarballs contain a pre-generated copy # of this file. -version_version = '0.5.2' -version_full = 'fd2a9adfc9148e47b85add78d8f3c981e4fad885' +version_version = '0.6.0' +version_full = 'f71da83b530abe5b7e88b812797e235833985b6c' def get_versions(default={}, verbose=False): diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index c6928aaa..6dc99b5a 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -210,6 +210,8 @@ class SyncExchange(sync.SyncExchange): :param last_known_generation: The last target replica generation the source replica knows about. :type last_known_generation: int + :param sync_id: The id of the current sync session. + :type sync_id: str """ self._db = db self.source_replica_uid = source_replica_uid @@ -284,7 +286,8 @@ class SyncExchange(sync.SyncExchange): doc = self._db.get_doc(changed_doc_id, include_deleted=True) return_doc_cb(doc, gen, trans_id) - def insert_doc_from_source(self, doc, source_gen, trans_id): + def insert_doc_from_source(self, doc, source_gen, trans_id, + number_of_docs=None, doc_idx=None, sync_id=None): """Try to insert synced document from source. Conflicting documents are not inserted but will be sent over @@ -302,10 +305,18 @@ class SyncExchange(sync.SyncExchange): :type source_gen: int :param trans_id: The transaction id of that document change. :type trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document. + :type doc_idx: int + :param sync_id: The id of the current sync session. + :type sync_id: str """ state, at_gen = self._db._put_doc_if_newer( doc, save_conflict=False, replica_uid=self.source_replica_uid, - replica_gen=source_gen, replica_trans_id=trans_id) + replica_gen=source_gen, replica_trans_id=trans_id, + number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id) if state == 'inserted': self._sync_state.put_seen_id(doc.doc_id, at_gen) elif state == 'converged': @@ -340,6 +351,8 @@ class SyncResource(http_app.SyncResource): :param last_known_trans_id: The last server replica transaction_id the client knows about. :type last_known_trans_id: str + :param sync_id: The id of the current sync session. + :type sync_id: str :param ensure: Whether the server replica should be created if it does not already exist. :type ensure: bool @@ -355,9 +368,11 @@ class SyncResource(http_app.SyncResource): # get a sync exchange object self.sync_exch = self.sync_exchange_class( db, self.source_replica_uid, last_known_generation, sync_id) + self._sync_id = sync_id @http_app.http_method(content_as_args=True) - def post_put(self, id, rev, content, gen, trans_id): + def post_put(self, id, rev, content, gen, trans_id, number_of_docs, + doc_idx): """ Put one incoming document into the server replica. @@ -373,9 +388,16 @@ class SyncResource(http_app.SyncResource): :param trans_id: The source replica transaction id corresponding to the revision of the incoming document. :type trans_id: str + :param number_of_docs: The total amount of documents sent on this sync + session. + :type number_of_docs: int + :param doc_idx: The index of the current document. + :type doc_idx: int """ doc = Document(id, rev, content) - self.sync_exch.insert_doc_from_source(doc, gen, trans_id) + self.sync_exch.insert_doc_from_source( + doc, gen, trans_id, number_of_docs=number_of_docs, + doc_idx=doc_idx, sync_id=self._sync_id) @http_app.http_method(received=int, content_as_args=True) def post_get(self, received): |