diff options
46 files changed, 5233 insertions, 772 deletions
| @@ -1,3 +1,26 @@ +0.6.0 Jul 18, 2014: +Client: +  o Close all connections after syncing. Fixes #5518. +  o Reset synchronizer state in order to reuse the same synchronizer +    object multiple times. +  o Use temporal database for encryption/decryption during +    sync. Closes #5326. +  o Add sync status signals. Closes #5517. +  o Allow for interrupting and recovering sync. Closes #5517. +  o Parallelize sync requests and reuse HTTP connections. +  o Split sync in multiple POST requests in client. Closes #5571. + +Common: +  o Use a dedicated HTTP resource for couch multipart PUTs to avoid +    bigcouch. Closes #5739. + +Server: +  o Pin PyOpenSSL dependency version to <0.14 to avoid yet another +    crypto dependency. +  o Authenticate in time-insensitive manner. Closes #3399. +  o Allow for interrupting and recovering sync. Closes #5517. +  o Split sync in multiple POST requests in server. Closes #5571. +  0.5.2 Jun 6, 2014:  Client:    o Reset synchronizer state in order to reuse the same synchronizer diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip index 7590aee5..b5abd4c7 100644 --- a/client/pkg/requirements.pip +++ b/client/pkg/requirements.pip @@ -3,12 +3,14 @@ simplejson  u1db  scrypt  pycryptopp +cchardet +zope.proxy  #  # leap deps  # -leap.soledad.common>=0.3.0 +leap.soledad.common>=0.6.0  #  # XXX things to fix yet: @@ -20,5 +22,3 @@ oauth  # pysqlite should not be a dep, see #2945  pysqlite - -chardet diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 0d3a21fd..586e3389 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -223,33 +223,48 @@ class Soledad(object):      """      def __init__(self, uuid, passphrase, secrets_path, local_db_path, -                 server_url, cert_file, auth_token=None, secret_id=None): +                 server_url, cert_file, +                 auth_token=None, secret_id=None, 
defer_encryption=False):          """          Initialize configuration, cryptographic keys and dbs.          :param uuid: User's uuid.          :type uuid: str +          :param passphrase: The passphrase for locking and unlocking encryption                             secrets for local and remote storage.          :type passphrase: unicode +          :param secrets_path: Path for storing encrypted key used for                               symmetric encryption.          :type secrets_path: str +          :param local_db_path: Path for local encrypted storage db.          :type local_db_path: str +          :param server_url: URL for Soledad server. This is used either to sync -            with the user's remote db and to interact with the shared recovery -            database. +                           with the user's remote db and to interact with the +                           shared recovery database.          :type server_url: str +          :param cert_file: Path to the certificate of the ca used                            to validate the SSL certificate used by the remote                            soledad server.          :type cert_file: str +          :param auth_token: Authorization token for accessing remote databases.          :type auth_token: str +        :param secret_id: The id of the storage secret to be used. +        :type secret_id: str + +        :param defer_encryption: Whether to defer encryption/decryption of +                                 documents, or do it inline while syncing. +        :type defer_encryption: bool +          :raise BootstrapSequenceError: Raised when the secret generation and -            storage on server sequence has failed for some reason. +                                       storage on server sequence has failed +                                       for some reason.          
"""          # get config params          self._uuid = uuid @@ -258,8 +273,10 @@ class Soledad(object):          # init crypto variables          self._secrets = {}          self._secret_id = secret_id -        # init config (possibly with default values) +        self._defer_encryption = defer_encryption +          self._init_config(secrets_path, local_db_path, server_url) +          self._set_token(auth_token)          self._shared_db_instance = None          # configure SSL certificate @@ -390,6 +407,7 @@ class Soledad(object):              # release the lock on shared db              try:                  self._shared_db.unlock(token) +                self._shared_db.close()              except NotLockedError:                  # for some reason the lock expired. Despite that, secret                  # loading or generation/storage must have been executed @@ -469,24 +487,20 @@ class Soledad(object):              create=True,              document_factory=SoledadDocument,              crypto=self._crypto, -            raw_key=True) +            raw_key=True, +            defer_encryption=self._defer_encryption)      def close(self):          """          Close underlying U1DB database.          """ +        logger.debug("Closing soledad")          if hasattr(self, '_db') and isinstance(                  self._db,                  SQLCipherDatabase): +            self._db.stop_sync()              self._db.close() -    def __del__(self): -        """ -        Make sure local database is closed when object is destroyed. -        """ -        # Watch out! We have no guarantees  that this is properly called. -        self.close() -      #      # Management of secret for symmetric encryption.      # @@ -520,6 +534,9 @@ class Soledad(object):          Define the id of the storage secret to be used.          This method will also replace the secret in the crypto object. + +        :param secret_id: The id of the storage secret to be used. 
+        :type secret_id: str          """          self._secret_id = secret_id @@ -881,7 +898,7 @@ class Soledad(object):          :type json: str          :param doc_id: An optional identifier specifying the document id.          :type doc_id: -        :return: The new cocument +        :return: The new document          :rtype: SoledadDocument          """          return self._db.create_doc_from_json(json, doc_id=doc_id) @@ -1041,7 +1058,7 @@ class Soledad(object):          if self._db:              return self._db.resolve_doc(doc, conflicted_doc_revs) -    def sync(self): +    def sync(self, defer_decryption=True):          """          Synchronize the local encrypted replica with a remote replica. @@ -1051,16 +1068,25 @@ class Soledad(object):          :param url: the url of the target replica to sync with          :type url: str -        :return: the local generation before the synchronisation was -            performed. +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool + +        :return: The local generation before the synchronisation was +                 performed.          
:rtype: str          """          if self._db: +            try:                  local_gen = self._db.sync(                      urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), -                    creds=self._creds, autocreate=False) +                    creds=self._creds, autocreate=False, +                    defer_decryption=defer_decryption)                  signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)                  return local_gen +            except Exception as e: +                logger.error("Soledad exception when syncing: %s" % str(e))      def stop_sync(self):          """ @@ -1079,7 +1105,9 @@ class Soledad(object):          :return: Whether remote replica and local replica differ.          :rtype: bool          """ -        target = SoledadSyncTarget(url, creds=self._creds, crypto=self._crypto) +        target = SoledadSyncTarget( +            url, self._db._get_replica_uid(), creds=self._creds, +            crypto=self._crypto)          info = target.get_sync_info(self._db._get_replica_uid())          # compare source generation with target's last known source generation          if self._db._get_generation() != info[4]: @@ -1087,6 +1115,13 @@ class Soledad(object):              return True          return False +    @property +    def syncing(self): +        """ +        Property, True if the syncer is syncing. +        """ +        return self._db.syncing +      def _set_token(self, token):          """          Set the authentication token for remote database access. diff --git a/client/src/leap/soledad/client/_version.py b/client/src/leap/soledad/client/_version.py index a3227cde..cf4e6706 100644 --- a/client/src/leap/soledad/client/_version.py +++ b/client/src/leap/soledad/client/_version.py @@ -5,8 +5,8 @@  # unpacked source archive. Distribution tarballs contain a pre-generated copy  # of this file. 
-version_version = '0.5.2' -version_full = 'fd2a9adfc9148e47b85add78d8f3c981e4fad885' +version_version = '0.6.0' +version_full = 'f71da83b530abe5b7e88b812797e235833985b6c'  def get_versions(default={}, verbose=False): diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index a6372107..7133f804 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # crypto.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -14,27 +14,45 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  Cryptographic utilities for Soledad.  """ - -  import os  import binascii  import hmac  import hashlib - +import json +import logging +import multiprocessing +import threading  from pycryptopp.cipher.aes import AES  from pycryptopp.cipher.xsalsa20 import XSalsa20 +from zope.proxy import sameProxiedObjects + +from leap.soledad.common import soledad_assert +from leap.soledad.common import soledad_assert_type +from leap.soledad.common.document import SoledadDocument + + +from leap.soledad.common.crypto import ( +    EncryptionSchemes, +    UnknownEncryptionScheme, +    MacMethods, +    UnknownMacMethod, +    WrongMac, +    ENC_JSON_KEY, +    ENC_SCHEME_KEY, +    ENC_METHOD_KEY, +    ENC_IV_KEY, +    MAC_KEY, +    MAC_METHOD_KEY, +) +logger = logging.getLogger(__name__) -from leap.soledad.common import ( -    soledad_assert, -    soledad_assert_type, -) + +MAC_KEY_LENGTH = 64  class EncryptionMethods(object): @@ -45,6 +63,17 @@ class EncryptionMethods(object):      AES_256_CTR = 'aes-256-ctr'      XSALSA20 = 'xsalsa20' +# +# Exceptions +# + + +class DocumentNotEncrypted(Exception): +    """ +    Raised for 
failures in document encryption. +    """ +    pass +  class UnknownEncryptionMethod(Exception):      """ @@ -59,13 +88,116 @@ class NoSymmetricSecret(Exception):      """ -class SoledadCrypto(object): +def encrypt_sym(data, key, method):      """ -    General cryptographic functionality. +    Encrypt C{data} using a {password}. + +    Currently, the only encryption methods supported are AES-256 in CTR +    mode and XSalsa20. + +    :param data: The data to be encrypted. +    :type data: str +    :param key: The key used to encrypt C{data} (must be 256 bits long). +    :type key: str +    :param method: The encryption method to use. +    :type method: str + +    :return: A tuple with the initial value and the encrypted data. +    :rtype: (long, str) +    """ +    soledad_assert_type(key, str) + +    soledad_assert( +        len(key) == 32,  # 32 x 8 = 256 bits. +        'Wrong key size: %s bits (must be 256 bits long).' % +        (len(key) * 8)) +    iv = None +    # AES-256 in CTR mode +    if method == EncryptionMethods.AES_256_CTR: +        iv = os.urandom(16) +        ciphertext = AES(key=key, iv=iv).process(data) +    # XSalsa20 +    elif method == EncryptionMethods.XSALSA20: +        iv = os.urandom(24) +        ciphertext = XSalsa20(key=key, iv=iv).process(data) +    else: +        # raise if method is unknown +        raise UnknownEncryptionMethod('Unkwnown method: %s' % method) +    return binascii.b2a_base64(iv), ciphertext + + +def decrypt_sym(data, key, method, **kwargs):      """ +    Decrypt data using symmetric secret. + +    Currently, the only encryption method supported is AES-256 CTR mode. -    MAC_KEY_LENGTH = 64 +    :param data: The data to be decrypted. +    :type data: str +    :param key: The key used to decrypt C{data} (must be 256 bits long). +    :type key: str +    :param method: The encryption method to use. +    :type method: str +    :param kwargs: Other parameters specific to each encryption method. 
+    :type kwargs: dict + +    :return: The decrypted data. +    :rtype: str +    """ +    soledad_assert_type(key, str) +    # assert params +    soledad_assert( +        len(key) == 32,  # 32 x 8 = 256 bits. +        'Wrong key size: %s (must be 256 bits long).' % len(key)) +    soledad_assert( +        'iv' in kwargs, +        '%s needs an initial value.' % method) +    # AES-256 in CTR mode +    if method == EncryptionMethods.AES_256_CTR: +        return AES( +            key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) +    elif method == EncryptionMethods.XSALSA20: +        return XSalsa20( +            key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) + +    # raise if method is unknown +    raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + + +def doc_mac_key(doc_id, secret): +    """ +    Generate a key for calculating a MAC for a document whose id is +    C{doc_id}. +    The key is derived using HMAC having sha256 as underlying hash +    function. The key used for HMAC is the first MAC_KEY_LENGTH characters +    of Soledad's storage secret. The HMAC message is C{doc_id}. + +    :param doc_id: The id of the document. +    :type doc_id: str + +    :param secret: soledad secret storage +    :type secret: Soledad.storage_secret + +    :return: The key. +    :rtype: str + +    :raise NoSymmetricSecret: if no symmetric secret was supplied. +    """ +    if secret is None: +        raise NoSymmetricSecret() + +    return hmac.new( +        secret[:MAC_KEY_LENGTH], +        doc_id, +        hashlib.sha256).digest() + + +class SoledadCrypto(object): +    """ +    General cryptographic functionality encapsulated in a +    object that can be passed along. +    """      def __init__(self, soledad):          """          Initialize the crypto object. 
@@ -77,78 +209,14 @@ class SoledadCrypto(object):      def encrypt_sym(self, data, key,                      method=EncryptionMethods.AES_256_CTR): -        """ -        Encrypt C{data} using a {password}. - -        Currently, the only  encryption method supported is AES-256 CTR mode. - -        :param data: The data to be encrypted. -        :type data: str -        :param key: The key used to encrypt C{data} (must be 256 bits long). -        :type key: str -        :param method: The encryption method to use. -        :type method: str - -        :return: A tuple with the initial value and the encrypted data. -        :rtype: (long, str) -        """ -        soledad_assert_type(key, str) - -        soledad_assert( -            len(key) == 32,  # 32 x 8 = 256 bits. -            'Wrong key size: %s bits (must be 256 bits long).' % -            (len(key) * 8)) -        iv = None -        # AES-256 in CTR mode -        if method == EncryptionMethods.AES_256_CTR: -            iv = os.urandom(16) -            ciphertext = AES(key=key, iv=iv).process(data) -        # XSalsa20 -        elif method == EncryptionMethods.XSALSA20: -            iv = os.urandom(24) -            ciphertext = XSalsa20(key=key, iv=iv).process(data) -        else: -            # raise if method is unknown -            raise UnknownEncryptionMethod('Unkwnown method: %s' % method) -        return binascii.b2a_base64(iv), ciphertext +        return encrypt_sym(data, key, method)      def decrypt_sym(self, data, key,                      method=EncryptionMethods.AES_256_CTR, **kwargs): -        """ -        Decrypt data using symmetric secret. +        return decrypt_sym(data, key, method, **kwargs) -        Currently, the only encryption method supported is AES-256 CTR mode. - -        :param data: The data to be decrypted. -        :type data: str -        :param key: The key used to decrypt C{data} (must be 256 bits long). 
-        :type key: str -        :param method: The encryption method to use. -        :type method: str -        :param kwargs: Other parameters specific to each encryption method. -        :type kwargs: dict - -        :return: The decrypted data. -        :rtype: str -        """ -        soledad_assert_type(key, str) -        # assert params -        soledad_assert( -            len(key) == 32,  # 32 x 8 = 256 bits. -            'Wrong key size: %s (must be 256 bits long).' % len(key)) -        soledad_assert( -            'iv' in kwargs, -            '%s needs an initial value.' % method) -        # AES-256 in CTR mode -        if method == EncryptionMethods.AES_256_CTR: -            return AES( -                key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) -        elif method == EncryptionMethods.XSALSA20: -            return XSalsa20( -                key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - -        # raise if method is unknown -        raise UnknownEncryptionMethod('Unkwnown method: %s' % method) +    def doc_mac_key(self, doc_id, secret): +        return doc_mac_key(doc_id, self.secret)      def doc_passphrase(self, doc_id):          """ @@ -173,41 +241,769 @@ class SoledadCrypto(object):              raise NoSymmetricSecret()          return hmac.new(              self.secret[ -                self.MAC_KEY_LENGTH: +                MAC_KEY_LENGTH:                  self._soledad.REMOTE_STORAGE_SECRET_LENGTH],              doc_id,              hashlib.sha256).digest() -    def doc_mac_key(self, doc_id): +    # +    # secret setters/getters +    # + +    def _get_secret(self): +        return self._soledad.storage_secret + +    secret = property( +        _get_secret, doc='The secret used for symmetric encryption') + +# +# Crypto utilities for a SoledadDocument. +# + + +def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret): +    """ +    Calculate a MAC for C{doc} using C{ciphertext}. 
+ +    Current MAC method used is HMAC, with the following parameters: + +        * key: sha256(storage_secret, doc_id) +        * msg: doc_id + doc_rev + ciphertext +        * digestmod: sha256 + +    :param doc_id: The id of the document. +    :type doc_id: str +    :param doc_rev: The revision of the document. +    :type doc_rev: str +    :param ciphertext: The content of the document. +    :type ciphertext: str +    :param mac_method: The MAC method to use. +    :type mac_method: str +    :param secret: soledad secret +    :type secret: Soledad.secret_storage + +    :return: The calculated MAC. +    :rtype: str +    """ +    if mac_method == MacMethods.HMAC: +        return hmac.new( +            doc_mac_key(doc_id, secret), +            str(doc_id) + str(doc_rev) + ciphertext, +            hashlib.sha256).digest() +    # raise if we do not know how to handle this MAC method +    raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) + + +def encrypt_doc(crypto, doc): +    """ +    Wrapper around encrypt_docstr that accepts a crypto object and the document +    as arguments. + +    :param crypto: a soledad crypto object. +    :type crypto: SoledadCrypto +    :param doc: the document. +    :type doc: SoledadDocument +    """ +    key = crypto.doc_passphrase(doc.doc_id) +    secret = crypto.secret + +    return encrypt_docstr( +        doc.get_json(), doc.doc_id, doc.rev, key, secret) + + +def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): +    """ +    Encrypt C{doc}'s content. 
+ +    Encrypt doc's contents using AES-256 CTR mode and return a valid JSON +    string representing the following: + +        { +            ENC_JSON_KEY: '<encrypted doc JSON string>', +            ENC_SCHEME_KEY: 'symkey', +            ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, +            ENC_IV_KEY: '<the initial value used to encrypt>', +            MAC_KEY: '<mac>' +            MAC_METHOD_KEY: 'hmac' +        } + +    :param docstr: A representation of the document to be encrypted. +    :type docstr: str or unicode. + +    :param doc_id: The document id. +    :type doc_id: str + +    :param doc_rev: The document revision. +    :type doc_rev: str + +    :param key: The key used to encrypt ``data`` (must be 256 bits long). +    :type key: str + +    :param secret: The Soledad secret (used for MAC auth). +    :type secret: str + +    :return: The JSON serialization of the dict representing the encrypted +             content. +    :rtype: str +    """ +    # encrypt content using AES-256 CTR mode +    iv, ciphertext = encrypt_sym( +        str(docstr),  # encryption/decryption routines expect str +        key, method=EncryptionMethods.AES_256_CTR) +    # Return a representation for the encrypted content. In the following, we +    # convert binary data to hexadecimal representation so the JSON +    # serialization does not complain about what it tries to serialize. +    hex_ciphertext = binascii.b2a_hex(ciphertext) +    return json.dumps({ +        ENC_JSON_KEY: hex_ciphertext, +        ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, +        ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, +        ENC_IV_KEY: iv, +        MAC_KEY: binascii.b2a_hex(mac_doc(  # store the mac as hex. +            doc_id, doc_rev, ciphertext, +            MacMethods.HMAC, secret)), +        MAC_METHOD_KEY: MacMethods.HMAC, +    }) + + +def decrypt_doc(crypto, doc): +    """ +    Wrapper around decrypt_doc_dict that accepts a crypto object and the +    document as arguments. 
+ +    :param crypto: a soledad crypto object. +    :type crypto: SoledadCrypto +    :param doc: the document. +    :type doc: SoledadDocument + +    :return: json string with the decrypted document +    :rtype: str +    """ +    key = crypto.doc_passphrase(doc.doc_id) +    secret = crypto.secret +    return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret) + + +def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): +    """ +    Decrypt C{doc}'s content. + +    Return the JSON string representation of the document's decrypted content. + +    The passed doc_dict argument should have the following structure: + +        { +            ENC_JSON_KEY: '<enc_blob>', +            ENC_SCHEME_KEY: '<enc_scheme>', +            ENC_METHOD_KEY: '<enc_method>', +            ENC_IV_KEY: '<initial value used to encrypt>',  # (optional) +            MAC_KEY: '<mac>' +            MAC_METHOD_KEY: 'hmac' +        } + +    C{enc_blob} is the encryption of the JSON serialization of the document's +    content. For now Soledad just deals with documents whose C{enc_scheme} is +    EncryptionSchemes.SYMKEY and C{enc_method} is +    EncryptionMethods.AES_256_CTR. + +    :param doc_dict: The content of the document to be decrypted. +    :type doc_dict: dict + +    :param doc_id: The document id. +    :type doc_id: str + +    :param doc_rev: The document revision. +    :type doc_rev: str + +    :param key: The key used to encrypt ``data`` (must be 256 bits long). +    :type key: str + +    :param secret: +    :type secret: + +    :return: The JSON serialization of the decrypted content. +    :rtype: str +    """ +    soledad_assert(ENC_JSON_KEY in doc_dict) +    soledad_assert(ENC_SCHEME_KEY in doc_dict) +    soledad_assert(ENC_METHOD_KEY in doc_dict) +    soledad_assert(MAC_KEY in doc_dict) +    soledad_assert(MAC_METHOD_KEY in doc_dict) + +    # verify MAC +    ciphertext = binascii.a2b_hex(  # content is stored as hex. 
+        doc_dict[ENC_JSON_KEY]) +    mac = mac_doc( +        doc_id, doc_rev, +        ciphertext, +        doc_dict[MAC_METHOD_KEY], secret) +    # we compare mac's hashes to avoid possible timing attacks that might +    # exploit python's builtin comparison operator behaviour, which fails +    # immediatelly when non-matching bytes are found. +    doc_mac_hash = hashlib.sha256( +        binascii.a2b_hex(  # the mac is stored as hex +            doc_dict[MAC_KEY])).digest() +    calculated_mac_hash = hashlib.sha256(mac).digest() + +    if doc_mac_hash != calculated_mac_hash: +        logger.warning("Wrong MAC while decrypting doc...") +        raise WrongMac('Could not authenticate document\'s contents.') +    # decrypt doc's content +    enc_scheme = doc_dict[ENC_SCHEME_KEY] +    plainjson = None +    if enc_scheme == EncryptionSchemes.SYMKEY: +        enc_method = doc_dict[ENC_METHOD_KEY] +        if enc_method == EncryptionMethods.AES_256_CTR: +            soledad_assert(ENC_IV_KEY in doc_dict) +            plainjson = decrypt_sym( +                ciphertext, key, +                method=enc_method, +                iv=doc_dict[ENC_IV_KEY]) +        else: +            raise UnknownEncryptionMethod(enc_method) +    else: +        raise UnknownEncryptionScheme(enc_scheme) + +    return plainjson + + +def is_symmetrically_encrypted(doc): +    """ +    Return True if the document was symmetrically encrypted. + +    :param doc: The document to check. +    :type doc: SoledadDocument + +    :rtype: bool +    """ +    if doc.content and ENC_SCHEME_KEY in doc.content: +        if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY: +            return True +    return False + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): +    """ +    Base class for encrypter/decrypter pools. +    """ +    WORKERS = 5 + +    def __init__(self, crypto, sync_db, write_lock): +        """ +        Initialize the pool of encryption-workers. 
+ +        :param crypto: A SoledadCryto instance to perform the encryption. +        :type crypto: leap.soledad.crypto.SoledadCrypto + +        :param sync_db: a database connection handle +        :type sync_db: handle + +        :param write_lock: a write lock for controlling concurrent access +                           to the sync_db +        :type write_lock: threading.Lock +        """ +        self._pool = multiprocessing.Pool(self.WORKERS) +        self._crypto = crypto +        self._sync_db = sync_db +        self._sync_db_write_lock = write_lock + +    def close(self): +        """ +        Cleanly close the pool of workers. +        """ +        logger.debug("Closing %s" % (self.__class__.__name__,)) +        self._pool.close() +        try: +            self._pool.join() +        except Exception: +            pass + +    def terminate(self): +        """ +        Terminate the pool of workers. +        """ +        logger.debug("Terminating %s" % (self.__class__.__name__,)) +        self._pool.terminate() + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): +    """ +    Encrypt the content of the given document. + +    :param doc_id: The document id. +    :type doc_id: str +    :param doc_rev: The document revision. +    :type doc_rev: str +    :param content: The serialized content of the document. +    :type content: str +    :param key: The encryption key. +    :type key: str +    :param secret: The Soledad secret (used for MAC auth). +    :type secret: str + +    :return: A tuple containing the doc id, revision and encrypted content. +    :rtype: tuple(str, str, str) +    """ +    encrypted_content = encrypt_docstr( +        content, doc_id, doc_rev, key, secret) +    return doc_id, doc_rev, encrypted_content + + +class SyncEncrypterPool(SyncEncryptDecryptPool): +    """ +    Pool of workers that spawn subprocesses to execute the symmetric encryption +    of documents to be synced. 
+    """ +    # TODO implement throttling to reduce cpu usage?? +    WORKERS = 5 +    TABLE_NAME = "docs_tosync" +    FIELD_NAMES = "doc_id, rev, content" + +    def encrypt_doc(self, doc, workers=True): +        """ +        Symmetrically encrypt a document. + +        :param doc: The document with contents to be encrypted. +        :type doc: SoledadDocument + +        :param workers: Whether to defer the decryption to the multiprocess +                        pool of workers. Useful for debugging purposes. +        :type workers: bool +        """ +        soledad_assert(self._crypto is not None, "need a crypto object") +        docstr = doc.get_json() +        key = self._crypto.doc_passphrase(doc.doc_id) +        secret = self._crypto.secret +        args = doc.doc_id, doc.rev, docstr, key, secret + +        try: +            if workers: +                res = self._pool.apply_async( +                    encrypt_doc_task, args, +                    callback=self.encrypt_doc_cb) +            else: +                # encrypt inline +                res = encrypt_doc_task(*args) +                self.encrypt_doc_cb(res) + +        except Exception as exc: +            logger.exception(exc) + +    def encrypt_doc_cb(self, result):          """ -        Generate a key for calculating a MAC for a document whose id is -        C{doc_id}. +        Insert results of encryption routine into the local sync database. -        The key is derived using HMAC having sha256 as underlying hash -        function. The key used for HMAC is the first MAC_KEY_LENGTH characters -        of Soledad's storage secret. The HMAC message is C{doc_id}. +        :param result: A tuple containing the doc id, revision and encrypted +                       content. +        :type result: tuple(str, str, str) +        """ +        doc_id, doc_rev, content = result +        self.insert_encrypted_local_doc(doc_id, doc_rev, content) -        :param doc_id: The id of the document. 
+    def insert_encrypted_local_doc(self, doc_id, doc_rev, content): +        """ +        Insert the contents of the encrypted doc into the local sync +        database. + +        :param doc_id: The document id.          :type doc_id: str +        :param doc_rev: The document revision. +        :type doc_rev: str +        :param content: The serialized content of the document. +        :type content: str +        :param content: The encrypted document. +        :type content: str +        """ +        sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) +        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) -        :return: The key. -        :rtype: str +        con = self._sync_db +        with self._sync_db_write_lock: +            with con: +                con.execute(sql_del, (doc_id, )) +                con.execute(sql_ins, (doc_id, doc_rev, content)) -        :raise NoSymmetricSecret: if no symmetric secret was supplied. + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): +    """ +    Decrypt the content of the given document. + +    :param doc_id: The document id. +    :type doc_id: str +    :param doc_rev: The document revision. +    :type doc_rev: str +    :param content: The encrypted content of the document. +    :type content: str +    :param gen: The generation corresponding to the modification of that +                document. +    :type gen: int +    :param trans_id: The transaction id corresponding to the modification of +                     that document. +    :type trans_id: str +    :param key: The encryption key. +    :type key: str +    :param secret: The Soledad secret (used for MAC auth). +    :type secret: str + +    :return: A tuple containing the doc id, revision and encrypted content. 
+    :rtype: tuple(str, str, str) +    """ +    decrypted_content = decrypt_doc_dict( +        content, doc_id, doc_rev, key, secret) +    return doc_id, doc_rev, decrypted_content, gen, trans_id + + +def get_insertable_docs_by_gen(expected, got): +    """ +    Return a list of documents ready to be inserted. This list is computed +    by aligning the expected list with the already gotten docs, and returning +    the maximum number of docs that can be processed in the expected order +    before finding a gap. + +    :param expected: A list of generations to be inserted. +    :type expected: list + +    :param got: A dictionary whose values are the docs to be inserted. +    :type got: dict +    """ +    ordered = [got.get(i) for i in expected] +    if None in ordered: +        return ordered[:ordered.index(None)] +    else: +        return ordered + + +class SyncDecrypterPool(SyncEncryptDecryptPool): +    """ +    Pool of workers that spawn subprocesses to execute the symmetric decryption +    of documents that were received. + +    The decryption of the received documents is done in two steps: + +        1. All the encrypted docs are collected, together with their generation +           and transaction-id +        2. The docs are enqueued for decryption. When completed, they are +           inserted following the generation order. +    """ +    # TODO implement throttling to reduce cpu usage?? 
+    TABLE_NAME = "docs_received" +    FIELD_NAMES = "doc_id, rev, content, gen, trans_id" + +    write_encrypted_lock = threading.Lock() + +    def __init__(self, *args, **kwargs):          """ -        if self.secret is None: -            raise NoSymmetricSecret() -        return hmac.new( -            self.secret[:self.MAC_KEY_LENGTH], -            doc_id, -            hashlib.sha256).digest() +        Initialize the decrypter pool, and setup a dict for putting the +        results of the decrypted docs until they are picked by the insert +        routine that gets them in order. +        """ +        self._insert_doc_cb = kwargs.pop("insert_doc_cb") +        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) +        self.decrypted_docs = {} +        self.source_replica_uid = None -    # -    # secret setters/getters -    # +    def set_source_replica_uid(self, source_replica_uid): +        """ +        Set the source replica uid for this decrypter pool instance. -    def _get_secret(self): -        return self._soledad.storage_secret +        :param source_replica_uid: The uid of the source replica. +        :type source_replica_uid: str +        """ +        self.source_replica_uid = source_replica_uid -    secret = property( -        _get_secret, doc='The secret used for symmetric encryption') +    def insert_encrypted_received_doc(self, doc_id, doc_rev, content, +                                      gen, trans_id): +        """ +        Insert a received message with encrypted content, to be decrypted later +        on. + +        :param doc_id: The Document ID. 
+        :type doc_id: str +        :param doc_rev: The Document Revision +        :param doc_rev: str +        :param content: the Content of the document +        :type content: str +        :param gen: the Document Generation +        :type gen: int +        :param trans_id: Transaction ID +        :type trans_id: str +        """ +        docstr = json.dumps(content) +        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( +            self.TABLE_NAME,) + +        con = self._sync_db +        with self._sync_db_write_lock: +            with con: +                con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id)) + +    def insert_marker_for_received_doc(self, doc_id, doc_rev, gen): +        """ +        Insert a marker with the document id, revision and generation on the +        sync db. This document does not have an encrypted payload, so the +        content has already been inserted into the decrypted_docs dictionary +        from where it can be picked following generation order. +        We need to leave here the marker to be able to calculate the expected +        insertion order for a synchronization batch. + +        :param doc_id: The Document ID. +        :type doc_id: str +        :param doc_rev: The Document Revision +        :param doc_rev: str +        :param gen: the Document Generation +        :type gen: int +        """ +        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( +            self.TABLE_NAME,) +        con = self._sync_db +        with self._sync_db_write_lock: +            with con: +                con.execute(sql_ins, (doc_id, doc_rev, '', gen, '')) + +    def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): +        """ +        Insert a document that is not symmetrically encrypted. +        We store it in the staging area (the decrypted_docs dictionary) to be +        picked up in order as the preceding documents are decrypted. + +        :param doc_id: The Document ID. 
+        :type doc_id: str +        :param doc_rev: The Document Revision +        :param doc_rev: str +        :param content: the Content of the document +        :type content: str +        :param gen: the Document Generation +        :type gen: int +        :param trans_id: Transaction ID +        :type trans_id: str +        """ +        # XXX this need a deeper review / testing. +        # I believe that what I'm doing here is prone to problems +        # if the sync is interrupted (ie, client crash) in the worst possible +        # moment. We would need a recover strategy in that case +        # (or, insert the document in the table all the same, but with a flag +        # saying if the document is sym-encrypted or not), +        content = json.dumps(content) +        result = doc_id, doc_rev, content, gen, trans_id +        self.decrypted_docs[gen] = result +        self.insert_marker_for_received_doc(doc_id, doc_rev, gen) + +    def delete_encrypted_received_doc(self, doc_id, doc_rev): +        """ +        Delete a encrypted received doc after it was inserted into the local +        db. + +        :param doc_id: Document ID. +        :type doc_id: str +        :param doc_rev: Document revision. +        :type doc_rev: str +        """ +        sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( +            self.TABLE_NAME,) +        con = self._sync_db +        with self._sync_db_write_lock: +            with con: +                con.execute(sql_del, (doc_id, doc_rev)) + +    def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True): +        """ +        Symmetrically decrypt a document. + +        :param doc_id: The ID for the document with contents to be encrypted. +        :type doc: str +        :param rev: The revision of the document. 
+        :type rev: str +        :param source_replica_uid: +        :type source_replica_uid: str + +        :param workers: Whether to defer the decryption to the multiprocess +                        pool of workers. Useful for debugging purposes. +        :type workers: bool +        """ +        self.source_replica_uid = source_replica_uid + +        # insert_doc_cb is a proxy object that gets updated with the right +        # insert function only when the sync_target invokes the sync_exchange +        # method. so, if we don't still have a non-empty callback, we refuse +        # to proceed. +        if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), +                              None): +            logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") +            return + +        # XXX move to get_doc function... +        c = self._sync_db.cursor() +        sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % ( +            self.TABLE_NAME,) +        try: +            c.execute(sql, (doc_id, rev)) +            res = c.fetchone() +        except Exception as exc: +            logger.warning("Error getting docs from syncdb: %r" % (exc,)) +            return +        if res is None: +            logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev)) +            return + +        soledad_assert(self._crypto is not None, "need a crypto object") +        try: +            doc_id, rev, docstr, gen, trans_id = res +        except ValueError: +            logger.warning("Wrong entry in sync db") +            return + +        if len(docstr) == 0: +            # not encrypted payload +            return + +        try: +            content = json.loads(docstr) +        except TypeError: +            logger.warning("Wrong type while decoding json: %s" % repr(docstr)) +            return + +        key = self._crypto.doc_passphrase(doc_id) +        secret = self._crypto.secret +        args = doc_id, rev, content, gen, trans_id, 
key, secret + +        try: +            if workers: +                # Ouch. This is sent to the workers asynchronously, so +                # we have no way of logging errors. We'd have to inspect +                # lingering results by querying successful / get() over them... +                # Or move the heck out of it to twisted. +                res = self._pool.apply_async( +                    decrypt_doc_task, args, +                    callback=self.decrypt_doc_cb) +            else: +                # decrypt inline +                res = decrypt_doc_task(*args) +                self.decrypt_doc_cb(res) + +        except Exception as exc: +            logger.exception(exc) + +    def decrypt_doc_cb(self, result): +        """ +        Temporarily store the decryption result in a dictionary where it will +        be picked by process_decrypted. + +        :param result: A tuple containing the doc id, revision and encrypted +        content. +        :type result: tuple(str, str, str) +        """ +        doc_id, rev, content, gen, trans_id = result +        logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen)) +        self.decrypted_docs[gen] = result + +    def get_docs_by_generation(self): +        """ +        Get all documents in the received table from the sync db, +        ordered by generation. + +        :return: list of doc_id, rev, generation +        """ +        c = self._sync_db.cursor() +        sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % ( +            self.TABLE_NAME,) +        c.execute(sql) +        return c.fetchall() + +    def count_received_encrypted_docs(self): +        """ +        Count how many documents we have in the table for received and +        encrypted docs. + +        :return: The count of documents. 
+        :rtype: int +        """ +        if self._sync_db is None: +            logger.warning("cannot return count with null sync_db") +            return +        c = self._sync_db.cursor() +        sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) +        c.execute(sql) +        res = c.fetchone() +        if res is not None: +            return res[0] +        else: +            return 0 + +    def decrypt_received_docs(self): +        """ +        Get all the encrypted documents from the sync database and dispatch a +        decrypt worker to decrypt each one of them. +        """ +        docs_by_generation = self.get_docs_by_generation() +        logger.debug("Sync decrypter pool: There are %d documents to " \ +                     "decrypt." % len(docs_by_generation)) +        for doc_id, rev, gen in filter(None, docs_by_generation): +            self.decrypt_doc(doc_id, rev, self.source_replica_uid) + +    def process_decrypted(self): +        """ +        Process the already decrypted documents, and insert as many documents +        as can be taken from the expected order without finding a gap. + +        :return: Whether we have processed all the pending docs. +        :rtype: bool +        """ +        # Acquire the lock to avoid processing while we're still +        # getting data from the syncing stream, to avoid InvalidGeneration +        # problems. 
+        with self.write_encrypted_lock: +            already_decrypted = self.decrypted_docs +            docs = self.get_docs_by_generation() +            docs = filter(lambda entry: len(entry) > 0, docs) +            expected = [gen for doc_id, rev, gen in docs] +            docs_to_insert = get_insertable_docs_by_gen( +                expected, already_decrypted) +            for doc_fields in docs_to_insert: +                self.insert_decrypted_local_doc(*doc_fields) +        remaining = self.count_received_encrypted_docs() +        return remaining == 0 + +    def insert_decrypted_local_doc(self, doc_id, doc_rev, content, +                                   gen, trans_id): +        """ +        Insert the decrypted document into the local sqlcipher database. +        Makes use of the passed callback `return_doc_cb` passed to the caller +        by u1db sync. + +        :param doc_id: The document id. +        :type doc_id: str +        :param doc_rev: The document revision. +        :type doc_rev: str +        :param content: The serialized content of the document. +        :type content: str +        :param gen: The generation corresponding to the modification of that +                    document. +        :type gen: int +        :param trans_id: The transaction id corresponding to the modification +                         of that document. 
+        :type trans_id: str +        """ +        # could pass source_replica in params for callback chain +        insert_fun = self._insert_doc_cb[self.source_replica_uid] +        logger.debug("Sync decrypter pool: inserting doc in local db: " \ +                     "%s:%s %s" % (doc_id, doc_rev, gen)) +        try: +            # convert deleted documents to avoid error on document creation +            if content == 'null': +                content = None +            doc = SoledadDocument(doc_id, doc_rev, content) +            insert_fun(doc, int(gen), trans_id) +        except Exception as exc: +            logger.error("Sync decrypter pool: error while inserting " +                         "decrypted doc into local db.") +            logger.exception(exc) + +        else: +            # If no errors found, remove it from the local temporary dict +            # and from the received database. +            self.decrypted_docs.pop(gen) +            self.delete_encrypted_received_doc(doc_id, doc_rev) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 5ffa9c7e..2df9606e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # sqlcipher.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -14,8 +14,6 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  A U1DB backend that uses SQLCipher as its persistence layer. @@ -40,11 +38,13 @@ cipher_default_use_hmac can be used to globally alter the default use of HMAC  when opening a database.  
So, as the statements above were introduced for backwards compatibility with -SLCipher 1.1 databases, we do not implement them as all SQLCipher databases +SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases  handled by Soledad should be created by SQLCipher >= 2.0.  """  import logging +import multiprocessing  import os +import sqlite3  import string  import threading  import time @@ -57,9 +57,12 @@ from collections import defaultdict  from pysqlcipher import dbapi2  from u1db.backends import sqlite_backend  from u1db import errors as u1db_errors +from taskthread import TimerTask -from leap.soledad.client.sync import Synchronizer +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool  from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.client.target import PendingReceivedDocsSyncError +from leap.soledad.client.sync import SoledadSynchronizer  from leap.soledad.common.document import SoledadDocument @@ -88,7 +91,7 @@ SQLITE_ISOLATION_LEVEL = None  def open(path, password, create=True, document_factory=None, crypto=None,           raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, -         cipher_page_size=1024): +         cipher_page_size=1024, defer_encryption=False):      """Open a database at the given location.      Will raise u1db.errors.DatabaseDoesNotExist if create=False and the @@ -114,6 +117,9 @@ def open(path, password, create=True, document_factory=None, crypto=None,      :type kdf_iter: int      :param cipher_page_size: The page size.      :type cipher_page_size: int +    :param defer_encryption: Whether to defer encryption/decryption of +                             documents, or do it inline while syncing. +    :type defer_encryption: bool      :return: An instance of Database.      
:rtype SQLCipherDatabase @@ -121,7 +127,7 @@      return SQLCipherDatabase.open_database(          path, password, create=create, document_factory=document_factory,          crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, -        cipher_page_size=cipher_page_size) +        cipher_page_size=cipher_page_size, defer_encryption=defer_encryption)  # @@ -147,19 +153,40 @@ class NotAnHexString(Exception):  #  class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): -    """A U1DB implementation that uses SQLCipher as its persistence layer.""" +    """ +    A U1DB implementation that uses SQLCipher as its persistence layer. +    """ +    defer_encryption = False      _index_storage_value = 'expand referenced encrypted'      k_lock = threading.Lock()      create_doc_lock = threading.Lock()      update_indexes_lock = threading.Lock() +    _sync_watcher = None +    _sync_enc_pool = None + +    """ +    The name of the local symmetrically encrypted documents to +    sync database file. +    """ +    LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' -    syncing_lock = defaultdict(threading.Lock)      """      A dictionary that holds locks which avoid multiple sync attempts from the      same database replica.      """ +    encrypting_lock = threading.Lock() +    """ +    Period of recurrence of the periodic encrypting task, in seconds. +    """ +    ENCRYPT_TASK_PERIOD = 1 + +    syncing_lock = defaultdict(threading.Lock) +    """ +    A dictionary that holds locks which avoid multiple sync attempts from the +    same database replica. 
+    """      def __init__(self, sqlcipher_file, password, document_factory=None,                   crypto=None, raw_key=False, cipher='aes-256-cbc', @@ -194,7 +221,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              self.assert_db_is_encrypted(                  sqlcipher_file, password, raw_key, cipher, kdf_iter,                  cipher_page_size) -        # connect to the database + +        # connect to the sqlcipher database          with self.k_lock:              self._db_handle = dbapi2.connect(                  sqlcipher_file, @@ -215,18 +243,45 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              self._ensure_schema()              self._crypto = crypto +        self._sync_db = None +        self._sync_db_write_lock = None +        self._sync_enc_pool = None + +        if self.defer_encryption: +            if sqlcipher_file != ":memory:": +                self._sync_db_path = "%s-sync" % sqlcipher_file +            else: +                self._sync_db_path = ":memory:" + +            # initialize sync db +            self._init_sync_db() + +            # initialize syncing queue encryption pool +            self._sync_enc_pool = SyncEncrypterPool( +                self._crypto, self._sync_db, self._sync_db_write_lock) +            self._sync_watcher = TimerTask(self._encrypt_syncing_docs, +                                           self.ENCRYPT_TASK_PERIOD) +            self._sync_watcher.start() +          def factory(doc_id=None, rev=None, json='{}', has_conflicts=False,                      syncable=True):              return SoledadDocument(doc_id=doc_id, rev=rev, json=json,                                     has_conflicts=has_conflicts,                                     syncable=syncable)          self.set_document_factory(factory) +        # we store syncers in a dictionary indexed by the target URL. 
We also +        # store a hash of the auth info in case auth info expires and we need +        # to rebuild the syncer for that target. The final self._syncers +        # format is the following: +        # +        #     self._syncers = {'<url>': ('<auth_hash>', syncer), ...}          self._syncers = {}      @classmethod      def _open_database(cls, sqlcipher_file, password, document_factory=None,                         crypto=None, raw_key=False, cipher='aes-256-cbc', -                       kdf_iter=4000, cipher_page_size=1024): +                       kdf_iter=4000, cipher_page_size=1024, +                       defer_encryption=False):          """          Open a SQLCipher database. @@ -249,10 +304,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          :type kdf_iter: int          :param cipher_page_size: The page size.          :type cipher_page_size: int +        :param defer_encryption: Whether to defer encryption/decryption of +                                 documents, or do it inline while syncing. +        :type defer_encryption: bool          :return: The database object.          :rtype: SQLCipherDatabase          """ +        cls.defer_encryption = defer_encryption          if not os.path.isfile(sqlcipher_file):              raise u1db_errors.DatabaseDoesNotExist() @@ -298,43 +357,59 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):      def open_database(cls, sqlcipher_file, password, create, backend_cls=None,                        document_factory=None, crypto=None, raw_key=False,                        cipher='aes-256-cbc', kdf_iter=4000, -                      cipher_page_size=1024): +                      cipher_page_size=1024, defer_encryption=False):          """          Open a SQLCipher database.          :param sqlcipher_file: The path for the SQLCipher file.          :type sqlcipher_file: str +          :param password: The password that protects the SQLCipher db.          
:type password: str +          :param create: Should the datbase be created if it does not already -            exist? -        :type: bool +                       exist? +        :type create: bool +          :param backend_cls: A class to use as backend.          :type backend_cls: type +          :param document_factory: A function that will be called with the same -            parameters as Document.__init__. +                                 parameters as Document.__init__.          :type document_factory: callable +          :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt -            document contents when syncing. +                       document contents when syncing.          :type crypto: soledad.crypto.SoledadCrypto +          :param raw_key: Whether C{password} is a raw 64-char hex string or a -            passphrase that should be hashed to obtain the encyrption key. +                        passphrase that should be hashed to obtain the +                        encyrption key.          :type raw_key: bool +          :param cipher: The cipher and mode to use.          :type cipher: str +          :param kdf_iter: The number of iterations to use.          :type kdf_iter: int +          :param cipher_page_size: The page size.          :type cipher_page_size: int +        :param defer_encryption: Whether to defer encryption/decryption of +                                 documents, or do it inline while syncing. +        :type defer_encryption: bool +          :return: The database object.          
:rtype: SQLCipherDatabase          """ +        cls.defer_encryption = defer_encryption          try:              return cls._open_database(                  sqlcipher_file, password, document_factory=document_factory,                  crypto=crypto, raw_key=raw_key, cipher=cipher, -                kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) +                kdf_iter=kdf_iter, cipher_page_size=cipher_page_size, +                defer_encryption=defer_encryption)          except u1db_errors.DatabaseDoesNotExist:              if not create:                  raise @@ -347,7 +422,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):                  crypto=crypto, raw_key=raw_key, cipher=cipher,                  kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) -    def sync(self, url, creds=None, autocreate=True): +    def sync(self, url, creds=None, autocreate=True, defer_decryption=True):          """          Synchronize documents with remote replica exposed at url. @@ -362,6 +437,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          :type creds: dict          :param autocreate: Ask the target to create the db if non-existent.          :type autocreate: bool +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool          :return: The local generation before the synchronisation was performed.          :rtype: int @@ -370,7 +449,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          # the following context manager blocks until the syncing lock can be          # acquired.          with self.syncer(url, creds=creds) as syncer: -            res = syncer.sync(autocreate=autocreate) + +            # XXX could mark the critical section here... 
+            try: +                res = syncer.sync(autocreate=autocreate, +                                  defer_decryption=defer_decryption) + +            except PendingReceivedDocsSyncError: +                logger.warning("Local sync db is not clear, skipping sync...") +                return +          return res      def stop_sync(self): @@ -394,7 +482,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]:              syncer = self._get_syncer(url, creds=creds)              yield syncer -            syncer.sync_target.close() + +    @property +    def syncing(self): +        lock = SQLCipherDatabase.syncing_lock[self._get_replica_uid()] +        acquired_lock = lock.acquire(False) +        if acquired_lock is False: +            return True +        lock.release() +        return False      def _get_syncer(self, url, creds=None):          """ @@ -415,11 +511,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          h = sha256(json.dumps([url, creds])).hexdigest()          cur_h, syncer = self._syncers.get(url, (None, None))          if syncer is None or h != cur_h: -            syncer = Synchronizer( +            wlock = self._sync_db_write_lock +            syncer = SoledadSynchronizer(                  self,                  SoledadSyncTarget(url, +                                  self._replica_uid,                                    creds=creds, -                                  crypto=self._crypto)) +                                  crypto=self._crypto, +                                  sync_db=self._sync_db, +                                  sync_db_write_lock=wlock))              self._syncers[url] = (h, syncer)          # in order to reuse the same synchronizer multiple times we have to          # reset its state (i.e. 
the number of documents received from target @@ -442,21 +542,85 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              'ALTER TABLE document '              'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') -    def create_doc(self, content, doc_id=None): +    def _init_sync_db(self): +        """ +        Initialize the Symmetrically-Encrypted document to be synced database, +        and the queue to communicate with subprocess workers.          """ -        Create a new document in the local encrypted database. +        self._sync_db = sqlite3.connect(self._sync_db_path, +                                        check_same_thread=False) -        :param content: the contents of the new document -        :type content: dict -        :param doc_id: an optional identifier specifying the document id -        :type doc_id: str +        self._sync_db_write_lock = threading.Lock() +        self._create_sync_db_tables() +        self.sync_queue = multiprocessing.Queue() + +    def _create_sync_db_tables(self): +        """ +        Create tables for the local sync documents db if needed. +        """ +        encr = SyncEncrypterPool +        decr = SyncDecrypterPool +        sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( +            encr.TABLE_NAME, encr.FIELD_NAMES)) +        sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( +            decr.TABLE_NAME, decr.FIELD_NAMES)) + +        with self._sync_db_write_lock: +            with self._sync_db: +                self._sync_db.execute(sql_encr) +                self._sync_db.execute(sql_decr) + +    # +    # Symmetric encryption of syncing docs +    # + +    def _encrypt_syncing_docs(self): +        """ +        Process the syncing queue and send the documents there +        to be encrypted in the sync db. They will be read by the +        SoledadSyncTarget during the sync_exchange. + +        Called periodical from the TimerTask self._sync_watcher. 
+        """ +        lock = self.encrypting_lock +        # optional wait flag used to avoid blocking +        if not lock.acquire(False): +            return +        else: +            queue = self.sync_queue +            try: +                while not queue.empty(): +                    doc = queue.get_nowait() +                    self._sync_enc_pool.encrypt_doc(doc) + +            except Exception as exc: +                logger.error("Error while  encrypting docs to sync") +                logger.exception(exc) +            finally: +                lock.release() + +    # +    # Document operations +    # -        :return: the new document -        :rtype: SoledadDocument +    def put_doc(self, doc):          """ -        with self.create_doc_lock: -            return sqlite_backend.SQLitePartialExpandDatabase.create_doc( -                self, content, doc_id=doc_id) +        Overwrite the put_doc method, to enqueue the modified document for +        encryption before sync. + +        :param doc: The document to be put. +        :type doc: u1db.Document + +        :return: The new document revision. +        :rtype: str +        """ +        doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc( +            self, doc) +        if self.defer_encryption: +            self.sync_queue.put_nowait(doc) +        return doc_rev + +    # indexes      def _put_and_update_indexes(self, old_doc, doc):          """ @@ -906,12 +1070,25 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          res = c.fetchall()          return res[0][0] -    def __del__(self): +    def close(self):          """ -        Closes db_handle upon object destruction. +        Close db_handle and close syncer.          
""" +        logger.debug("Sqlcipher backend: closing") +        if self._sync_watcher is not None: +            self._sync_watcher.stop() +            self._sync_watcher.shutdown() +        for url in self._syncers: +            _, syncer = self._syncers[url] +            syncer.close() +        if self._sync_enc_pool is not None: +            self._sync_enc_pool.close()          if self._db_handle is not None:              self._db_handle.close() +    @property +    def replica_uid(self): +        return self._get_replica_uid() +  sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 56e63416..5d545a77 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -17,35 +17,85 @@  """ -Sync infrastructure that can be interrupted and recovered. +Soledad synchronization utilities. + + +Extend u1db Synchronizer with the ability to: + +    * Defer the update of the known replica uid until all the decryption of +      the incoming messages has been processed. + +    * Be interrupted and recovered.  """ +  import json +import logging +import traceback +from threading import Lock  from u1db import errors -from u1db.sync import Synchronizer as U1DBSynchronizer +from u1db.sync import Synchronizer -class Synchronizer(U1DBSynchronizer): +logger = logging.getLogger(__name__) + + +class SoledadSynchronizer(Synchronizer):      """      Collect the state around synchronizing 2 U1DB replicas. -    Modified to allow for interrupting the synchronization process. +    Synchronization is bi-directional, in that new items in the source are sent +    to the target, and new items in the target are returned to the source. +    However, it still recognizes that one side is initiating the request. Also, +    at the moment, conflicts are only created in the source. 
+ +    Also modified to allow for interrupting the synchronization process.      """ +    syncing_lock = Lock() +      def stop(self):          """          Stop the current sync in progress.          """          self.sync_target.stop() -    def sync(self, autocreate=False): +    def sync(self, autocreate=False, defer_decryption=True):          """          Synchronize documents between source and target. +        Differently from u1db `Synchronizer.sync` method, this one allows to +        pass a `defer_decryption` flag that will postpone the last +        step in the synchronization dance, namely, the setting of the last +        known generation and transaction id for a given remote replica. + +        This is done to allow the ongoing parallel decryption of the incoming +        docs to proceed without `InvalidGeneration` conflicts. +          :param autocreate: Whether the target replica should be created or not.          :type autocreate: bool +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool +        """ +        self.syncing_lock.acquire() +        try: +            return self._sync(autocreate=autocreate, +                              defer_decryption=defer_decryption) +        except Exception: +            # re-raising the exceptions to let syqlcipher.sync catch them +            # (and re-create the syncer instance if needed) +            raise +        finally: +            self.release_syncing_lock() + +    def _sync(self, autocreate=False, defer_decryption=True): +        """ +        Helper function, called from the main `sync` method. +        See `sync` docstring.          
"""          sync_target = self.sync_target @@ -64,6 +114,16 @@ class Synchronizer(U1DBSynchronizer):              target_gen, target_trans_id = (0, '')              target_my_gen, target_my_trans_id = (0, '') +        logger.debug( +            "Soledad target sync info:\n" +            "  target replica uid: %s\n" +            "  target generation: %d\n" +            "  target trans id: %s\n" +            "  target my gen: %d\n" +            "  target my trans_id: %s" +            % (self.target_replica_uid, target_gen, target_trans_id, +            target_my_gen, target_my_trans_id)) +          # make sure we'll have access to target replica uid once it exists          if self.target_replica_uid is None: @@ -80,6 +140,8 @@ class Synchronizer(U1DBSynchronizer):          # what's changed since that generation and this current gen          my_gen, _, changes = self.source.whats_changed(target_my_gen) +        logger.debug("Soledad sync: there are %d documents to send." \ +                     % len(changes))          # get source last-seen database generation for the target          if self.target_replica_uid is None: @@ -88,6 +150,11 @@ class Synchronizer(U1DBSynchronizer):              target_last_known_gen, target_last_known_trans_id = \                  self.source._get_replica_gen_and_trans_id(                      self.target_replica_uid) +        logger.debug( +            "Soledad source sync info:\n" +            "  source target gen: %d\n" +            "  source target trans_id: %s" +            % (target_last_known_gen, target_last_known_trans_id))          # validate transaction ids          if not changes and target_last_known_gen == target_gen: @@ -111,15 +178,85 @@ class Synchronizer(U1DBSynchronizer):          #          # The sync_exchange method may be interrupted, in which case it will          # return a tuple of Nones. 
-        new_gen, new_trans_id = sync_target.sync_exchange( -            docs_by_generation, self.source._replica_uid, -            target_last_known_gen, target_last_known_trans_id, -            self._insert_doc_from_target, ensure_callback=ensure_callback) +        try: +            new_gen, new_trans_id = sync_target.sync_exchange( +                docs_by_generation, self.source._replica_uid, +                target_last_known_gen, target_last_known_trans_id, +                self._insert_doc_from_target, ensure_callback=ensure_callback, +                defer_decryption=defer_decryption) +            logger.debug( +                "Soledad source sync info after sync exchange:\n" +                "  source target gen: %d\n" +                "  source target trans_id: %s" +                % (new_gen, new_trans_id)) +            info = { +                "target_replica_uid": self.target_replica_uid, +                "new_gen": new_gen, +                "new_trans_id": new_trans_id, +                "my_gen": my_gen +            } +            self._syncing_info = info +            if defer_decryption and not sync_target.has_syncdb(): +                logger.debug("Sync target has no valid sync db, " +                             "aborting defer_decryption") +                defer_decryption = False +            self.complete_sync() +        except Exception as e: +            logger.error("Soledad sync error: %s" % str(e)) +            logger.error(traceback.format_exc()) +            sync_target.stop() +        finally: +            sync_target.close() -        # record target synced-up-to generation including applying what we sent +        return my_gen + +    def complete_sync(self): +        """ +        Last stage of the synchronization: +            (a) record last known generation and transaction uid for the remote +            replica, and +            (b) make target aware of our current reached generation. 
+        """ +        logger.debug("Completing deferred last step in SYNC...") + +        # record target synced-up-to generation including applying what we +        # sent +        info = self._syncing_info          self.source._set_replica_gen_and_trans_id( -            self.target_replica_uid, new_gen, new_trans_id) +            info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) +          # if gapless record current reached generation with target -        self._record_sync_info_with_the_target(my_gen) +        self._record_sync_info_with_the_target(info["my_gen"]) -        return my_gen +    @property +    def syncing(self): +        """ +        Return True if a sync is ongoing, False otherwise. +        :rtype: bool +        """ +        # XXX FIXME  we need some mechanism for timeout: should cleanup and +        # release if something in the syncdb-decrypt goes wrong. we could keep +        # track of the release date and cleanup unrealistic sync entries after +        # some time. +        locked = self.syncing_lock.locked() +        return locked + +    def release_syncing_lock(self): +        """ +        Release syncing lock if it's locked. +        """ +        if self.syncing_lock.locked(): +            self.syncing_lock.release() + +    def close(self): +        """ +        Close sync target pool of workers. +        """ +        self.release_syncing_lock() +        self.sync_target.close() + +    def __del__(self): +        """ +        Cleanup: release lock. +        """ +        self.release_syncing_lock() diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 968545b6..70e4d3a2 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -14,458 +14,466 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ +  """  A U1DB backend for encrypting data before sending to server and decrypting  after receiving.  """ -import binascii + +  import cStringIO  import gzip -import hashlib -import hmac  import logging +import re  import urllib  import threading +import urlparse -import simplejson as json +from collections import defaultdict  from time import sleep  from uuid import uuid4 +from contextlib import contextmanager -from u1db.remote import utils, http_errors -from u1db.errors import BrokenSyncStream +import simplejson as json +from taskthread import TimerTask  from u1db import errors +from u1db.remote import utils, http_errors  from u1db.remote.http_target import HTTPSyncTarget -from u1db.remote.http_client import _encode_query_parameter - +from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase +from zope.proxy import ProxyBase +from zope.proxy import sameProxiedObjects, setProxiedObject  from leap.soledad.common import soledad_assert -from leap.soledad.common.crypto import ( -    EncryptionSchemes, -    UnknownEncryptionScheme, -    MacMethods, -    UnknownMacMethod, -    WrongMac, -    ENC_JSON_KEY, -    ENC_SCHEME_KEY, -    ENC_METHOD_KEY, -    ENC_IV_KEY, -    MAC_KEY, -    MAC_METHOD_KEY, -)  from leap.soledad.common.document import SoledadDocument  from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import ( -    EncryptionMethods, -    UnknownEncryptionMethod, -) -from leap.soledad.client.events import ( -    SOLEDAD_SYNC_SEND_STATUS, -    SOLEDAD_SYNC_RECEIVE_STATUS, -    signal, -) +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_doc, decrypt_doc +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import signal -logger = logging.getLogger(__name__) -# 
-# Exceptions -# +logger = logging.getLogger(__name__) -class DocumentNotEncrypted(Exception): +def _gunzip(data):      """ -    Raised for failures in document encryption. +    Uncompress data that is gzipped. + +    :param data: gzipped data +    :type data: basestring      """ -    pass +    buffer = cStringIO.StringIO() +    buffer.write(data) +    buffer.seek(0) +    try: +        data = gzip.GzipFile(mode='r', fileobj=buffer).read() +    except Exception: +        logger.warning("Error while decrypting gzipped data") +    buffer.close() +    return data -# -# Crypto utilities for a SoledadDocument. -# +class PendingReceivedDocsSyncError(Exception): +    pass -def mac_doc(crypto, doc_id, doc_rev, ciphertext, mac_method): +class DocumentSyncerThread(threading.Thread):      """ -    Calculate a MAC for C{doc} using C{ciphertext}. - -    Current MAC method used is HMAC, with the following parameters: - -        * key: sha256(storage_secret, doc_id) -        * msg: doc_id + doc_rev + ciphertext -        * digestmod: sha256 - -    :param crypto: A SoledadCryto instance used to perform the encryption. -    :type crypto: leap.soledad.crypto.SoledadCrypto -    :param doc_id: The id of the document. -    :type doc_id: str -    :param doc_rev: The revision of the document. -    :type doc_rev: str -    :param ciphertext: The content of the document. -    :type ciphertext: str -    :param mac_method: The MAC method to use. -    :type mac_method: str - -    :return: The calculated MAC. -    :rtype: str +    A thread that knowns how to either send or receive a document during the +    sync process.      """ -    if mac_method == MacMethods.HMAC: -        return hmac.new( -            crypto.doc_mac_key(doc_id), -            str(doc_id) + str(doc_rev) + ciphertext, -            hashlib.sha256).digest() -    # raise if we do not know how to handle this MAC method -    raise UnknownMacMethod('Unknown MAC method: %s.' 
% mac_method) +    def __init__(self, doc_syncer, release_method, failed_method, +            idx, total, last_request_lock=None, last_callback_lock=None): +        """ +        Initialize a new syncer thread. + +        :param doc_syncer: A document syncer. +        :type doc_syncer: HTTPDocumentSyncer +        :param release_method: A method to be called when finished running. +        :type release_method: callable(DocumentSyncerThread) +        :param failed_method: A method to be called when we failed. +        :type failed_method: callable(DocumentSyncerThread) +        :param idx: The index count of the current operation. +        :type idx: int +        :param total: The total number of operations. +        :type total: int +        :param last_request_lock: A lock to wait for before actually performing +                                  the request. +        :type last_request_lock: threading.Lock +        :param last_callback_lock: A lock to wait for before actually running +                                  the success callback. +        :type last_callback_lock: threading.Lock +        """ +        threading.Thread.__init__(self) +        self._doc_syncer = doc_syncer +        self._release_method = release_method +        self._failed_method = failed_method +        self._idx = idx +        self._total = total +        self._last_request_lock = last_request_lock +        self._last_callback_lock = last_callback_lock +        self._response = None +        self._exception = None +        self._result = None +        self._success = False +        # a lock so we can signal when we're finished +        self._request_lock = threading.Lock() +        self._request_lock.acquire() +        self._callback_lock = threading.Lock() +        self._callback_lock.acquire() +        # make thread interruptable +        self._stopped = None +        self._stop_lock = threading.Lock() -def encrypt_doc(crypto, doc): -    """ -    Encrypt C{doc}'s content. 
- -    Encrypt doc's contents using AES-256 CTR mode and return a valid JSON -    string representing the following: - -        { -            ENC_JSON_KEY: '<encrypted doc JSON string>', -            ENC_SCHEME_KEY: 'symkey', -            ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, -            ENC_IV_KEY: '<the initial value used to encrypt>', -            MAC_KEY: '<mac>' -            MAC_METHOD_KEY: 'hmac' -        } - -    :param crypto: A SoledadCryto instance used to perform the encryption. -    :type crypto: leap.soledad.crypto.SoledadCrypto -    :param doc: The document with contents to be encrypted. -    :type doc: SoledadDocument - -    :return: The JSON serialization of the dict representing the encrypted -        content. -    :rtype: str -    """ -    soledad_assert(doc.is_tombstone() is False) -    # encrypt content using AES-256 CTR mode -    iv, ciphertext = crypto.encrypt_sym( -        str(doc.get_json()),  # encryption/decryption routines expect str -        crypto.doc_passphrase(doc.doc_id), -        method=EncryptionMethods.AES_256_CTR) -    # Return a representation for the encrypted content. In the following, we -    # convert binary data to hexadecimal representation so the JSON -    # serialization does not complain about what it tries to serialize. -    hex_ciphertext = binascii.b2a_hex(ciphertext) -    return json.dumps({ -        ENC_JSON_KEY: hex_ciphertext, -        ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, -        ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, -        ENC_IV_KEY: iv, -        # store the mac as hex. -        MAC_KEY: binascii.b2a_hex( -            mac_doc( -                crypto, doc.doc_id, doc.rev, -                ciphertext, -                MacMethods.HMAC)), -        MAC_METHOD_KEY: MacMethods.HMAC, -    }) - - -def decrypt_doc(crypto, doc): -    """ -    Decrypt C{doc}'s content. +    def run(self): +        """ +        Run the HTTP request and store results. 
-    Return the JSON string representation of the document's decrypted content. +        This method will block and wait for an eventual previous operation to +        finish before actually performing the request. It also traps any +        exception and register any failure with the request. +        """ +        with self._stop_lock: +            if self._stopped is None: +                self._stopped = False +            else: +                return + +        # eventually wait for the previous thread to finish +        if self._last_request_lock is not None: +            self._last_request_lock.acquire() + +        # bail out in case we've been interrupted +        if self.stopped is True: +            return + +        try: +            self._response = self._doc_syncer.do_request() +            self._request_lock.release() + +            # run success callback +            if self._doc_syncer.success_callback is not None: + +                # eventually wait for callback lock release +                if self._last_callback_lock is not None: +                    self._last_callback_lock.acquire() + +                # bail out in case we've been interrupted +                if self._stopped is True: +                    return + +                self._result = self._doc_syncer.success_callback( +                    self._idx, self._total, self._response) +                self._success = True +                doc_syncer = self._doc_syncer +                self._release_method(self, doc_syncer) +                self._doc_syncer = None +                # let next thread executed its callback +                self._callback_lock.release() + +        # trap any exception and signal failure +        except Exception as e: +            self._exception = e +            self._success = False +            # run failure callback +            if self._doc_syncer.failure_callback is not None: + +                # eventually wait for callback lock release +                
if self._last_callback_lock is not None: +                    self._last_callback_lock.acquire() + +                # bail out in case we've been interrupted +                if self.stopped is True: +                    return + +                self._doc_syncer.failure_callback( +                    self._idx, self._total, self._exception) + +                self._failed_method(self) +                # we do not release the callback lock here because we +                # failed and so we don't want other threads to succeed. -    The content of the document should have the following structure: +    @property +    def doc_syncer(self): +        return self._doc_syncer -        { -            ENC_JSON_KEY: '<enc_blob>', -            ENC_SCHEME_KEY: '<enc_scheme>', -            ENC_METHOD_KEY: '<enc_method>', -            ENC_IV_KEY: '<initial value used to encrypt>',  # (optional) -            MAC_KEY: '<mac>' -            MAC_METHOD_KEY: 'hmac' -        } +    @property +    def response(self): +        return self._response -    C{enc_blob} is the encryption of the JSON serialization of the document's -    content. For now Soledad just deals with documents whose C{enc_scheme} is -    EncryptionSchemes.SYMKEY and C{enc_method} is -    EncryptionMethods.AES_256_CTR. +    @property +    def exception(self): +        return self._exception -    :param crypto: A SoledadCryto instance to perform the encryption. -    :type crypto: leap.soledad.crypto.SoledadCrypto -    :param doc: The document to be decrypted. -    :type doc: SoledadDocument +    @property +    def callback_lock(self): +        return self._callback_lock -    :return: The JSON serialization of the decrypted content. 
-    :rtype: str -    """ -    soledad_assert(doc.is_tombstone() is False) -    soledad_assert(ENC_JSON_KEY in doc.content) -    soledad_assert(ENC_SCHEME_KEY in doc.content) -    soledad_assert(ENC_METHOD_KEY in doc.content) -    soledad_assert(MAC_KEY in doc.content) -    soledad_assert(MAC_METHOD_KEY in doc.content) -    # verify MAC -    ciphertext = binascii.a2b_hex(  # content is stored as hex. -        doc.content[ENC_JSON_KEY]) -    mac = mac_doc( -        crypto, doc.doc_id, doc.rev, -        ciphertext, -        doc.content[MAC_METHOD_KEY]) -    # we compare mac's hashes to avoid possible timing attacks that might -    # exploit python's builtin comparison operator behaviour, which fails -    # immediatelly when non-matching bytes are found. -    doc_mac_hash = hashlib.sha256( -        binascii.a2b_hex(  # the mac is stored as hex -            doc.content[MAC_KEY])).digest() -    calculated_mac_hash = hashlib.sha256(mac).digest() -    if doc_mac_hash != calculated_mac_hash: -        raise WrongMac('Could not authenticate document\'s contents.') -    # decrypt doc's content -    enc_scheme = doc.content[ENC_SCHEME_KEY] -    plainjson = None -    if enc_scheme == EncryptionSchemes.SYMKEY: -        enc_method = doc.content[ENC_METHOD_KEY] -        if enc_method == EncryptionMethods.AES_256_CTR: -            soledad_assert(ENC_IV_KEY in doc.content) -            plainjson = crypto.decrypt_sym( -                ciphertext, -                crypto.doc_passphrase(doc.doc_id), -                method=enc_method, -                iv=doc.content[ENC_IV_KEY]) -        else: -            raise UnknownEncryptionMethod(enc_method) -    else: -        raise UnknownEncryptionScheme(enc_scheme) -    return plainjson +    @property +    def request_lock(self): +        return self._request_lock +    @property +    def success(self): +        return self._success -def _gunzip(data): -    """ -    Uncompress data that is gzipped. 
+    def stop(self): +        with self._stop_lock: +            self._stopped = True -    :param data: gzipped data -    :type data: basestring -    """ -    buffer = cStringIO.StringIO() -    buffer.write(data) -    buffer.seek(0) -    try: -        data = gzip.GzipFile(mode='r', fileobj=buffer).read() -    except Exception: -        logger.warning("Error while decrypting gzipped data") -    buffer.close() -    return data +    @property +    def stopped(self): +        with self._stop_lock: +            return self._stopped +    @property +    def result(self): +        return self._result -# -# SoledadSyncTarget -# -class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): +class DocumentSyncerPool(object):      """ -    A SyncTarget that encrypts data before sending and decrypts data after -    receiving. +    A pool of reusable document syncers.      """ -    # -    # Token auth methods. -    # +    POOL_SIZE = 10 +    """ +    The maximum amount of syncer threads running at the same time. +    """ -    def set_token_credentials(self, uuid, token): +    def __init__(self, raw_url, raw_creds, query_string, headers, +            ensure_callback, stop_method):          """ -        Store given credentials so we can sign the request later. +        Initialize the document syncer pool. + +        :param raw_url: The complete raw URL for the HTTP request. +        :type raw_url: str +        :param raw_creds: The credentials for the HTTP request. +        :type raw_creds: dict +        :param query_string: The query string for the HTTP request. +        :type query_string: str +        :param headers: The headers for the HTTP request. +        :type headers: dict +        :param ensure_callback: A callback to ensure we have the correct +                                target_replica_uid, if it was just created. +        :type ensure_callback: callable -        :param uuid: The user's uuid. -        :type uuid: str -        :param token: The authentication token. 
-        :type token: str          """ -        TokenBasedAuth.set_token_credentials(self, uuid, token) - -    def _sign_request(self, method, url_query, params): +        # save syncer params +        self._raw_url = raw_url +        self._raw_creds = raw_creds +        self._query_string = query_string +        self._headers = headers +        self._ensure_callback = ensure_callback +        self._stop_method = stop_method +        # pool attributes +        self._failures = False +        self._semaphore_pool = threading.BoundedSemaphore( +            DocumentSyncerPool.POOL_SIZE) +        self._pool_access_lock = threading.Lock() +        self._doc_syncers = [] +        self._threads = [] + +    def new_syncer_thread(self, idx, total, last_request_lock=None, +            last_callback_lock=None):          """ -        Return an authorization header to be included in the HTTP request. +        Yield a new document syncer thread. + +        :param idx: The index count of the current operation. +        :type idx: int +        :param total: The total number of operations. +        :type total: int +        :param last_request_lock: A lock to wait for before actually performing +                                  the request. +        :type last_request_lock: threading.Lock +        :param last_callback_lock: A lock to wait for before actually running +                                   the success callback. +        :type last_callback_lock: threading.Lock +        """ +        t = None +        # wait for available threads +        self._semaphore_pool.acquire() +        with self._pool_access_lock: +            if self._failures is True: +                return None +            # get a syncer +            doc_syncer = self._get_syncer() +            # we rely on DocumentSyncerThread.run() to release the lock using +            # self.release_syncer so we can launch a new thread. 
+            t = DocumentSyncerThread( +                doc_syncer, self.release_syncer, self.cancel_threads, +                idx, total, +                last_request_lock=last_request_lock, +                last_callback_lock=last_callback_lock) +            self._threads.append(t) +            return t + +    def _failed(self): +        with self._pool_access_lock: +            self._failures = True -        :param method: The HTTP method. -        :type method: str -        :param url_query: The URL query string. -        :type url_query: str -        :param params: A list with encoded query parameters. -        :type param: list +    @property +    def failures(self): +        return self._failures -        :return: The Authorization header. -        :rtype: list of tuple +    def _get_syncer(self):          """ -        return TokenBasedAuth._sign_request(self, method, url_query, params) - -    # -    # Modified HTTPSyncTarget methods. -    # +        Get a document syncer from the pool. -    @staticmethod -    def connect(url, crypto=None): -        return SoledadSyncTarget(url, crypto=crypto) +        This method will create a new syncer whenever there is no syncer +        available in the pool. -    def __init__(self, url, creds=None, crypto=None): +        :return: A syncer. +        :rtype: HTTPDocumentSyncer          """ -        Initialize the SoledadSyncTarget. - -        :param url: The url of the target replica to sync with. -        :type url: str -        :param creds: optional dictionary giving credentials. -            to authorize the operation with the server. -        :type creds: dict -        :param soledad: An instance of Soledad so we can encrypt/decrypt -            document contents when syncing. 
-        :type soledad: soledad.Soledad +        syncer = None +        # get an available syncer or create a new one +        try: +            syncer = self._doc_syncers.pop() +        except IndexError: +            syncer = HTTPDocumentSyncer( +                self._raw_url, self._raw_creds, self._query_string, +                self._headers, self._ensure_callback) +        return syncer + +    def release_syncer(self, syncer_thread, doc_syncer):          """ -        HTTPSyncTarget.__init__(self, url, creds) -        self._crypto = crypto -        self._stopped = True -        self._stop_lock = threading.Lock() +        Return a syncer to the pool after use and check for any failures. -    def _init_post_request(self, url, action, headers, content_length): +        :param syncer: The syncer to be returned to the pool. +        :type syncer: HTTPDocumentSyncer          """ -        Initiate a syncing POST request. +        with self._pool_access_lock: +            self._doc_syncers.append(doc_syncer) +            if syncer_thread.success is True: +                self._threads.remove(syncer_thread) +            self._semaphore_pool.release() -        :param url: The syncing URL. -        :type url: str -        :param action: The syncing action, either 'get' or 'receive'. -        :type action: str -        :param headers: The initial headers to be sent on this request. -        :type headers: dict -        :param content_length: The content-length of the request. 
-        :type content_length: int +    def cancel_threads(self, calling_thread):          """ -        self._conn.putrequest('POST', url) -        self._conn.putheader( -            'content-type', 'application/x-soledad-sync-%s' % action) -        for header_name, header_value in headers: -            self._conn.putheader(header_name, header_value) -        self._conn.putheader('accept-encoding', 'gzip') -        self._conn.putheader('content-length', str(content_length)) -        self._conn.endheaders() - -    def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, -                         headers, return_doc_cb, ensure_callback, sync_id): +        Stop all threads in the pool.          """ -        Fetch sync documents from the remote database and insert them in the -        local database. +        # stop sync +        self._stop_method() +        stopped = [] +        # stop all threads +        logger.warning("Soledad sync: cancelling sync threads...") +        with self._pool_access_lock: +            self._failures = True +            while self._threads: +                t = self._threads.pop(0) +                t.stop() +                self._doc_syncers.append(t.doc_syncer) +                stopped.append(t) +        # release locks and join +        while stopped: +            t = stopped.pop(0) +            t.request_lock.acquire(False)   # just in case +            t.request_lock.release() +            t.callback_lock.acquire(False)  # just in case +            t.callback_lock.release() +        logger.warning("Soledad sync: cancelled sync threads.") + +    def cleanup(self): +        """ +        Close and remove any syncers from the pool. 
+        """ +        with self._pool_access_lock: +            while self._doc_syncers: +                syncer = self._doc_syncers.pop() +                syncer.close() +                del syncer -        If an incoming document's encryption scheme is equal to -        EncryptionSchemes.SYMKEY, then this method will decrypt it with -        Soledad's symmetric key. -        :param url: The syncing URL. -        :type url: str -        :param last_known_generation: Target's last known generation. -        :type last_known_generation: int -        :param last_known_trans_id: Target's last known transaction id. -        :type last_known_trans_id: str -        :param headers: The headers of the HTTP request. +class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth): + +    def __init__(self, raw_url, creds, query_string, headers, ensure_callback): +        """ +        Initialize the client. + +        :param raw_url: The raw URL of the target HTTP server. +        :type raw_url: str +        :param creds: Authentication credentials. +        :type creds: dict +        :param query_string: The query string for the HTTP request. +        :type query_string: str +        :param headers: The headers for the HTTP request.          :type headers: dict -        :param return_doc_cb: A callback to insert docs from target. -        :type return_doc_cb: callable          :param ensure_callback: A callback to ensure we have the correct                                  target_replica_uid, if it was just created.          
:type ensure_callback: callable +        """ +        HTTPClientBase.__init__(self, raw_url, creds=creds) +        # info needed to perform the request +        self._query_string = query_string +        self._headers = headers +        self._ensure_callback = ensure_callback +        # the actual request method +        self._request_method = None +        self._success_callback = None +        self._failure_callback = None + +    def _reset(self): +        """ +        Reset this document syncer so we can reuse it. +        """ +        self._request_method = None +        self._success_callback = None +        self._failure_callback = None +        self._request_method = None -        :raise BrokenSyncStream: If C{data} is malformed. +    def set_request_method(self, method, *args, **kwargs): +        """ +        Set the actual method to perform the request. -        :return: A dictionary representing the first line of the response got -            from remote replica. -        :rtype: list of str -        """ - -        def _post_get_doc(received): -            """ -            Get a sync document from server by means of a POST request. - -            :param received: The number of documents already received in the -                             current sync session. 
-            :type received: int -            """ -            entries = ['['] -            size = 1 -            # add remote replica metadata to the request -            size += self._prepare( -                '', entries, -                last_known_generation=last_known_generation, -                last_known_trans_id=last_known_trans_id, -                sync_id=sync_id, -                ensure=ensure_callback is not None) -            # inform server of how many documents have already been received -            size += self._prepare( -                ',', entries, received=received) -            entries.append('\r\n]') -            size += len(entries[-1]) -            # send headers -            self._init_post_request(url, 'get', headers, size) -            # get document -            for entry in entries: -                self._conn.send(entry) -            return self._response() - -        number_of_changes = None -        received = 0 +        :param method: Either 'get' or 'put'. +        :type method: str +        :param args: Arguments for the request method. +        :type args: list +        :param kwargs: Keyworded arguments for the request method. 
+        :type kwargs: dict +        """ +        self._reset() +        # resolve request method +        if method is 'get': +            self._request_method = self._get_doc +        elif method is 'put': +            self._request_method = self._put_doc +        else: +            raise Exception +        # store request method args +        self._args = args +        self._kwargs = kwargs -        new_generation = last_known_generation -        new_transaction_id = last_known_trans_id -        while number_of_changes is None or received < number_of_changes: -            # bail out if sync process was interrupted -            if self.stopped is True: -                return last_known_generation, last_known_trans_id -            # try to fetch one document from target -            data, _ = _post_get_doc(received) -            # decode incoming stream -            parts = data.splitlines() -            if not parts or parts[0] != '[' or parts[-1] != ']': -                raise BrokenSyncStream -            data = parts[1:-1] -            # decode metadata -            line, comma = utils.check_and_strip_comma(data[0]) -            metadata = None -            try: -                metadata = json.loads(line) -                soledad_assert('number_of_changes' in metadata) -                soledad_assert('new_generation' in metadata) -                soledad_assert('new_transaction_id' in metadata) -                number_of_changes = metadata['number_of_changes'] -                new_generation = metadata['new_generation'] -                new_transaction_id = metadata['new_transaction_id'] -            except json.JSONDecodeError, AssertionError: -                raise BrokenSyncStream -            # make sure we have replica_uid from fresh new dbs -            if ensure_callback and 'replica_uid' in metadata: -                ensure_callback(metadata['replica_uid']) -            # bail out if there are no documents to be received -            if 
number_of_changes == 0: -                break -            # decrypt incoming document and insert into local database -            entry = None -            try: -                entry = json.loads(data[1]) -            except IndexError: -                raise BrokenSyncStream -            # ------------------------------------------------------------- -            # symmetric decryption of document's contents -            # ------------------------------------------------------------- -            # if arriving content was symmetrically encrypted, we decrypt -            # it. -            doc = SoledadDocument( -                entry['id'], entry['rev'], entry['content']) -            if doc.content and ENC_SCHEME_KEY in doc.content: -                if doc.content[ENC_SCHEME_KEY] == \ -                        EncryptionSchemes.SYMKEY: -                    doc.set_json(decrypt_doc(self._crypto, doc)) -            # ------------------------------------------------------------- -            # end of symmetric decryption -            # ------------------------------------------------------------- -            return_doc_cb(doc, entry['gen'], entry['trans_id']) -            received += 1 -            signal( -                SOLEDAD_SYNC_RECEIVE_STATUS, -                "%d/%d" % -                (received, number_of_changes)) -        return new_generation, new_transaction_id +    def set_success_callback(self, callback): +        self._success_callback = callback + +    def set_failure_callback(self, callback): +        self._failure_callback = callback + +    @property +    def success_callback(self): +        return self._success_callback + +    @property +    def failure_callback(self): +        return self._failure_callback + +    def do_request(self): +        """ +        Actually perform the request. + +        :return: The body and headers of the response. 
+        :rtype: tuple +        """ +        self._ensure_connection() +        args = self._args +        kwargs = self._kwargs +        return self._request_method(*args, **kwargs)      def _request(self, method, url_parts, params=None, body=None,                   content_type=None): @@ -482,6 +490,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          :type body: str          :param content-type: The content-type of the request.          :type content-type: str + +        :return: The body and headers of the response. +        :rtype: tuple + +        :raise errors.Unavailable: Raised after a number of unsuccesful +                                   request attempts. +        :raise Exception: Raised for any other exception ocurring during the +                          request.          """          self._ensure_connection() @@ -566,14 +582,506 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          :type entries: list          :param dic: The data to be included in this entry.          :type dic: dict + +        :return: The size of the prepared entry. +        :rtype: int          """          entry = comma + '\r\n' + json.dumps(dic)          entries.append(entry)          return len(entry) -    def sync_exchange(self, docs_by_generations, source_replica_uid, -                      last_known_generation, last_known_trans_id, -                      return_doc_cb, ensure_callback=None): +    def _init_post_request(self, action, content_length): +        """ +        Initiate a syncing POST request. + +        :param url: The syncing URL. +        :type url: str +        :param action: The syncing action, either 'get' or 'receive'. +        :type action: str +        :param headers: The initial headers to be sent on this request. +        :type headers: dict +        :param content_length: The content-length of the request. 
+        :type content_length: int +        """ +        self._conn.putrequest('POST', self._query_string) +        self._conn.putheader( +            'content-type', 'application/x-soledad-sync-%s' % action) +        for header_name, header_value in self._headers: +            self._conn.putheader(header_name, header_value) +        self._conn.putheader('accept-encoding', 'gzip') +        self._conn.putheader('content-length', str(content_length)) +        self._conn.endheaders() + +    def _get_doc(self, received, sync_id, last_known_generation, +            last_known_trans_id): +        """ +        Get a sync document from server by means of a POST request. + +        :param received: The number of documents already received in the +                         current sync session. +        :type received: int +        :param sync_id: The id for the current sync session. +        :type sync_id: str +        :param last_known_generation: Target's last known generation. +        :type last_known_generation: int +        :param last_known_trans_id: Target's last known transaction id. +        :type last_known_trans_id: str + +        :return: The body and headers of the response. 
+        :rtype: tuple +        """ +        entries = ['['] +        size = 1 +        # add remote replica metadata to the request +        size += self._prepare( +            '', entries, +            last_known_generation=last_known_generation, +            last_known_trans_id=last_known_trans_id, +            sync_id=sync_id, +            ensure=self._ensure_callback is not None) +        # inform server of how many documents have already been received +        size += self._prepare( +            ',', entries, received=received) +        entries.append('\r\n]') +        size += len(entries[-1]) +        # send headers +        self._init_post_request('get', size) +        # get document +        for entry in entries: +            self._conn.send(entry) +        return self._response() + +    def _put_doc(self, sync_id, last_known_generation, last_known_trans_id, +            id, rev, content, gen, trans_id, number_of_docs, doc_idx): +        """ +        Put a sync document on server by means of a POST request. + +        :param sync_id: The id for the current sync session. +        :type sync_id: str +        :param last_known_generation: Target's last known generation. +        :type last_known_generation: int +        :param last_known_trans_id: Target's last known transaction id. +        :type last_known_trans_id: str +        :param id: The document id. +        :type id: str +        :param rev: The document revision. +        :type rev: str +        :param content: The serialized document content. +        :type content: str +        :param gen: The generation of the modification of the document. +        :type gen: int +        :param trans_id: The transaction id of the modification of the +                         document. +        :type trans_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. 
+        :type number_of_docs: int +        :param doc_idx: The index of the current document being sent. +        :type doc_idx: int + +        :return: The body and headers of the response. +        :rtype: tuple +        """ +        # prepare to send the document +        entries = ['['] +        size = 1 +        # add remote replica metadata to the request +        size += self._prepare( +            '', entries, +            last_known_generation=last_known_generation, +            last_known_trans_id=last_known_trans_id, +            sync_id=sync_id, +            ensure=self._ensure_callback is not None) +        # add the document to the request +        size += self._prepare( +            ',', entries, +            id=id, rev=rev, content=content, gen=gen, trans_id=trans_id, +            number_of_docs=number_of_docs, doc_idx=doc_idx) +        entries.append('\r\n]') +        size += len(entries[-1]) +        # send headers +        self._init_post_request('put', size) +        # send document +        for entry in entries: +            self._conn.send(entry) +        return self._response() + +    def _sign_request(self, method, url_query, params): +        """ +        Return an authorization header to be included in the HTTP request. + +        :param method: The HTTP method. +        :type method: str +        :param url_query: The URL query string. +        :type url_query: str +        :param params: A list with encoded query parameters. +        :type param: list + +        :return: The Authorization header. +        :rtype: list of tuple +        """ +        return TokenBasedAuth._sign_request(self, method, url_query, params) + +    def set_token_credentials(self, uuid, token): +        """ +        Store given credentials so we can sign the request later. + +        :param uuid: The user's uuid. +        :type uuid: str +        :param token: The authentication token. 
+        :type token: str +        """ +        TokenBasedAuth.set_token_credentials(self, uuid, token) + + +class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): +    """ +    A SyncTarget that encrypts data before sending and decrypts data after +    receiving. + +    Normally encryption will have been written to the sync database upon +    document modification. The sync database is also used to write temporarily +    the parsed documents that the remote send us, before being decrypted and +    written to the main database. +    """ + +    # will later keep a reference to the insert-doc callback +    # passed to sync_exchange +    _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) + +    """ +    Period of recurrence of the periodic decrypting task, in seconds. +    """ +    DECRYPT_TASK_PERIOD = 0.5 + +    # +    # Modified HTTPSyncTarget methods. +    # + +    def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, +            sync_db=None, sync_db_write_lock=None): +        """ +        Initialize the SoledadSyncTarget. + +        :param source_replica_uid: The source replica uid which we use when +                                   deferring decryption. +        :type source_replica_uid: str +        :param url: The url of the target replica to sync with. +        :type url: str +        :param creds: Optional dictionary giving credentials. +                      to authorize the operation with the server. +        :type creds: dict +        :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt +                        document contents when syncing. +        :type crypto: soledad.crypto.SoledadCrypto +        :param sync_db: Optional. handler for the db with the symmetric +                        encryption of the syncing documents. If +                        None, encryption will be done in-place, +                        instead of retreiving it from the dedicated +                        database. 
+        :type sync_db: Sqlite handler +        :param sync_db_write_lock: a write lock for controlling concurrent +                                   access to the sync_db +        :type sync_db_write_lock: threading.Lock +        """ +        HTTPSyncTarget.__init__(self, url, creds) +        self._raw_url = url +        self._raw_creds = creds +        self._crypto = crypto +        self._stopped = True +        self._stop_lock = threading.Lock() +        self._sync_exchange_lock = threading.Lock() +        self.source_replica_uid = source_replica_uid +        self._defer_decryption = False + +        # deferred decryption attributes +        self._sync_db = None +        self._sync_db_write_lock = None +        self._decryption_callback = None +        self._sync_decr_pool = None +        self._sync_watcher = None +        if sync_db and sync_db_write_lock is not None: +            self._sync_db = sync_db +            self._sync_db_write_lock = sync_db_write_lock + +    def _setup_sync_decr_pool(self): +        """ +        Set up the SyncDecrypterPool for deferred decryption. +        """ +        if self._sync_decr_pool is None: +            # initialize syncing queue decryption pool +            self._sync_decr_pool = SyncDecrypterPool( +                self._crypto, self._sync_db, +                self._sync_db_write_lock, +                insert_doc_cb=self._insert_doc_cb) +            self._sync_decr_pool.set_source_replica_uid( +                self.source_replica_uid) + +    def _teardown_sync_decr_pool(self): +        """ +        Tear down the SyncDecrypterPool. +        """ +        if self._sync_decr_pool is not None: +            self._sync_decr_pool.close() +            self._sync_decr_pool = None + +    def _setup_sync_watcher(self): +        """ +        Set up the sync watcher for deferred decryption. 
+        """ +        if self._sync_watcher is None: +            self._sync_watcher = TimerTask( +                self._decrypt_syncing_received_docs, +                delay=self.DECRYPT_TASK_PERIOD) + +    def _teardown_sync_watcher(self): +        """ +        Tear down the sync watcher. +        """ +        if self._sync_watcher is not None: +            self._sync_watcher.stop() +            self._sync_watcher.shutdown() +            self._sync_watcher = None + +    def _get_replica_uid(self, url): +        """ +        Return replica uid from the url, or None. + +        :param url: the replica url +        :type url: str +        """ +        replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url) +        return replica_uid_match[0] if len(replica_uid_match) > 0 else None + +    @staticmethod +    def connect(url, source_replica_uid=None, crypto=None): +        return SoledadSyncTarget( +            url, source_replica_uid=source_replica_uid, crypto=crypto) + +    def _parse_received_doc_response(self, response): +        """ +        Parse the response from the server containing the received document. + +        :param response: The body and headers of the response. 
+        :type response: tuple(str, dict) +        """ +        data, _ = response +        # decode incoming stream +        parts = data.splitlines() +        if not parts or parts[0] != '[' or parts[-1] != ']': +            raise errors.BrokenSyncStream +        data = parts[1:-1] +        # decode metadata +        line, comma = utils.check_and_strip_comma(data[0]) +        metadata = None +        try: +            metadata = json.loads(line) +            new_generation = metadata['new_generation'] +            new_transaction_id = metadata['new_transaction_id'] +            number_of_changes = metadata['number_of_changes'] +        except (json.JSONDecodeError, KeyError): +            raise errors.BrokenSyncStream +        # make sure we have replica_uid from fresh new dbs +        if self._ensure_callback and 'replica_uid' in metadata: +            self._ensure_callback(metadata['replica_uid']) +        # parse incoming document info +        doc_id = None +        rev = None +        content = None +        gen = None +        trans_id = None +        if number_of_changes > 0: +            try: +                entry = json.loads(data[1]) +                doc_id = entry['id'] +                rev = entry['rev'] +                content = entry['content'] +                gen = entry['gen'] +                trans_id = entry['trans_id'] +            except (IndexError, KeyError): +                raise errors.BrokenSyncStream +        return new_generation, new_transaction_id, number_of_changes, \ +            doc_id, rev, content, gen, trans_id + +    def _insert_received_doc(self, idx, total, response): +        """ +        Insert a received document into the local replica. + +        :param idx: The index count of the current operation. +        :type idx: int +        :param total: The total number of operations. +        :type total: int +        :param response: The body and headers of the response. 
+        :type response: tuple(str, dict) +        """ +        new_generation, new_transaction_id, number_of_changes, doc_id, \ +            rev, content, gen, trans_id = \ +                self._parse_received_doc_response(response) +        if doc_id is not None: +            # decrypt incoming document and insert into local database +            # ------------------------------------------------------------- +            # symmetric decryption of document's contents +            # ------------------------------------------------------------- +            # If arriving content was symmetrically encrypted, we decrypt it. +            # We do it inline if defer_decryption flag is False or no sync_db +            # was defined, otherwise we defer it writing it to the received +            # docs table. +            doc = SoledadDocument(doc_id, rev, content) +            if is_symmetrically_encrypted(doc): +                if self._queue_for_decrypt: +                    self._save_encrypted_received_doc( +                        doc, gen, trans_id, idx, total) +                else: +                    # defer_decryption is False or no-sync-db fallback +                    doc.set_json(decrypt_doc(self._crypto, doc)) +                    self._return_doc_cb(doc, gen, trans_id) +            else: +                # not symmetrically encrypted doc, insert it directly +                # or save it in the decrypted stage. 
+                if self._queue_for_decrypt: +                    self._save_received_doc(doc, gen, trans_id, idx, total) +                else: +                    self._return_doc_cb(doc, gen, trans_id) +            # ------------------------------------------------------------- +            # end of symmetric decryption +            # ------------------------------------------------------------- +        msg = "%d/%d" % (idx + 1, total) +        signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg) +        logger.debug("Soledad sync receive status: %s" % msg) +        return number_of_changes, new_generation, new_transaction_id + +    def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, +                         headers, return_doc_cb, ensure_callback, sync_id, +                         syncer_pool, defer_decryption=False): +        """ +        Fetch sync documents from the remote database and insert them in the +        local database. + +        If an incoming document's encryption scheme is equal to +        EncryptionSchemes.SYMKEY, then this method will decrypt it with +        Soledad's symmetric key. + +        :param url: The syncing URL. +        :type url: str +        :param last_known_generation: Target's last known generation. +        :type last_known_generation: int +        :param last_known_trans_id: Target's last known transaction id. +        :type last_known_trans_id: str +        :param headers: The headers of the HTTP request. +        :type headers: dict +        :param return_doc_cb: A callback to insert docs from target. +        :type return_doc_cb: callable +        :param ensure_callback: A callback to ensure we have the correct +                                target_replica_uid, if it was just created. +        :type ensure_callback: callable +        :param sync_id: The id for the current sync session. 
+        :type sync_id: str +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool + +        :raise BrokenSyncStream: If `data` is malformed. + +        :return: A dictionary representing the first line of the response got +                 from remote replica. +        :rtype: dict +        """ +        # we keep a reference to the callback in case we defer the decryption +        self._return_doc_cb = return_doc_cb +        self._queue_for_decrypt = defer_decryption \ +            and self._sync_db is not None + +        new_generation = last_known_generation +        new_transaction_id = last_known_trans_id + +        if self._queue_for_decrypt: +            logger.debug( +                "Soledad sync: will queue received docs for decrypting.") + +        idx = 0 +        number_of_changes = 1 + +        first_request = True +        last_callback_lock = None +        threads = [] + +        # get incoming documents +        while idx < number_of_changes: +            # bail out if sync process was interrupted +            if self.stopped is True: +                break + +            # launch a thread to fetch one document from target +            t = syncer_pool.new_syncer_thread( +                idx, number_of_changes, +                last_callback_lock=last_callback_lock) + +            # bail out if any thread failed +            if t is None: +                self.stop() +                break + +            t.doc_syncer.set_request_method( +                'get', idx, sync_id, last_known_generation, +                last_known_trans_id) +            t.doc_syncer.set_success_callback(self._insert_received_doc) + +            def _failure_callback(idx, total, exception): +                _failure_msg = "Soledad sync: error while getting document " \ +       
             "%d/%d: %s" \ +                    % (idx + 1, total, exception) +                logger.warning("%s" % _failure_msg) +                logger.warning("Soledad sync: failing gracefully, will " +                               "recover on next sync.") + +            t.doc_syncer.set_failure_callback(_failure_callback) +            threads.append(t) +            t.start() +            last_callback_lock = t.callback_lock +            idx += 1 + +            # if this is the first request, wait to update the number of +            # changes +            if first_request is True: +                t.join() +                if t.success: +                    number_of_changes, _, _ = t.result +                first_request = False + +        # make sure all threads finished and we have up-to-date info +        last_successful_thread = None +        while threads: +            # check if there are failures +            t = threads.pop(0) +            t.join() +            if t.success: +                last_successful_thread = t + +        # get information about last successful thread +        if last_successful_thread is not None: +            body, _ = last_successful_thread.response +            parsed_body = json.loads(body) +            # get current target gen and trans id in case no documents were +            # transferred +            if len(parsed_body) == 1: +                metadata = parsed_body[0] +                new_generation = metadata['new_generation'] +                new_transaction_id = metadata['new_transaction_id'] +            # get current target gen and trans id from last transferred +            # document +            else: +                doc_data = parsed_body[1] +                new_generation = doc_data['gen'] +                new_transaction_id = doc_data['trans_id'] + +        return new_generation, new_transaction_id + +    def sync_exchange(self, docs_by_generations, +                      source_replica_uid, 
last_known_generation, +                      last_known_trans_id, return_doc_cb, +                      ensure_callback=None, defer_decryption=True, +                      sync_id=None):          """          Find out which documents the remote database does not know about,          encrypt and send them. @@ -586,24 +1094,54 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):                                      the last local generation the remote                                      replica knows about.          :type docs_by_generations: list of tuples +          :param source_replica_uid: The uid of the source replica.          :type source_replica_uid: str +          :param last_known_generation: Target's last known generation.          :type last_known_generation: int +          :param last_known_trans_id: Target's last known transaction id.          :type last_known_trans_id: str +          :param return_doc_cb: A callback for inserting received documents from -            target. +                              target. If not overriden, this will call u1db +                              insert_doc_from_target in synchronizer, which +                              implements the TAKE OTHER semantics.          :type return_doc_cb: function +          :param ensure_callback: A callback that ensures we know the target -            replica uid if the target replica was just created. +                                replica uid if the target replica was just +                                created.          :type ensure_callback: function +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool +          :return: The new generation and transaction id of the target replica.          
:rtype: tuple          """ +        self._ensure_callback = ensure_callback + +        if defer_decryption: +            self._sync_exchange_lock.acquire() +            self._setup_sync_decr_pool() +            self._setup_sync_watcher() +            self._defer_decryption = True +          self.start() -        sync_id = str(uuid4()) + +        if sync_id is None: +            sync_id = str(uuid4()) +        self.source_replica_uid = source_replica_uid +        # let the decrypter pool access the passed callback to insert docs +        setProxiedObject(self._insert_doc_cb[source_replica_uid], +                         return_doc_cb) + +        if not self.clear_to_sync(): +            raise PendingReceivedDocsSyncError          self._ensure_connection()          if self._trace_hook:  # for tests @@ -611,78 +1149,135 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)          headers = self._sign_request('POST', url, {}) -        def _post_put_doc(headers, last_known_generation, last_known_trans_id, -                          id, rev, content, gen, trans_id, sync_id): -            """ -            Put a sync document on server by means of a POST request. - -            :param received: How many documents have already been received in -                             this sync session. 
-            :type received: int -            """ -            # prepare to send the document -            entries = ['['] -            size = 1 -            # add remote replica metadata to the request -            size += self._prepare( -                '', entries, -                last_known_generation=last_known_generation, -                last_known_trans_id=last_known_trans_id, -                sync_id=sync_id, -                ensure=ensure_callback is not None) -            # add the document to the request -            size += self._prepare( -                ',', entries, -                id=id, rev=rev, content=content, gen=gen, trans_id=trans_id) -            entries.append('\r\n]') -            size += len(entries[-1]) -            # send headers -            self._init_post_request(url, 'put', headers, size) -            # send document -            for entry in entries: -                self._conn.send(entry) -            data, _ = self._response() -            data = json.loads(data) -            return data[0]['new_generation'], data[0]['new_transaction_id'] -          cur_target_gen = last_known_generation          cur_target_trans_id = last_known_trans_id          # send docs +        msg = "%d/%d" % (0, len(docs_by_generations)) +        signal(SOLEDAD_SYNC_SEND_STATUS, msg) +        logger.debug("Soledad sync send status: %s" % msg) + +        defer_encryption = self._sync_db is not None +        syncer_pool = DocumentSyncerPool( +            self._raw_url, self._raw_creds, url, headers, ensure_callback, +            self.stop) +        threads = [] +        last_request_lock = None +        last_callback_lock = None          sent = 0 -        signal( -            SOLEDAD_SYNC_SEND_STATUS, -            "%d/%d" % (0, len(docs_by_generations))) +        total = len(docs_by_generations) + +        synced = [] +        number_of_docs = len(docs_by_generations) +          for doc, gen, trans_id in docs_by_generations:              # allow for 
interrupting the sync process              if self.stopped is True:                  break +              # skip non-syncable docs              if isinstance(doc, SoledadDocument) and not doc.syncable:                  continue +              # -------------------------------------------------------------              # symmetric encryption of document's contents              # -------------------------------------------------------------              doc_json = doc.get_json()              if not doc.is_tombstone(): -                doc_json = encrypt_doc(self._crypto, doc) +                if not defer_encryption: +                    # fallback case, for tests +                    doc_json = encrypt_doc(self._crypto, doc) +                else: +                    try: +                        doc_json = self.get_encrypted_doc_from_db( +                            doc.doc_id, doc.rev) +                    except Exception as exc: +                        logger.error("Error while getting " +                                     "encrypted doc from db") +                        logger.exception(exc) +                        continue +                    if doc_json is None: +                        # Not marked as tombstone, but we got nothing +                        # from the sync db. As it is not encrypted yet, we +                        # force inline encryption. +                        # TODO: implement a queue to deal with these cases. 
+                        doc_json = encrypt_doc(self._crypto, doc)              # -------------------------------------------------------------              # end of symmetric encryption              # ------------------------------------------------------------- -            cur_target_gen, cur_target_trans_id = _post_put_doc( -                headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, -                rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id, -                sync_id=sync_id) +            t = syncer_pool.new_syncer_thread( +                sent + 1, total, last_request_lock=None, +                last_callback_lock=last_callback_lock) + +            # bail out if any thread failed +            if t is None: +                self.stop() +                break + +            # set the request method +            t.doc_syncer.set_request_method( +                'put', sync_id, cur_target_gen, cur_target_trans_id, +                id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, +                trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=sent + 1) +            # set the success calback + +            def _success_callback(idx, total, response): +                _success_msg = "Soledad sync send status: %d/%d" \ +                               % (idx, total) +                signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) +                logger.debug(_success_msg) + +            t.doc_syncer.set_success_callback(_success_callback) + +            # set the failure callback +            def _failure_callback(idx, total, exception): +                _failure_msg = "Soledad sync: error while sending document " \ +                               "%d/%d: %s" % (idx, total, exception) +                logger.warning("%s" % _failure_msg) +                logger.warning("Soledad sync: failing gracefully, will " +                               "recover on next sync.") + +            
t.doc_syncer.set_failure_callback(_failure_callback) + +            # save thread and append +            t.start() +            threads.append((t, doc)) +            last_request_lock = t.request_lock +            last_callback_lock = t.callback_lock              sent += 1 -            signal( -                SOLEDAD_SYNC_SEND_STATUS, -                "%d/%d" % (sent, len(docs_by_generations))) + +        # make sure all threads finished and we have up-to-date info +        while threads: +            # check if there are failures +            t, doc = threads.pop(0) +            t.join() +            if t.success: +                synced.append((doc.doc_id, doc.rev)) + +        if defer_decryption: +            self._sync_watcher.start()          # get docs from target -        cur_target_gen, cur_target_trans_id = self._get_remote_docs( -            url, -            last_known_generation, last_known_trans_id, headers, -            return_doc_cb, ensure_callback, sync_id) +        if self.stopped is False: +            cur_target_gen, cur_target_trans_id = self._get_remote_docs( +                url, +                last_known_generation, last_known_trans_id, headers, +                return_doc_cb, ensure_callback, sync_id, syncer_pool, +                defer_decryption=defer_decryption) +        syncer_pool.cleanup() + +        # delete documents from the sync database +        if defer_encryption: +            self.delete_encrypted_docs_from_db(synced) + +        # wait for deferred decryption to finish +        if defer_decryption: +            while self.clear_to_sync() is False: +                sleep(self.DECRYPT_TASK_PERIOD) +            self._teardown_sync_watcher() +            self._teardown_sync_decr_pool() +            self._sync_exchange_lock.release() +          self.stop()          return cur_target_gen, cur_target_trans_id @@ -714,3 +1309,164 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          """          with 
self._stop_lock:              return self._stopped is True + +    def get_encrypted_doc_from_db(self, doc_id, doc_rev): +        """ +        Retrieve encrypted document from the database of encrypted docs for +        sync. + +        :param doc_id: The Document id. +        :type doc_id: str + +        :param doc_rev: The document revision +        :type doc_rev: str +        """ +        encr = SyncEncrypterPool +        c = self._sync_db.cursor() +        sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( +            encr.TABLE_NAME,)) +        c.execute(sql, (doc_id, doc_rev)) +        res = c.fetchall() +        if len(res) != 0: +            return res[0][0] + +    def delete_encrypted_docs_from_db(self, docs_ids): +        """ +        Delete several encrypted documents from the database of symmetrically +        encrypted docs to sync. + +        :param docs_ids: an iterable with (doc_id, doc_rev) for all documents +                         to be deleted. +        :type docs_ids: any iterable of tuples of str +        """ +        if docs_ids: +            encr = SyncEncrypterPool +            c = self._sync_db.cursor() +            for doc_id, doc_rev in docs_ids: +                sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( +                    encr.TABLE_NAME,)) +                c.execute(sql, (doc_id, doc_rev)) +            self._sync_db.commit() + +    def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): +        """ +        Save a symmetrically encrypted incoming document into the received +        docs table in the sync db. A decryption task will pick it up +        from here in turn. + +        :param doc: The document to save. +        :type doc: SoledadDocument +        :param gen: The generation. +        :type gen: str +        :param  trans_id: Transacion id. +        :type gen: str +        :param idx: The index count of the current operation. 
+        :type idx: int +        :param total: The total number of operations. +        :type total: int +        """ +        logger.debug( +            "Enqueueing doc for decryption: %d/%d." +            % (idx + 1, total)) +        self._sync_decr_pool.insert_encrypted_received_doc( +            doc.doc_id, doc.rev, doc.content, gen, trans_id) + +    def _save_received_doc(self, doc, gen, trans_id, idx, total): +        """ +        Save any incoming document into the received docs table in the sync db. + +        :param doc: The document to save. +        :type doc: SoledadDocument +        :param gen: The generation. +        :type gen: str +        :param  trans_id: Transacion id. +        :type gen: str +        :param idx: The index count of the current operation. +        :type idx: int +        :param total: The total number of operations. +        :type total: int +        """ +        logger.debug( +            "Enqueueing doc, no decryption needed: %d/%d." +            % (idx + 1, total)) +        self._sync_decr_pool.insert_received_doc( +            doc.doc_id, doc.rev, doc.content, gen, trans_id) + +    # +    # Symmetric decryption of syncing docs +    # + +    def clear_to_sync(self): +        """ +        Return True if sync can proceed (ie, the received db table is empty). +        :rtype: bool +        """ +        if self._sync_decr_pool is not None: +            return self._sync_decr_pool.count_received_encrypted_docs() == 0 +        else: +            return True + +    def set_decryption_callback(self, cb): +        """ +        Set callback to be called when the decryption finishes. + +        :param cb: The callback to be set. +        :type cb: callable +        """ +        self._decryption_callback = cb + +    def has_decryption_callback(self): +        """ +        Return True if there is a decryption callback set. 
+        :rtype: bool +        """ +        return self._decryption_callback is not None + +    def has_syncdb(self): +        """ +        Return True if we have an initialized syncdb. +        """ +        return self._sync_db is not None + +    def _decrypt_syncing_received_docs(self): +        """ +        Decrypt the documents received from remote replica and insert them +        into the local one. + +        Called periodically from TimerTask self._sync_watcher. +        """ +        if sameProxiedObjects( +                self._insert_doc_cb.get(self.source_replica_uid), +                None): +            return + +        decrypter = self._sync_decr_pool +        decrypter.decrypt_received_docs() +        done = decrypter.process_decrypted() + +    def _sign_request(self, method, url_query, params): +        """ +        Return an authorization header to be included in the HTTP request. + +        :param method: The HTTP method. +        :type method: str +        :param url_query: The URL query string. +        :type url_query: str +        :param params: A list with encoded query parameters. +        :type param: list + +        :return: The Authorization header. +        :rtype: list of tuple +        """ +        return TokenBasedAuth._sign_request(self, method, url_query, params) + +    def set_token_credentials(self, uuid, token): +        """ +        Store given credentials so we can sign the request later. + +        :param uuid: The user's uuid. +        :type uuid: str +        :param token: The authentication token. +        :type token: str +        """ +        TokenBasedAuth.set_token_credentials(self, uuid, token) diff --git a/client/src/taskthread/__init__.py b/client/src/taskthread/__init__.py new file mode 100755 index 00000000..a734a829 --- /dev/null +++ b/client/src/taskthread/__init__.py @@ -0,0 +1,296 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +#      http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import logging +import threading + +__version__ = '1.4' + + +logger = logging.getLogger(__name__) + + +class TaskInProcessException(BaseException): +    pass + + +class TaskThread(threading.Thread): +    """ +    A thread object that repeats a task. + +    Usage example:: + +        from taskthread import TaskThread + +        import time + +        def my_task(*args, **kwargs): +            print args, kwargs + +        task_thread = TaskThread(my_task) +        task_thread.start() +        for i in xrange(10): +            task_thread.run_task() +            task_thread.join_task() +        task_thread.join() + +    .. note:: If :py:meth:`~TaskThread.run_task` is +        invoked while run_task is in progress, +        :py:class:`~.TaskInProcessException` will +        be raised. + +    :param task: +        A ``function``. This param is the task to execute when +         run_task is called. +    :param event: +        A ``threading.Event``. This event will be set when run_task +         is called. The default value is a new event, but may be +         specified for testing purposes. +    """ + +    daemon = True +    ''' +    Threads marked as daemon will be terminated. 
+    ''' +    def __init__(self, task, event=threading.Event(), +                 *args, **kwargs): +        super(TaskThread, self).__init__() +        self.task = task +        self.task_event = event +        self.running = True +        self.running_lock = threading.Lock() +        self.in_task = False +        self.task_complete = threading.Event() +        self.args = args +        self.kwargs = kwargs + +    def run(self): +        """ +        Called by threading.Thread, this runs in the new thread. +        """ +        while True: +            self.task_event.wait() +            if not self.running: +                logger.debug("TaskThread exiting") +                return +            logger.debug("TaskThread starting task") +            with self.running_lock: +                self.task_event.clear() +            self.task_complete.clear() +            self.task(*self.args, **self.kwargs) +            with self.running_lock: +                self.in_task = False +            self.task_complete.set() + +    def run_task(self, *args, **kwargs): +        """ +        Run an instance of the task. + +        :param args: +            The arguments to pass to the task. + +        :param kwargs: +            The keyword arguments to pass to the task. +        """ +        # Don't allow this call if the thread is currently +        # in a task. +        with self.running_lock: +            if self.in_task: +                raise TaskInProcessException() +            self.in_task = True +        logger.debug("Waking up the thread") +        self.args = args +        self.kwargs = kwargs +        # Wake up the thread to do it's thing +        self.task_event.set() + +    def join_task(self, time_out): +        """ +        Wait for the currently running task to complete. + +        :param time_out: +            An ``int``. The amount of time to wait for the +            task to finish. 
+        """ +        with self.running_lock: +            if not self.in_task: +                return + +        success = self.task_complete.wait(time_out) +        if success: +            self.task_complete.clear() +        return success + +    def join(self, timeout=None): +        """ +        Wait for the task to finish +        """ +        self.running = False +        self.task_event.set() +        super(TaskThread, self).join(timeout=timeout) + + +class TimerTask(object): +    """ +    An object that executes a commit function at a given interval. +    This class is driven by a TaskThread. A new TaskThread will be +    created the first time :py:meth:`.~start` is called. All +    subsequent calls to start will reuse the same thread. + +    Usage example:: + +        from taskthread import TimerTask +        import time + +        count = 0 +        def get_count(): +            return count +        def execute(): +            print "Count: %d" % count + +        task = TimerTask(execute, +                         timeout=10, +                         count_fcn=get_count, +                         threshold=1) + +        task.start() + +        for i in xrange(100000): +            count += 1 +            time.sleep(1) +        task.stop() +        count = 0 +        task.start() +        for i in xrange(100000): +            count += 1 +            time.sleep(1) +        task.shutdown() + +    :param execute_fcn: +        A `function`. This function will be executed on each time interval. + +    :param delay: +        An `int`. The delay in **seconds** invocations of +        `execute_fcn`. Default: `10`. + +    :param count_fcn: +        A `function`. This function returns a current count. If the count +        has not changed more the `threshold` since the last invocation of +        `execute_fcn`, `execute_fcn` will not be called again. If not +        specified, `execute_fcn` will be called each time the timer fires. +        **Optional**. 
If count_fcn is specified, ``threshold`` is required. + +    :param threshold: +        An `int`. Specifies the minimum delta in `count_fcn` that must be +        met for `execute_fcn` to be invoked. **Optional**. Must be +        specified in conjunction with `count_fcn`. + +    """ +    def __init__(self, execute_fcn, delay=10, count_fcn=None, threshold=None): +        self.running = True +        self.execute_fcn = execute_fcn +        self.last_count = 0 +        self.event = threading.Event() +        self.delay = delay +        self.thread = None +        self.running_lock = threading.RLock() +        if bool(threshold) != bool(count_fcn): +            raise ValueError("Must specify threshold " +                             "and count_fcn, or neither") + +        self.count_fcn = count_fcn +        self.threshold = threshold + +    def start(self): +        """ +        Start the task. This starts a :py:class:`.~TaskThread`, and starts +        running run_threshold_timer on the thread. + +        """ +        if not self.thread: +            logger.debug('Starting up the taskthread') +            self.thread = TaskThread(self._run_threshold_timer) +            self.thread.start() + +        if self.threshold: +            self.last_count = 0 + +        logger.debug('Running the task') +        self.running = True +        self.thread.run_task() + +    def stop(self): +        """ +        Stop the task, leaving the :py:class:`.~TaskThread` running +        so start can be called again. + +        """ +        logger.debug('Stopping the task') +        wait = False +        with self.running_lock: +            if self.running: +                wait = True +                self.running = False +        if wait: +            self.event.set() +            self.thread.join_task(2) + +    def shutdown(self): +        """ +        Close down the task thread and stop the task if it is running. 
+ +        """ +        logger.debug('Shutting down the task') +        self.stop() +        self.thread.join(2) + +    def _exec_if_threshold_met(self): +        new_count = self.count_fcn() +        logger.debug('new_count: %d' % new_count) +        if new_count >= self.last_count + self.threshold: +            self.execute_fcn() +            self.last_count = new_count + +    def _exec(self): +        if self.count_fcn: +            self._exec_if_threshold_met() +        else: +            self.execute_fcn() + +    def _wait(self): +        self.event.wait(timeout=self.delay) +        self.event.clear() +        logger.debug('Task woken up') + +    def _exit_loop(self): +        """ +        If self.running is false, it means the task should shut down. +        """ +        exit_loop = False +        with self.running_lock: +            if not self.running: +                exit_loop = True +                logger.debug('Task shutting down') +        return exit_loop + +    def _run_threshold_timer(self): +        """ +        Main loop of the timer task + +        """ +        logger.debug('In Task') +        while True: +            self._wait() +            if self._exit_loop(): +                return +            self._exec() diff --git a/client/src/taskthread/tests/__init__.py b/client/src/taskthread/tests/__init__.py new file mode 100755 index 00000000..92bd912f --- /dev/null +++ b/client/src/taskthread/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +#      http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/client/src/taskthread/tests/unit/__init__.py b/client/src/taskthread/tests/unit/__init__.py new file mode 100755 index 00000000..92bd912f --- /dev/null +++ b/client/src/taskthread/tests/unit/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +#      http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/client/src/taskthread/tests/unit/test_taskthread.py b/client/src/taskthread/tests/unit/test_taskthread.py new file mode 100755 index 00000000..82565922 --- /dev/null +++ b/client/src/taskthread/tests/unit/test_taskthread.py @@ -0,0 +1,315 @@ +# -*- coding: utf-8 -*- +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +#      http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License.:w + + +import threading +import unittest2 as unittest + +from mock import Mock, patch + +from taskthread import TaskThread, TaskInProcessException, TimerTask + +forever_event = threading.Event() + + +def forever_function(*args, **kwargs): +    forever_event.wait() +    forever_event.clear() + + +class TaskThreadTestCase(unittest.TestCase): +    """ +    Tests for :py:class:`.TaskThread`. +    """ + +    def test___init__(self): +        """ +        Test the __init__ method. It doesn't really do much. +        """ +        task_thread = TaskThread(forever_function) +        self.assertEqual(forever_function, task_thread.task) + +    def test_run_not_running(self): +        """ +        Verifies that thread will shut down when running is false +        """ +        event = Mock() +        event.wait = Mock(side_effect=[True]) +        event.clear = Mock(side_effect=Exception("Should never be called")) +        task_thread = TaskThread(forever_function, +                                 event=event) +        task_thread.running = False +        task_thread.run() +        event.wait.assert_called_once_with() + +    def test_run_executes_task(self): +        event = Mock() +        event.wait = Mock(side_effect=[True, True]) + +        def stop_iteration(*args, **kwargs): +            args[0].running = False + +        task_thread = TaskThread(stop_iteration, +                                 event=event) + +        task_thread.args = [task_thread] +        task_thread.kwargs = {'a': 2} +        task_thread.in_task = True +        task_thread.run() +        self.assertEqual(False, task_thread.in_task) + +    def test_run_task(self): +        event = Mock() +        task_thread = TaskThread(forever_function, +                                 event=event) +        args = [1] +        kwargs = {'a': 1} + +        task_thread.run_task(*args, **kwargs) +        
self.assertEqual(tuple(args), task_thread.args) +        self.assertEqual(kwargs, task_thread.kwargs) +        event.set.assert_called_once_with() + +    def test_run_task_task_in_progress(self): +        event = Mock() +        task_thread = TaskThread(forever_function, +                                 event=event) +        task_thread.in_task = True +        self.assertRaises(TaskInProcessException, task_thread.run_task) + +    def test_join_task(self): +        task_thread = TaskThread(forever_function) +        task_thread.in_task = True +        task_thread.task_complete = Mock() +        task_thread.task_complete.wait = Mock(side_effect=[True]) +        success = task_thread.join_task(1) +        self.assertTrue(success) + +    def test_join_task_not_running(self): +        task_thread = TaskThread(forever_function) +        task_thread.task_complete = Mock() +        task_thread.wait =\ +            Mock(side_effect=Exception("Should never be called")) +        task_thread.join_task(1) + +    def test_join(self): +        task_thread = TaskThread(forever_function) +        task_thread.start() +        task_thread.run_task() +        # Set the event so the task completes +        forever_event.set() +        task_thread.join_task(1) +        task_thread.join(1) + +    def test_execute_multiple_tasks(self): +        task_thread = TaskThread(forever_function) +        task_thread.start() +        task_thread.run_task() +        # Set the event so the task completes +        forever_event.set() +        task_thread.join_task(1) +        forever_event.set() +        task_thread.join_task(1) +        task_thread.join(1) + + +def my_func(): +    pass + + +class TimerTaskTestCase(unittest.TestCase): + +    def test___int__(self): + +        task = TimerTask(my_func, +                         delay=100) +        self.assertEqual(my_func, task.execute_fcn) +        self.assertEqual(100, task.delay) +        self.assertIsNone(task.count_fcn) +        
self.assertIsNone(task.threshold) + +    def test___int__raises(self): +        self.assertRaises(ValueError, TimerTask.__init__, +                          TimerTask(None), +                          my_func(), +                          count_fcn=Mock()) + +        self.assertRaises(ValueError, TimerTask.__init__, +                          TimerTask(None), +                          my_func(), +                          threshold=Mock()) + +    @patch('taskthread.TaskThread') +    def test_start(self, TaskThreadMock): +        task = TimerTask(my_func) +        thread = TaskThreadMock.return_value + +        task.start() +        self.assertTrue(task.running) +        self.assertEqual(thread, task.thread) +        thread.start.assert_called_once_with() +        thread.run_task.assert_called_once_with() + +    @patch('taskthread.TaskThread') +    def test_start_restarts(self, TaskThreadMock): +        task = TimerTask(my_func, threshold=1, count_fcn=Mock()) +        thread = TaskThreadMock.return_value +        task.last_count = 1 +        task.thread = thread + +        task.start() +        self.assertEqual(0, task.last_count) +        self.assertEqual(0, thread.start.called) +        thread.run_task.assert_called_once_with() + +    @patch('taskthread.TaskThread') +    def test_stop(self, TaskThreadMock): +        running_lock = Mock() +        running_lock.__enter__ = Mock() +        running_lock.__exit__ = Mock() +        task = TimerTask(my_func) +        task.thread = TaskThreadMock.return_value +        task.running = True +        task.event = Mock() +        task.running_lock = running_lock + +        task.stop() + +        self.assertEqual(False, task.running) +        self.assertEqual(1, task.event.set.called) +        running_lock.__enter__.assert_called_once_with() +        running_lock.__exit__.assert_called_once_with(None, None, None) +        task.thread.join_task.assert_called_once_with(2) + +    @patch('taskthread.TaskThread') +    def 
test_stop_not_running(self, TaskThreadMock): +        task = TimerTask(my_func) +        task.thread = TaskThreadMock.return_value +        task.running = False +        task.event = Mock() + +        task.stop() + +        self.assertEqual(False, task.running) +        self.assertEqual(0, task.event.set.called) +        self.assertEqual(0, task.thread.join_task.called) + +    @patch('taskthread.TaskThread') +    def test_shutdown(self, TaskThreadMock): +        task = TimerTask(my_func) +        task.thread = TaskThreadMock.return_value +        task.running = False +        task.shutdown() +        task.thread.join.assert_called_once_with(2) + +    def test__exec_if_threshold_met(self): +        self.called = False + +        def exec_fcn(): +            self.called = True + +        def count_fcn(): +            return 10 + +        task = TimerTask(exec_fcn, count_fcn=count_fcn, threshold=1) +        task.last_count = 9 +        task._exec_if_threshold_met() +        self.assertTrue(self.called) +        self.assertEqual(10, task.last_count) + +    def test__exec_if_threshold_met_not_met(self): + +        def exec_fcn(): +            raise Exception("This shouldn't happen!!") + +        def count_fcn(): +            return 10 + +        task = TimerTask(exec_fcn, count_fcn=count_fcn, threshold=10) +        task.last_count = 9 +        task._exec_if_threshold_met() +        self.assertEqual(9, task.last_count) + +    def test__exec(self): +        self.called = False + +        def exec_fcn(): +            self.called = True + +        task = TimerTask(exec_fcn) +        task._exec() +        self.assertTrue(self.called) + +    def test__exec_threshold(self): +        self.called = False + +        def exec_fcn(): +            self.called = True + +        def count_fcn(): +            return 1 + +        task = TimerTask(exec_fcn, count_fcn=count_fcn, threshold=1) +        task._exec() +        self.assertTrue(self.called) + +    @patch('threading.Event') +    
def test__wait(self, event_mock): +        task = TimerTask(my_func) +        event = event_mock.return_value + +        task._wait() +        event.wait.assert_called_once_with(timeout=task.delay) +        self.assertEqual(1, event.clear.called) + +    @patch('threading.RLock') +    def test__exit_loop(self, mock_rlock): +        task = TimerTask(my_func) +        task.running = False +        lock = mock_rlock.return_value +        lock.__enter__ = Mock() +        lock.__exit__ = Mock() +        self.assertTrue(task._exit_loop()) +        self.assertEqual(1, lock.__enter__.called) +        lock.__exit__.assert_called_once_with(None, None, None) + +    @patch('threading.RLock') +    def test__exit_loop_running(self, mock_rlock): +        lock = mock_rlock.return_value +        lock.__enter__ = Mock() +        lock.__exit__ = Mock() +        task = TimerTask(my_func) +        task.running = True +        self.assertFalse(task._exit_loop()) +        self.assertEqual(1, lock.__enter__.called) +        lock.__exit__.assert_called_once_with(None, None, None) + +    @patch('threading.RLock') +    @patch('threading.Event') +    def test__run_threshold_timer(self, event_mock, rlock_mock): +        self.task = None +        event = event_mock.return_value +        lock = rlock_mock.return_value +        lock.__enter__ = Mock() +        lock.__exit__ = Mock() + +        def exec_fcn(): +            self.task.running = False + +        self.task = TimerTask(exec_fcn) +        self.task._run_threshold_timer() + +        self.assertFalse(self.task.running) +        self.assertEqual(2, event.wait.call_count) diff --git a/common/src/leap/soledad/common/_version.py b/common/src/leap/soledad/common/_version.py index a3227cde..cf4e6706 100644 --- a/common/src/leap/soledad/common/_version.py +++ b/common/src/leap/soledad/common/_version.py @@ -5,8 +5,8 @@  # unpacked source archive. Distribution tarballs contain a pre-generated copy  # of this file. 
-version_version = '0.5.2' -version_full = 'fd2a9adfc9148e47b85add78d8f3c981e4fad885' +version_version = '0.6.0' +version_full = 'f71da83b530abe5b7e88b812797e235833985b6c'  def get_versions(default={}, verbose=False): diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index b51b32f3..5658f4ce 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -1106,7 +1106,9 @@ class CouchDatabase(CommonBackend):          )      def _set_replica_gen_and_trans_id(self, other_replica_uid, -                                      other_generation, other_transaction_id): +                                      other_generation, other_transaction_id, +                                      number_of_docs=None, doc_idx=None, +                                      sync_id=None):          """          Set the last-known generation and transaction id for the other          database replica. @@ -1122,12 +1124,21 @@ class CouchDatabase(CommonBackend):          :param other_transaction_id: The transaction id associated with the              generation.          :type other_transaction_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. +        :type number_of_docs: int +        :param doc_idx: The index of the current document being sent. +        :type doc_idx: int +        :param sync_id: The id of the current sync session. 
+        :type sync_id: str          """          self._do_set_replica_gen_and_trans_id( -            other_replica_uid, other_generation, other_transaction_id) +            other_replica_uid, other_generation, other_transaction_id, +            number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)      def _do_set_replica_gen_and_trans_id( -            self, other_replica_uid, other_generation, other_transaction_id): +            self, other_replica_uid, other_generation, other_transaction_id, +            number_of_docs=None, doc_idx=None, sync_id=None):          """          Set the last-known generation and transaction id for the other          database replica. @@ -1143,6 +1154,13 @@ class CouchDatabase(CommonBackend):          :param other_transaction_id: The transaction id associated with the                                       generation.          :type other_transaction_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. +        :type number_of_docs: int +        :param doc_idx: The index of the current document being sent. +        :type doc_idx: int +        :param sync_id: The id of the current sync session. +        :type sync_id: str          :raise MissingDesignDocError: Raised when tried to access a missing                                        design document. 
@@ -1163,12 +1181,19 @@ class CouchDatabase(CommonBackend):          res = self._database.resource(*ddoc_path)          try:              with CouchDatabase.update_handler_lock[self._get_replica_uid()]: +                body={ +                    'other_replica_uid': other_replica_uid, +                    'other_generation': other_generation, +                    'other_transaction_id': other_transaction_id, +                } +                if number_of_docs is not None: +                    body['number_of_docs'] = number_of_docs +                if doc_idx is not None: +                    body['doc_idx'] = doc_idx +                if sync_id is not None: +                    body['sync_id'] = sync_id                  res.put_json( -                    body={ -                        'other_replica_uid': other_replica_uid, -                        'other_generation': other_generation, -                        'other_transaction_id': other_transaction_id, -                    }, +                    body=body,                      headers={'content-type': 'application/json'})          except ResourceNotFound as e:              raise_missing_design_doc_error(e, ddoc_path) @@ -1306,7 +1331,8 @@ class CouchDatabase(CommonBackend):              doc.set_conflicts(cur_doc.get_conflicts())      def _put_doc_if_newer(self, doc, save_conflict, replica_uid, replica_gen, -                          replica_trans_id=''): +                          replica_trans_id='', number_of_docs=None, +                          doc_idx=None, sync_id=None):          """          Insert/update document into the database with a given revision. @@ -1339,6 +1365,13 @@ class CouchDatabase(CommonBackend):          :param replica_trans_id: The transaction_id associated with the                                   generation.          :type replica_trans_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. 
+        :type number_of_docs: int +        :param doc_idx: The index of the current document being sent. +        :type doc_idx: int +        :param sync_id: The id of the current sync session. +        :type sync_id: str          :return: (state, at_gen) -  If we don't have doc_id already, or if                   doc_rev supersedes the existing document revision, then the @@ -1398,7 +1431,9 @@ class CouchDatabase(CommonBackend):                  self._force_doc_sync_conflict(doc)          if replica_uid is not None and replica_gen is not None:              self._set_replica_gen_and_trans_id( -                replica_uid, replica_gen, replica_trans_id) +                replica_uid, replica_gen, replica_trans_id, +                number_of_docs=number_of_docs, doc_idx=doc_idx, +                sync_id=sync_id)          # update info          old_doc.rev = doc.rev          if doc.is_tombstone(): diff --git a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js index 722f695a..b0ae2de6 100644 --- a/common/src/leap/soledad/common/ddocs/syncs/updates/put.js +++ b/common/src/leap/soledad/common/ddocs/syncs/updates/put.js @@ -1,22 +1,151 @@ +/** + * The u1db_sync_log document stores both the actual sync log and a list of + * pending updates to the log, in case we receive incoming documents out of + * the correct order (i.e. if there are parallel PUTs during the sync + * process). + * + * The structure of the document is the following: + * + *     { + *         'syncs': [ + *             ['<replica_uid>', <gen>, '<trans_id>'], + *             ...  + *         ], + *         'pending': { + *             'other_replica_uid': { + *                 'sync_id': '<sync_id>', + *                 'log': [[<gen>, '<trans_id>'], ...] + *             }, + *             ... + *         } + *     } + * + * The update function below does the following: + * + *   0. 
If we do not receive a sync_id, we just update the 'syncs' list with + *      the incoming info about the source replica state. + * + *   1. Otherwise, if the incoming sync_id differs from current stored + *      sync_id, then we assume that the previous sync session for that source + *      replica was interrupted and discard all pending data. + * + *   2. Then we append incoming info as pending data for that source replica + *      and current sync_id, and sort the pending data by generation. + * + *   3. Then we go through pending data and find the most recent generation + *      that we can use to update the actual sync log. + * + *   4. Finally, we insert the most up to date information into the sync log. + */  function(doc, req){ + +    // create the document if it doesn't exist      if (!doc) {          doc = {}          doc['_id'] = 'u1db_sync_log';          doc['syncs'] = [];      } -    body = JSON.parse(req.body); + +    // get and validate incoming info +    var body = JSON.parse(req.body); +    var other_replica_uid = body['other_replica_uid']; +    var other_generation = parseInt(body['other_generation']); +    var other_transaction_id = body['other_transaction_id'] +    var sync_id = body['sync_id']; +    var number_of_docs = body['number_of_docs']; +    var doc_idx = body['doc_idx']; + +    // parse integers +    if (number_of_docs != null) +        number_of_docs = parseInt(number_of_docs); +    if (doc_idx != null) +        doc_idx = parseInt(doc_idx); + +    if (other_replica_uid == null +            || other_generation == null +            || other_transaction_id == null) +        return [null, 'invalid data']; + +    // create slot for pending logs +    if (doc['pending'] == null) +        doc['pending'] = {}; + +    // these are the values that will be actually inserted +    var current_gen = other_generation; +    var current_trans_id = other_transaction_id; + +    /*------------- Wait for sequential values before storing -------------*/ + +  
  // we just try to obtain pending log if we received a sync_id +    if (sync_id != null) { + +        // create slot for current source and sync_id pending log +        if (doc['pending'][other_replica_uid] == null +                || doc['pending'][other_replica_uid]['sync_id'] != sync_id) { +            doc['pending'][other_replica_uid] = { +                'sync_id': sync_id, +                'log': [], +                'last_doc_idx': 0, +            } +        } + +        // append incoming data to pending log +        doc['pending'][other_replica_uid]['log'].push([ +            other_generation, +            other_transaction_id, +            doc_idx, +        ]) + +        // sort pending log according to generation +        doc['pending'][other_replica_uid]['log'].sort(function(a, b) { +            return a[0] - b[0]; +        }); + +        // get most up-to-date information from pending log +        var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx']; +        var pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; + +        current_gen = null; +        current_trans_id = null; + +        while (last_doc_idx + 1 == pending_idx) { +            pending = doc['pending'][other_replica_uid]['log'].shift() +            current_gen = pending[0]; +            current_trans_id = pending[1]; +            last_doc_idx = pending[2] +            if (doc['pending'][other_replica_uid]['log'].length == 0) +                break; +            pending_idx = doc['pending'][other_replica_uid]['log'][0][2]; +        } + +        // leave the sync log untouched if we still did not receive enough docs +        if (current_gen == null) +            return [doc, 'ok']; + +        // update last index of received doc +        doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx; + +        // eventually remove all pending data from that replica +        if (last_doc_idx == number_of_docs) +            delete 
doc['pending'][other_replica_uid] +    } + +    /*--------------- Store source replica info on sync log ---------------*/ +      // remove outdated info      doc['syncs'] = doc['syncs'].filter(          function (entry) { -            return entry[0] != body['other_replica_uid']; +            return entry[0] != other_replica_uid;          }      ); -    // store u1db rev + +    // store in log      doc['syncs'].push([ -        body['other_replica_uid'], -        body['other_generation'], -        body['other_transaction_id'] +        other_replica_uid, +        current_gen, +        current_trans_id       ]); +      return [doc, 'ok'];  } diff --git a/common/src/leap/soledad/common/tests/__init__.py b/common/src/leap/soledad/common/tests/__init__.py index a38bdaed..3081683b 100644 --- a/common/src/leap/soledad/common/tests/__init__.py +++ b/common/src/leap/soledad/common/tests/__init__.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # __init__.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -19,7 +19,6 @@  """  Tests to make sure Soledad provides U1DB functionality and more.  """ -  import os  import random  import string @@ -29,11 +28,8 @@ from mock import Mock  from leap.soledad.common.document import SoledadDocument  from leap.soledad.client import Soledad -from leap.soledad.client.crypto import SoledadCrypto -from leap.soledad.client.target import ( -    decrypt_doc, -    ENC_SCHEME_KEY, -) +from leap.soledad.client.crypto import decrypt_doc_dict +from leap.soledad.client.crypto import ENC_SCHEME_KEY  from leap.common.testing.basetest import BaseLeapTest @@ -49,6 +45,7 @@ class BaseSoledadTest(BaseLeapTest):      """      Instantiates Soledad for usage in tests.      
""" +    defer_sync_encryption = False      def setUp(self):          # config info @@ -73,11 +70,26 @@ class BaseSoledadTest(BaseLeapTest):          self._db1.close()          self._db2.close()          self._soledad.close() +          # XXX should not access "private" attrs          for f in [self._soledad._local_db_path, self._soledad._secrets_path]:              if os.path.isfile(f):                  os.unlink(f) +    def get_default_shared_mock(self, put_doc_side_effect): +        """ +        Get a default class for mocking the shared DB +        """ +        class defaultMockSharedDB(object): +            get_doc = Mock(return_value=None) +            put_doc = Mock(side_effect=put_doc_side_effect) +            lock = Mock(return_value=('atoken', 300)) +            unlock = Mock(return_value=True) + +            def __call__(self): +                return self +        return defaultMockSharedDB +      def _soledad_instance(self, user=ADDRESS, passphrase=u'123',                            prefix='',                            secrets_path=Soledad.STORAGE_SECRETS_FILE_NAME, @@ -88,18 +100,11 @@ class BaseSoledadTest(BaseLeapTest):          def _put_doc_side_effect(doc):              self._doc_put = doc -        class MockSharedDB(object): - -            get_doc = Mock(return_value=None) -            put_doc = Mock(side_effect=_put_doc_side_effect) -            lock = Mock(return_value=('atoken', 300)) -            unlock = Mock(return_value=True) - -            def __call__(self): -                return self -          if shared_db_class is not None:              MockSharedDB = shared_db_class +        else: +            MockSharedDB = self.get_default_shared_mock( +                _put_doc_side_effect)          Soledad._shared_db = MockSharedDB()          return Soledad( @@ -111,7 +116,8 @@ class BaseSoledadTest(BaseLeapTest):                  self.tempdir, prefix, local_db_path),              server_url=server_url,  # Soledad will fail if not given an url. 
             cert_file=cert_file, -            secret_id=secret_id) +            secret_id=secret_id, +            defer_encryption=self.defer_sync_encryption)      def assertGetEncryptedDoc(              self, db, doc_id, doc_rev, content, has_conflicts): @@ -121,8 +127,15 @@ class BaseSoledadTest(BaseLeapTest):          exp_doc = self.make_document(doc_id, doc_rev, content,                                       has_conflicts=has_conflicts)          doc = db.get_doc(doc_id) +          if ENC_SCHEME_KEY in doc.content: -            doc.set_json(decrypt_doc(self._soledad._crypto, doc)) +            # XXX check for SYM_KEY too +            key = self._soledad._crypto.doc_passphrase(doc.doc_id) +            secret = self._soledad._crypto.secret +            decrypted = decrypt_doc_dict( +                doc.content, doc.doc_id, doc.rev, +                key, secret) +            doc.set_json(decrypted)          self.assertEqual(exp_doc.doc_id, doc.doc_id)          self.assertEqual(exp_doc.rev, doc.rev)          self.assertEqual(exp_doc.has_conflicts, doc.has_conflicts) diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 3b1e5a06..10d6c136 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -91,14 +91,19 @@ class CouchDBWrapper(object):          logPath = os.path.join(self.tempdir, 'log', 'couch.log')          while not os.path.exists(logPath):              if self.process.poll() is not None: +                got_stdout, got_stderr = "", "" +                if self.process.stdout is not None: +                    got_stdout = self.process.stdout.read() + +                if self.process.stderr is not None: +                    got_stderr = self.process.stderr.read()                  raise Exception("""  couchdb exited with code %d.  
 stdout:
 %s
 stderr:
 %s""" % (
-                    self.process.returncode, self.process.stdout.read(),
-                    self.process.stderr.read()))
+                    self.process.returncode, got_stdout, got_stderr))
             time.sleep(0.01)
         while os.stat(logPath).st_size == 0:
             time.sleep(0.01)
diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
index b03f79e7..6465eb80 100644
--- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
+++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
-# test_soledad.py
-# Copyright (C) 2013 LEAP
+# test_couch_operations_atomicity.py
+# Copyright (C) 2013, 2014 LEAP
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -14,11 +14,9 @@
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
 """
+Test atomicity for couch operations.
""" -  import os  import mock  import tempfile @@ -32,7 +30,7 @@ from leap.soledad.client import Soledad  from leap.soledad.common.couch import CouchDatabase, CouchServerState  from leap.soledad.common.tests.test_couch import CouchDBTestCase  from leap.soledad.common.tests.u1db_tests import TestCaseWithServer -from leap.soledad.common.tests.test_target import ( +from leap.soledad.common.tests.test_sync_target import (      make_token_soledad_app,      make_leap_document_for_test,      token_leap_sync_target, @@ -224,9 +222,9 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):      #      # Concurrency tests      # -     +      class _WorkerThread(threading.Thread): -         +          def __init__(self, params, run_method):              threading.Thread.__init__(self)              self._params = params @@ -260,7 +258,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):          for thread in threads:              thread.join() -         +          # assert length of transaction_log          transaction_log = self.db._get_transaction_log()          self.assertEqual( @@ -341,7 +339,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer):          # wait for threads to finish          for thread in threads:              thread.join() -         +          # do the sync!          sol.sync() diff --git a/common/src/leap/soledad/common/tests/test_crypto.py b/common/src/leap/soledad/common/tests/test_crypto.py index 4b2470ba..1071af14 100644 --- a/common/src/leap/soledad/common/tests/test_crypto.py +++ b/common/src/leap/soledad/common/tests/test_crypto.py @@ -14,37 +14,17 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  Tests for cryptographic related stuff.  
""" -  import os -import shutil -import tempfile -import simplejson as json  import hashlib  import binascii - -from leap.common.testing.basetest import BaseLeapTest -from leap.soledad.client import ( -    Soledad, -    crypto, -    target, -) +from leap.soledad.client import crypto  from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.tests import ( -    BaseSoledadTest, -    KEY_FINGERPRINT, -    PRIVATE_KEY, -) +from leap.soledad.common.tests import BaseSoledadTest  from leap.soledad.common.crypto import WrongMac, UnknownMacMethod -from leap.soledad.common.tests.u1db_tests import ( -    simple_doc, -    nested_doc, -)  class EncryptedSyncTestCase(BaseSoledadTest): @@ -59,16 +39,17 @@ class EncryptedSyncTestCase(BaseSoledadTest):          simpledoc = {'key': 'val'}          doc1 = SoledadDocument(doc_id='id')          doc1.content = simpledoc +          # encrypt doc -        doc1.set_json(target.encrypt_doc(self._soledad._crypto, doc1)) +        doc1.set_json(crypto.encrypt_doc(self._soledad._crypto, doc1))          # assert content is different and includes keys          self.assertNotEqual(              simpledoc, doc1.content,              'incorrect document encryption') -        self.assertTrue(target.ENC_JSON_KEY in doc1.content) -        self.assertTrue(target.ENC_SCHEME_KEY in doc1.content) +        self.assertTrue(crypto.ENC_JSON_KEY in doc1.content) +        self.assertTrue(crypto.ENC_SCHEME_KEY in doc1.content)          # decrypt doc -        doc1.set_json(target.decrypt_doc(self._soledad._crypto, doc1)) +        doc1.set_json(crypto.decrypt_doc(self._soledad._crypto, doc1))          self.assertEqual(              simpledoc, doc1.content, 'incorrect document encryption') @@ -159,15 +140,15 @@ class MacAuthTestCase(BaseSoledadTest):          doc = SoledadDocument(doc_id='id')          doc.content = simpledoc          # encrypt doc -        doc.set_json(target.encrypt_doc(self._soledad._crypto, doc)) -        
self.assertTrue(target.MAC_KEY in doc.content) -        self.assertTrue(target.MAC_METHOD_KEY in doc.content) +        doc.set_json(crypto.encrypt_doc(self._soledad._crypto, doc)) +        self.assertTrue(crypto.MAC_KEY in doc.content) +        self.assertTrue(crypto.MAC_METHOD_KEY in doc.content)          # mess with MAC -        doc.content[target.MAC_KEY] = '1234567890ABCDEF' +        doc.content[crypto.MAC_KEY] = '1234567890ABCDEF'          # try to decrypt doc          self.assertRaises(              WrongMac, -            target.decrypt_doc, self._soledad._crypto, doc) +            crypto.decrypt_doc, self._soledad._crypto, doc)      def test_decrypt_with_unknown_mac_method_raises(self):          """ @@ -177,15 +158,15 @@ class MacAuthTestCase(BaseSoledadTest):          doc = SoledadDocument(doc_id='id')          doc.content = simpledoc          # encrypt doc -        doc.set_json(target.encrypt_doc(self._soledad._crypto, doc)) -        self.assertTrue(target.MAC_KEY in doc.content) -        self.assertTrue(target.MAC_METHOD_KEY in doc.content) +        doc.set_json(crypto.encrypt_doc(self._soledad._crypto, doc)) +        self.assertTrue(crypto.MAC_KEY in doc.content) +        self.assertTrue(crypto.MAC_METHOD_KEY in doc.content)          # mess with MAC method -        doc.content[target.MAC_METHOD_KEY] = 'mymac' +        doc.content[crypto.MAC_METHOD_KEY] = 'mymac'          # try to decrypt doc          self.assertRaises(              UnknownMacMethod, -            target.decrypt_doc, self._soledad._crypto, doc) +            crypto.decrypt_doc, self._soledad._crypto, doc)  class SoledadCryptoAESTestCase(BaseSoledadTest): diff --git a/common/src/leap/soledad/common/tests/test_http.py b/common/src/leap/soledad/common/tests/test_http.py new file mode 100644 index 00000000..d21470e0 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_http.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# test_http.py +# Copyright (C) 2013, 2014 LEAP +# +# This program 
is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: test http database +""" +from u1db.remote import http_database + +from leap.soledad.client import auth + +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests.u1db_tests import test_http_database + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_http_database`. +#----------------------------------------------------------------------------- + +class _HTTPDatabase(http_database.HTTPDatabase, auth.TokenBasedAuth): +    """ +    Wraps our token auth implementation. +    """ + +    def set_token_credentials(self, uuid, token): +        auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + +    def _sign_request(self, method, url_query, params): +        return auth.TokenBasedAuth._sign_request( +            self, method, url_query, params) + + +class TestHTTPDatabaseWithCreds( +        test_http_database.TestHTTPDatabaseCtrWithCreds): + +    def test_get_sync_target_inherits_token_credentials(self): +        # this test was from TestDatabaseSimpleOperations but we put it here +        # for convenience. 
+        self.db = _HTTPDatabase('dbase') +        self.db.set_token_credentials('user-uuid', 'auth-token') +        st = self.db.get_sync_target() +        self.assertEqual(self.db._creds, st._creds) + +    def test_ctr_with_creds(self): +        db1 = _HTTPDatabase('http://dbs/db', creds={'token': { +            'uuid': 'user-uuid', +            'token': 'auth-token', +        }}) +        self.assertIn('token', db1._creds) + + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_http_client.py b/common/src/leap/soledad/common/tests/test_http_client.py new file mode 100644 index 00000000..3169398b --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_http_client.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# test_http_client.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: sync target +""" +import json + +from u1db.remote import http_client + +from leap.soledad.client import auth +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests.u1db_tests import test_http_client +from leap.soledad.server.auth import SoledadTokenAuthMiddleware + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_http_client`. 
+#-----------------------------------------------------------------------------
+
+class TestSoledadClientBase(test_http_client.TestHTTPClientBase):
+    """
+    This class should be used to test Token auth.
+    """
+
+    def getClientWithToken(self, **kwds):
+        self.startServer()
+
+        class _HTTPClientWithToken(
+                http_client.HTTPClientBase, auth.TokenBasedAuth):
+
+            def set_token_credentials(self, uuid, token):
+                auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
+
+            def _sign_request(self, method, url_query, params):
+                return auth.TokenBasedAuth._sign_request(
+                    self, method, url_query, params)
+
+        return _HTTPClientWithToken(self.getURL('dbase'), **kwds)
+
+    def test_oauth(self):
+        """
+        Suppress oauth test (we test for token auth here).
+        """
+        pass
+
+    def test_oauth_ctr_creds(self):
+        """
+        Suppress oauth test (we test for token auth here).
+        """
+        pass
+
+    def test_oauth_Unauthorized(self):
+        """
+        Suppress oauth test (we test for token auth here).
+        """
+        pass
+
+    def app(self, environ, start_response):
+        res = test_http_client.TestHTTPClientBase.app(
+            self, environ, start_response)
+        if res is not None:
+            return res
+        # mimic the soledad application here.
+        if '/token' in environ['PATH_INFO']: +            auth = environ.get(SoledadTokenAuthMiddleware.HTTP_AUTH_KEY) +            if not auth: +                start_response("401 Unauthorized", +                               [('Content-Type', 'application/json')]) +                return [json.dumps({"error": "unauthorized", +                                    "message": e.message})] +            scheme, encoded = auth.split(None, 1) +            if scheme.lower() != 'token': +                start_response("401 Unauthorized", +                               [('Content-Type', 'application/json')]) +                return [json.dumps({"error": "unauthorized", +                                    "message": e.message})] +            uuid, token = encoded.decode('base64').split(':', 1) +            if uuid != 'user-uuid' and token != 'auth-token': +                return unauth_err("Incorrect address or token.") +            start_response("200 OK", [('Content-Type', 'application/json')]) +            return [json.dumps([environ['PATH_INFO'], uuid, token])] + +    def test_token(self): +        """ +        Test if token is sent correctly. 
+        """ +        cli = self.getClientWithToken() +        cli.set_token_credentials('user-uuid', 'auth-token') +        res, headers = cli._request('GET', ['doc', 'token']) +        self.assertEqual( +            ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) + +    def test_token_ctr_creds(self): +        cli = self.getClientWithToken(creds={'token': { +            'uuid': 'user-uuid', +            'token': 'auth-token', +        }}) +        res, headers = cli._request('GET', ['doc', 'token']) +        self.assertEqual( +            ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_https.py b/common/src/leap/soledad/common/tests/test_https.py new file mode 100644 index 00000000..b6288188 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_https.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# test_sync_target.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+""" +Test Leap backend bits: https +""" +from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests import test_sync_target as test_st +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests.u1db_tests import test_backends +from leap.soledad.common.tests.u1db_tests import test_https + +from leap.soledad import client +from leap.soledad.server import SoledadApp + +from u1db.remote import http_client + + +def make_soledad_app(state): +    return SoledadApp(state) + +LEAP_SCENARIOS = [ +    ('http', { +        'make_database_for_test': test_backends.make_http_database_for_test, +        'copy_database_for_test': test_backends.copy_http_database_for_test, +        'make_document_for_test': test_st.make_leap_document_for_test, +        'make_app_with_state': test_st.make_soledad_app}), +] + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_https`. +#----------------------------------------------------------------------------- + +def token_leap_https_sync_target(test, host, path): +    _, port = test.server.server_address +    st = client.target.SoledadSyncTarget( +        'https://%s:%d/%s' % (host, port, path), +        crypto=test._soledad._crypto) +    st.set_token_credentials('user-uuid', 'auth-token') +    return st + + +class TestSoledadSyncTargetHttpsSupport( +        test_https.TestHttpSyncTargetHttpsSupport, +        BaseSoledadTest): + +    scenarios = [ +        ('token_soledad_https', +            {'server_def': test_https.https_server_def, +             'make_app_with_state': test_st.make_token_soledad_app, +             'make_document_for_test': test_st.make_leap_document_for_test, +             'sync_target': token_leap_https_sync_target}), +    ] + +    def setUp(self): +        # the parent constructor undoes our SSL monkey patch to ensure tests +        # run smoothly with standard u1db. 
+        test_https.TestHttpSyncTargetHttpsSupport.setUp(self) +        # so here monkey patch again to test our functionality. +        http_client._VerifiedHTTPSConnection = client.VerifiedHTTPSConnection +        client.SOLEDAD_CERT = http_client.CA_CERTS + +    def test_working(self): +        """ +        Test that SSL connections work well. + +        This test was adapted to patch Soledad's HTTPS connection custom class +        with the intended CA certificates. +        """ +        self.startServer() +        db = self.request_state._create_database('test') +        self.patch(client, 'SOLEDAD_CERT', self.cacert_pem) +        remote_target = self.getSyncTarget('localhost', 'test') +        remote_target.record_sync_info('other-id', 2, 'T-id') +        self.assertEqual( +            (2, 'T-id'), db._get_replica_gen_and_trans_id('other-id')) + +    def test_host_mismatch(self): +        """ +        Test that SSL connections to a hostname different than the one in the +        certificate raise CertificateError. + +        This test was adapted to patch Soledad's HTTPS connection custom class +        with the intended CA certificates. +        """ +        self.startServer() +        self.request_state._create_database('test') +        self.patch(client, 'SOLEDAD_CERT', self.cacert_pem) +        remote_target = self.getSyncTarget('127.0.0.1', 'test') +        self.assertRaises( +            http_client.CertificateError, remote_target.record_sync_info, +            'other-id', 2, 'T-id') + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 1c5a7407..cb5348b4 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -14,12 +14,9 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. 
- -  """  Tests for server-related functionality.  """ -  import os  import tempfile  import simplejson as json @@ -39,16 +36,13 @@ from leap.soledad.common.tests.u1db_tests import (      simple_doc,  )  from leap.soledad.common.tests.test_couch import CouchDBTestCase -from leap.soledad.common.tests.test_target import ( +from leap.soledad.common.tests.test_target_soledad import (      make_token_soledad_app,      make_leap_document_for_test, -    token_leap_sync_target,  ) -from leap.soledad.client import ( -    Soledad, -    target, -) -from leap.soledad.server import SoledadApp, LockResource +from leap.soledad.common.tests.test_sync_target import token_leap_sync_target +from leap.soledad.client import Soledad, crypto +from leap.soledad.server import LockResource  from leap.soledad.server.auth import URLToAuthorization @@ -369,12 +363,12 @@ class EncryptedSyncTestCase(          self.assertEqual(doc1.doc_id, couchdoc.doc_id)          self.assertEqual(doc1.rev, couchdoc.rev)          self.assertEqual(6, len(couchdoc.content)) -        self.assertTrue(target.ENC_JSON_KEY in couchdoc.content) -        self.assertTrue(target.ENC_SCHEME_KEY in couchdoc.content) -        self.assertTrue(target.ENC_METHOD_KEY in couchdoc.content) -        self.assertTrue(target.ENC_IV_KEY in couchdoc.content) -        self.assertTrue(target.MAC_KEY in couchdoc.content) -        self.assertTrue(target.MAC_METHOD_KEY in couchdoc.content) +        self.assertTrue(crypto.ENC_JSON_KEY in couchdoc.content) +        self.assertTrue(crypto.ENC_SCHEME_KEY in couchdoc.content) +        self.assertTrue(crypto.ENC_METHOD_KEY in couchdoc.content) +        self.assertTrue(crypto.ENC_IV_KEY in couchdoc.content) +        self.assertTrue(crypto.MAC_KEY in couchdoc.content) +        self.assertTrue(crypto.MAC_METHOD_KEY in couchdoc.content)          # instantiate soledad with empty db, but with same secrets path          sol2 = self._soledad_instance(prefix='x', auth_token='auth-token')          _, doclist 
= sol2.get_all_docs() @@ -427,12 +421,12 @@ class EncryptedSyncTestCase(          self.assertEqual(doc1.doc_id, couchdoc.doc_id)          self.assertEqual(doc1.rev, couchdoc.rev)          self.assertEqual(6, len(couchdoc.content)) -        self.assertTrue(target.ENC_JSON_KEY in couchdoc.content) -        self.assertTrue(target.ENC_SCHEME_KEY in couchdoc.content) -        self.assertTrue(target.ENC_METHOD_KEY in couchdoc.content) -        self.assertTrue(target.ENC_IV_KEY in couchdoc.content) -        self.assertTrue(target.MAC_KEY in couchdoc.content) -        self.assertTrue(target.MAC_METHOD_KEY in couchdoc.content) +        self.assertTrue(crypto.ENC_JSON_KEY in couchdoc.content) +        self.assertTrue(crypto.ENC_SCHEME_KEY in couchdoc.content) +        self.assertTrue(crypto.ENC_METHOD_KEY in couchdoc.content) +        self.assertTrue(crypto.ENC_IV_KEY in couchdoc.content) +        self.assertTrue(crypto.MAC_KEY in couchdoc.content) +        self.assertTrue(crypto.MAC_METHOD_KEY in couchdoc.content)          # instantiate soledad with empty db, but with same secrets path          sol2 = self._soledad_instance(              prefix='x', @@ -502,7 +496,6 @@ class EncryptedSyncTestCase(          sol1.close()          sol2.close() -      def test_sync_many_small_files(self):          """          Test if Soledad can sync many smallfiles. @@ -548,6 +541,7 @@ class EncryptedSyncTestCase(          sol1.close()          sol2.close() +  class LockResourceTestCase(          CouchDBTestCase, TestCaseWithServer):      """ diff --git a/common/src/leap/soledad/common/tests/test_soledad.py b/common/src/leap/soledad/common/tests/test_soledad.py index 5a3bf2b0..11e43423 100644 --- a/common/src/leap/soledad/common/tests/test_soledad.py +++ b/common/src/leap/soledad/common/tests/test_soledad.py @@ -14,18 +14,13 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. 
- -  """  Tests for general Soledad functionality.  """ - -  import os  from mock import Mock -from pysqlcipher.dbapi2 import DatabaseError  from leap.common.events import events_pb2 as proto  from leap.soledad.common.tests import (      BaseSoledadTest, diff --git a/common/src/leap/soledad/common/tests/test_soledad_doc.py b/common/src/leap/soledad/common/tests/test_soledad_doc.py new file mode 100644 index 00000000..0952de6d --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_soledad_doc.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# test_soledad_doc.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test Leap backend bits: soledad docs +""" +from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests.u1db_tests import test_document +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests import test_sync_target as st + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_document`. 
+#----------------------------------------------------------------------------- + + +class TestSoledadDocument(test_document.TestDocument, BaseSoledadTest): + +    scenarios = ([( +        'leap', { +            'make_document_for_test': st.make_leap_document_for_test})]) + + +class TestSoledadPyDocument(test_document.TestPyDocument, BaseSoledadTest): + +    scenarios = ([( +        'leap', { +            'make_document_for_test': st.make_leap_document_for_test})]) + + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_sqlcipher.py b/common/src/leap/soledad/common/tests/test_sqlcipher.py index 891aca0f..595966ec 100644 --- a/common/src/leap/soledad/common/tests/test_sqlcipher.py +++ b/common/src/leap/soledad/common/tests/test_sqlcipher.py @@ -14,16 +14,11 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  Test sqlcipher backend internals.  """ - -  import os  import time -import unittest  import simplejson as json  import threading @@ -50,15 +45,9 @@ from leap.soledad.client.sqlcipher import (      DatabaseIsNotEncrypted,      open as u1db_open,  ) -from leap.soledad.common.crypto import ( -    EncryptionSchemes, -    ENC_JSON_KEY, -    ENC_SCHEME_KEY, -) -from leap.soledad.client.target import ( -    decrypt_doc, -    SoledadSyncTarget, -) +from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.common.crypto import ENC_SCHEME_KEY +from leap.soledad.client.crypto import decrypt_doc_dict  # u1db tests stuff. 
@@ -269,6 +258,7 @@ class TestSQLCipherPartialExpandDatabase(          db = SQLCipherDatabase.__new__(              SQLCipherDatabase)          db._db_handle = dbapi2.connect(path)  # db is there but not yet init-ed +        db._syncers = {}          c = db._db_handle.cursor()          c.execute('PRAGMA key="%s"' % PASSWORD)          self.addCleanup(db.close) @@ -614,7 +604,12 @@ class SQLCipherDatabaseSyncTests(          self.sync(self.db2, db3)          doc3 = db3.get_doc('the-doc')          if ENC_SCHEME_KEY in doc3.content: -            doc3.set_json(decrypt_doc(self._soledad._crypto, doc3)) +            _crypto = self._soledad._crypto +            key = _crypto.doc_passphrase(doc3.doc_id) +            secret = _crypto.secret +            doc3.set_json(decrypt_doc_dict( +                doc3.content, +                doc3.doc_id, doc3.rev, key, secret))          self.assertEqual(doc4.get_json(), doc3.get_json())          self.assertFalse(doc3.has_conflicts) @@ -796,7 +791,7 @@ class SQLCipherEncryptionTest(BaseLeapTest):              # trying to open the a non-encrypted database with sqlcipher              # backend should raise a DatabaseIsNotEncrypted exception.              
SQLCipherDatabase(self.DB_FILE, PASSWORD) -            raise db1pi2.DatabaseError( +            raise dbapi2.DatabaseError(                  "SQLCipher backend should not be able to open non-encrypted "                  "dbs.")          except DatabaseIsNotEncrypted: diff --git a/common/src/leap/soledad/common/tests/test_sync.py b/common/src/leap/soledad/common/tests/test_sync.py index fd4a2797..0433fac9 100644 --- a/common/src/leap/soledad/common/tests/test_sync.py +++ b/common/src/leap/soledad/common/tests/test_sync.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # test_sync.py -# Copyright (C) 2014 LEAP +# Copyright (C) 2013, 2014 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -24,26 +24,31 @@ import threading  import time  from urlparse import urljoin -from leap.soledad.common.couch import ( -    CouchServerState, -    CouchDatabase, -) +from leap.soledad.common import couch +from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests import test_sync_target +from leap.soledad.common.tests import u1db_tests as tests  from leap.soledad.common.tests.u1db_tests import (      TestCaseWithServer,      simple_doc, +    test_backends, +    test_sync  )  from leap.soledad.common.tests.test_couch import CouchDBTestCase -from leap.soledad.common.tests.test_target import ( +from leap.soledad.common.tests.test_target_soledad import (      make_token_soledad_app,      make_leap_document_for_test, -    token_leap_sync_target,  ) - +from leap.soledad.common.tests.test_sync_target import token_leap_sync_target  from leap.soledad.client import (      Soledad,      target,  ) +from leap.soledad.common.tests.util import SoledadWithCouchServerMixin +from leap.soledad.client.sync import SoledadSynchronizer +from leap.soledad.server import SoledadApp +  class InterruptableSyncTestCase( @@ -99,8 +104,8 @@ class InterruptableSyncTestCase(              
def make_soledad_app(state):
    """Wrap the given server state in a plain SoledadApp."""
    return SoledadApp(state)


class TestSoledadDbSync(
        SoledadWithCouchServerMixin,
        test_sync.TestDbSync):
    """
    Test db.sync remote sync shortcut.
    """

    scenarios = [
        ('py-http', {
            'make_app_with_state': make_soledad_app,
            'make_database_for_test': tests.make_memory_database_for_test,
        }),
        ('py-token-http', {
            'make_app_with_state': test_sync_target.make_token_soledad_app,
            'make_database_for_test': tests.make_memory_database_for_test,
            'token': True
        }),
    ]

    oauth = False
    token = False

    def setUp(self):
        """
        Explicitly invoke initialization on all bases.
        """
        tests.TestCaseWithServer.setUp(self)
        self.main_test_class = test_sync.TestDbSync
        SoledadWithCouchServerMixin.setUp(self)
        self.startServer()
        self.db2 = couch.CouchDatabase.open_database(
            urljoin(
                'http://localhost:' + str(self.wrapper.port), 'test'),
            create=True,
            ensure_ddocs=True)

    def tearDown(self):
        """
        Explicitly invoke destruction on all bases.
        """
        self.db2.delete_database()
        SoledadWithCouchServerMixin.tearDown(self)
        tests.TestCaseWithServer.tearDown(self)

    def do_sync(self, target_name):
        """
        Perform sync using SoledadSynchronizer, SoledadSyncTarget
        and Token auth.

        :param target_name: name of the remote database to sync against
        :return: the local generation before the sync
        """
        # NOTE: the original code assigned `extra = {}` and immediately
        # overwrote it; the dead assignment was removed.
        extra = dict(creds={'token': {
            'uuid': 'user-uuid',
            'token': 'auth-token',
        }})
        target_url = self.getURL(target_name)
        return SoledadSynchronizer(
            self.db,
            target.SoledadSyncTarget(
                target_url,
                crypto=self._soledad._crypto,
                **extra)).sync(autocreate=True,
                               defer_decryption=False)

    def test_db_sync(self):
        """
        Test sync.

        Adapted to check for encrypted content.
        """
        doc1 = self.db.create_doc_from_json(tests.simple_doc)
        doc2 = self.db2.create_doc_from_json(tests.nested_doc)
        local_gen_before_sync = self.do_sync('test')
        gen, _, changes = self.db.whats_changed(local_gen_before_sync)
        self.assertEqual(1, len(changes))
        self.assertEqual(doc2.doc_id, changes[0][0])
        self.assertEqual(1, gen - local_gen_before_sync)
        self.assertGetEncryptedDoc(
            self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
        self.assertGetEncryptedDoc(
            self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False)

    def test_db_sync_autocreate(self):
        """
        Test sync.

        Adapted to check for encrypted content.
        """
        doc1 = self.db.create_doc_from_json(tests.simple_doc)
        local_gen_before_sync = self.do_sync('test')
        gen, _, changes = self.db.whats_changed(local_gen_before_sync)
        self.assertEqual(0, gen - local_gen_before_sync)
        db3 = self.request_state.open_database('test')
        gen, _, changes = db3.whats_changed()
        self.assertEqual(1, len(changes))
        self.assertEqual(doc1.doc_id, changes[0][0])
        self.assertGetEncryptedDoc(
            db3, doc1.doc_id, doc1.rev, tests.simple_doc, False)
        t_gen, _ = self.db._get_replica_gen_and_trans_id(
            db3.replica_uid)
        s_gen, _ = db3._get_replica_gen_and_trans_id('test1')
        self.assertEqual(1, t_gen)
        self.assertEqual(1, s_gen)


load_tests = tests.load_with_scenarios
class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):
    """
    Another base class for testing the deferred encryption/decryption during
    the syncs, using the intermediate database.
    """
    defer_sync_encryption = True

    def setUp(self):
        # config info
        self.db1_file = os.path.join(self.tempdir, "db1.u1db")
        self.db_pass = DBPASS
        self.email = ADDRESS

        # A random prefix for each test keeps concurrent initialization
        # and shutdown of the local dbs from stepping on each other.
        self.rand_prefix = ''.join(
            random.choice(string.ascii_letters) for _ in range(6))
        # Initialize soledad by hand so we can control keys.
        self._soledad = self._soledad_instance(
            prefix=self.rand_prefix, user=self.email)

        # Open test dbs: db1 is the local sqlcipher db (which instantiates
        # a syncdb), db2 is the remote couch db.
        self.db1 = open_sqlcipher(
            self.db1_file, DBPASS, create=True,
            document_factory=SoledadDocument,
            crypto=self._soledad._crypto,
            defer_encryption=True)
        self.db2 = couch.CouchDatabase.open_database(
            urljoin(
                'http://localhost:' + str(self.wrapper.port), 'test'),
            create=True,
            ensure_ddocs=True)

    def tearDown(self):
        self.db1.close()
        self.db2.close()
        self._soledad.close()

        # XXX should not access "private" attrs
        for path in (self._soledad._local_db_path,
                     self._soledad._secrets_path,
                     self.db1._sync_db_path):
            if os.path.isfile(path):
                os.unlink(path)
class SyncTimeoutError(Exception):
    """
    Dummy exception to notify timeout during sync.
    """
    pass


class TestSoledadDbSyncDeferredEncDecr(
        BaseSoledadDeferredEncTest,
        test_sync.TestDbSync):
    """
    Test db.sync remote sync shortcut.
    Case with deferred encryption and decryption: using the intermediate
    syncdb.
    """

    scenarios = [
        ('http', {
            'make_app_with_state': make_soledad_app,
            'make_database_for_test': tests.make_memory_database_for_test,
        }),
    ]

    oauth = False
    token = True

    def setUp(self):
        """
        Explicitly invoke initialization on all bases.
        """
        tests.TestCaseWithServer.setUp(self)
        self.main_test_class = test_sync.TestDbSync
        BaseSoledadDeferredEncTest.setUp(self)
        self.startServer()
        self.syncer = None

    def tearDown(self):
        """
        Explicitly invoke destruction on all bases.
        """
        BaseSoledadDeferredEncTest.tearDown(self)
        tests.TestCaseWithServer.tearDown(self)

    def do_sync(self, target_name):
        """
        Perform sync using SoledadSynchronizer, SoledadSyncTarget
        and Token auth.

        :param target_name: name of the remote database to sync against
        :return: the local generation before the sync
        """
        if self.token:
            extra = dict(creds={'token': {
                'uuid': 'user-uuid',
                'token': 'auth-token',
            }})
            target_url = self.getURL(target_name)
            syncdb = getattr(self.db1, "_sync_db", None)

            syncer = SoledadSynchronizer(
                self.db1,
                target.SoledadSyncTarget(
                    target_url,
                    crypto=self._soledad._crypto,
                    sync_db=syncdb,
                    **extra))
            # Keep a reference to be able to know when the sync
            # has finished.
            self.syncer = syncer
            return syncer.sync(
                autocreate=True, defer_decryption=DEFER_DECRYPTION)
        else:
            return test_sync.TestDbSync.do_sync(self, target_name)

    def wait_for_sync(self):
        """
        Wait for sync to finish.

        :raise SyncTimeoutError: if the sync does not finish within
                                 MAX_WAIT seconds.
        """
        wait = 0
        syncer = self.syncer
        if syncer is not None:
            while syncer.syncing:
                time.sleep(WAIT_STEP)
                wait += WAIT_STEP
                if wait >= MAX_WAIT:
                    raise SyncTimeoutError

    def test_db_sync(self):
        """
        Test sync.

        Adapted to check for encrypted content.
        """
        doc1 = self.db1.create_doc_from_json(tests.simple_doc)
        doc2 = self.db2.create_doc_from_json(tests.nested_doc)

        # Give the deferred encryption time to proceed.  The redundant
        # function-local `import time` was removed (time is imported at
        # module level).
        # TODO: should subscribe to an all-decrypted event instead of
        # sleeping an arbitrary amount of time.
        time.sleep(2)

        local_gen_before_sync = self.do_sync('test')
        self.wait_for_sync()

        gen, _, changes = self.db1.whats_changed(local_gen_before_sync)
        self.assertEqual(1, len(changes))

        self.assertEqual(doc2.doc_id, changes[0][0])
        self.assertEqual(1, gen - local_gen_before_sync)

        self.assertGetEncryptedDoc(
            self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False)
        self.assertGetEncryptedDoc(
            self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False)

    def test_db_sync_autocreate(self):
        """
        Disabled for the deferred encryption/decryption case.
        """
        pass


load_tests = tests.load_with_scenarios
# -*- coding: utf-8 -*-
# test_sync_target.py
# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Test Leap backend bits: sync target
"""
# NOTE(review): duplicate imports removed -- `http_database` was imported
# both standalone and in the grouped `from u1db.remote import (...)`, and
# `SoledadTokenAuthMiddleware` was imported twice.
import cStringIO
import os

import simplejson as json
import u1db

from uuid import uuid4

from u1db import SyncTarget
from u1db.sync import Synchronizer
from u1db.remote import (
    http_client,
    http_database,
    http_target,
)

from leap.soledad import client
from leap.soledad.client import (
    target,
    auth,
    crypto,
    VerifiedHTTPSConnection,
    sync,
)
from leap.soledad.common.document import SoledadDocument
from leap.soledad.server import SoledadApp
from leap.soledad.server.auth import SoledadTokenAuthMiddleware

from leap.soledad.common.tests import u1db_tests as tests
from leap.soledad.common.tests import BaseSoledadTest
from leap.soledad.common.tests.util import (
    make_sqlcipher_database_for_test,
    make_soledad_app,
    make_token_soledad_app,
    SoledadWithCouchServerMixin,
)
from leap.soledad.common.tests.u1db_tests import test_backends
from leap.soledad.common.tests.u1db_tests import test_remote_sync_target
from leap.soledad.common.tests.u1db_tests import test_sync
from leap.soledad.common.tests.test_couch import (
    CouchDBTestCase,
    CouchDBWrapper,
)
#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_backends`.
#-----------------------------------------------------------------------------

def make_leap_document_for_test(test, doc_id, rev, content,
                                has_conflicts=False):
    """Build a SoledadDocument for use by the u1db test scenarios."""
    return SoledadDocument(
        doc_id, rev, content, has_conflicts=has_conflicts)


LEAP_SCENARIOS = [
    ('http', {
        'make_database_for_test': test_backends.make_http_database_for_test,
        'copy_database_for_test': test_backends.copy_http_database_for_test,
        'make_document_for_test': make_leap_document_for_test,
        'make_app_with_state': make_soledad_app}),
]


def make_token_http_database_for_test(test, replica_uid):
    """Create an HTTPDatabase that signs its requests with token auth."""
    test.startServer()
    test.request_state._create_database(replica_uid)

    class _HTTPDatabaseWithToken(
            http_database.HTTPDatabase, auth.TokenBasedAuth):

        def set_token_credentials(self, uuid, token):
            auth.TokenBasedAuth.set_token_credentials(self, uuid, token)

        def _sign_request(self, method, url_query, params):
            return auth.TokenBasedAuth._sign_request(
                self, method, url_query, params)

    http_db = _HTTPDatabaseWithToken(test.getURL('test'))
    http_db.set_token_credentials('user-uuid', 'auth-token')
    return http_db


def copy_token_http_database_for_test(test, db):
    # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS
    # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE
    # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN
    # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR
    # HOUSE.
    http_db = test.request_state._copy_database(db)
    # Bug fix: credentials were previously set with an extra positional
    # argument -- set_token_credentials(http_db, 'user-uuid', 'auth-token')
    # on an already-bound method -- which raises TypeError at call time.
    http_db.set_token_credentials('user-uuid', 'auth-token')
    return http_db


#-----------------------------------------------------------------------------
# The following tests come from `u1db.tests.test_remote_sync_target`.
#-----------------------------------------------------------------------------

class TestSoledadSyncTargetBasics(
        test_remote_sync_target.TestHTTPSyncTargetBasics):
    """
    Some tests had to be copied to this class so we can instantiate our own
    target.
    """

    def test_parse_url(self):
        remote_target = target.SoledadSyncTarget('http://127.0.0.1:12345/')
        self.assertEqual('http', remote_target._url.scheme)
        self.assertEqual('127.0.0.1', remote_target._url.hostname)
        self.assertEqual(12345, remote_target._url.port)
        self.assertEqual('/', remote_target._url.path)
+        """ +        doc = SoledadDocument('i', rev='r') +        doc.content = {} +        _crypto = self._soledad._crypto +        key = _crypto.doc_passphrase(doc.doc_id) +        secret = _crypto.secret + +        enc_json = crypto.encrypt_docstr( +            doc.get_json(), doc.doc_id, doc.rev, +            key, secret) +        tgt = target.SoledadSyncTarget( +            "http://foo/foo", crypto=self._soledad._crypto) + +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, "[\r\n{},\r\n]", None) +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, +                          '[\r\n{},\r\n{"id": "i", "rev": "r", ' +                          '"content": %s, "gen": 3, "trans_id": "T-sid"}' +                          ',\r\n]' % json.dumps(enc_json), +                          lambda doc, gen, trans_id: None) + +    def test_wrong_start(self): +        tgt = target.SoledadSyncTarget("http://foo/foo") + +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, "{}\r\n]", None) + +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, "\r\n{}\r\n]", None) + +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, "", None) + +    def test_wrong_end(self): +        tgt = target.SoledadSyncTarget("http://foo/foo") + +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, "[\r\n{}", None) + +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, "[\r\n", None) + +    def test_missing_comma(self): +        tgt = target.SoledadSyncTarget("http://foo/foo") + +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, +                          '[\r\n{}\r\n{"id": 
"i", "rev": "r", ' +                          '"content": "c", "gen": 3}\r\n]', None) + +    def test_no_entries(self): +        tgt = target.SoledadSyncTarget("http://foo/foo") + +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, "[\r\n]", None) + +    def test_error_in_stream(self): +        tgt = target.SoledadSyncTarget("http://foo/foo") + +        self.assertRaises(u1db.errors.Unavailable, +                          tgt._parse_sync_stream, +                          '[\r\n{"new_generation": 0},' +                          '\r\n{"error": "unavailable"}\r\n', None) + +        self.assertRaises(u1db.errors.Unavailable, +                          tgt._parse_sync_stream, +                          '[\r\n{"error": "unavailable"}\r\n', None) + +        self.assertRaises(u1db.errors.BrokenSyncStream, +                          tgt._parse_sync_stream, +                          '[\r\n{"error": "?"}\r\n', None) + + +# +# functions for TestRemoteSyncTargets +# + +def leap_sync_target(test, path): +    return target.SoledadSyncTarget( +        test.getURL(path), crypto=test._soledad._crypto) + + +def token_leap_sync_target(test, path): +    st = leap_sync_target(test, path) +    st.set_token_credentials('user-uuid', 'auth-token') +    return st + + +def make_local_db_and_soledad_target(test, path='test'): +    test.startServer() +    db = test.request_state._create_database(os.path.basename(path)) +    st = target.SoledadSyncTarget.connect( +        test.getURL(path), crypto=test._soledad._crypto) +    return db, st + + +def make_local_db_and_token_soledad_target(test): +    db, st = make_local_db_and_soledad_target(test, 'test') +    st.set_token_credentials('user-uuid', 'auth-token') +    return db, st + + +class TestSoledadSyncTarget( +        SoledadWithCouchServerMixin, +        test_remote_sync_target.TestRemoteSyncTargets): + +    scenarios = [ +        ('token_soledad', +            {'make_app_with_state': 
make_token_soledad_app, +             'make_document_for_test': make_leap_document_for_test, +             'create_db_and_target': make_local_db_and_token_soledad_target, +             'make_database_for_test': make_sqlcipher_database_for_test, +             'sync_target': token_leap_sync_target}), +    ] + +    def setUp(self): +        tests.TestCaseWithServer.setUp(self) +        self.main_test_class = test_remote_sync_target.TestRemoteSyncTargets +        SoledadWithCouchServerMixin.setUp(self) +        self.startServer() +        self.db1 = make_sqlcipher_database_for_test(self, 'test1') +        self.db2 = self.request_state._create_database('test2') + +    def tearDown(self): +        SoledadWithCouchServerMixin.tearDown(self) +        tests.TestCaseWithServer.tearDown(self) +        db, _ = self.request_state.ensure_database('test2') +        db.delete_database() + +    def test_sync_exchange_send(self): +        """ +        Test for sync exchanging send of document. + +        This test was adapted to decrypt remote content before assert. +        """ +        self.startServer() +        db = self.request_state._create_database('test') +        remote_target = self.getSyncTarget('test') +        other_docs = [] + +        def receive_doc(doc, gen, trans_id): +            other_docs.append((doc.doc_id, doc.rev, doc.get_json())) + +        doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') +        new_gen, trans_id = remote_target.sync_exchange( +            [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, +            last_known_trans_id=None, return_doc_cb=receive_doc, +            defer_decryption=False) +        self.assertEqual(1, new_gen) +        self.assertGetEncryptedDoc( +            db, 'doc-here', 'replica:1', '{"value": "here"}', False) + +    def test_sync_exchange_send_failure_and_retry_scenario(self): +        """ +        Test for sync exchange failure and retry. 
+ +        This test was adapted to: +          - decrypt remote content before assert. +          - not expect a bounced document because soledad has stateful +            recoverable sync. +        """ + +        self.startServer() + +        def blackhole_getstderr(inst): +            return cStringIO.StringIO() + +        self.patch(self.server.RequestHandlerClass, 'get_stderr', +                   blackhole_getstderr) +        db = self.request_state._create_database('test') +        _put_doc_if_newer = db._put_doc_if_newer +        trigger_ids = ['doc-here2'] + +        def bomb_put_doc_if_newer(self, doc, save_conflict, +                                  replica_uid=None, replica_gen=None, +                                  replica_trans_id=None, number_of_docs=None, +                                  doc_idx=None, sync_id=None): +            if doc.doc_id in trigger_ids: +                raise Exception +            return _put_doc_if_newer(doc, save_conflict=save_conflict, +                                     replica_uid=replica_uid, +                                     replica_gen=replica_gen, +                                     replica_trans_id=replica_trans_id, +                                     number_of_docs=number_of_docs, +                                     doc_idx=doc_idx, sync_id=sync_id) +        from leap.soledad.common.tests.test_couch import IndexedCouchDatabase +        self.patch( +            IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer) +        remote_target = self.getSyncTarget('test') +        other_changes = [] + +        def receive_doc(doc, gen, trans_id): +            other_changes.append( +                (doc.doc_id, doc.rev, doc.get_json(), gen, trans_id)) + +        doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') +        doc2 = self.make_document('doc-here2', 'replica:1', +                                  '{"value": "here2"}') + +        # we do not expect an HTTPError 
because soledad sync fails gracefully
+        remote_target.sync_exchange(
+            [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')],
+            'replica', last_known_generation=0, last_known_trans_id=None,
+            return_doc_cb=receive_doc)
+        self.assertGetEncryptedDoc(
+            db, 'doc-here', 'replica:1', '{"value": "here"}',
+            False)
+        self.assertEqual(
+            (10, 'T-sid'), db._get_replica_gen_and_trans_id('replica'))
+        self.assertEqual([], other_changes)
+        # retry
+        trigger_ids = []
+        new_gen, trans_id = remote_target.sync_exchange(
+            [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0,
+            last_known_trans_id=None, return_doc_cb=receive_doc)
+        self.assertGetEncryptedDoc(
+            db, 'doc-here2', 'replica:1', '{"value": "here2"}',
+            False)
+        self.assertEqual(
+            (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica'))
+        self.assertEqual(2, new_gen)
+        self.assertEqual(
+            ('doc-here', 'replica:1', '{"value": "here"}', 1),
+            other_changes[0][:-1])
+
+    def test_sync_exchange_send_ensure_callback(self):
+        """
+        Test that the ensure callback is invoked with the replica uid.
+
+        This test was adapted to decrypt remote content before assert.
+        """ +        self.startServer() +        remote_target = self.getSyncTarget('test') +        other_docs = [] +        replica_uid_box = [] + +        def receive_doc(doc, gen, trans_id): +            other_docs.append((doc.doc_id, doc.rev, doc.get_json())) + +        def ensure_cb(replica_uid): +            replica_uid_box.append(replica_uid) + +        doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') +        new_gen, trans_id = remote_target.sync_exchange( +            [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, +            last_known_trans_id=None, return_doc_cb=receive_doc, +            ensure_callback=ensure_cb, defer_decryption=False) +        self.assertEqual(1, new_gen) +        db = self.request_state.open_database('test') +        self.assertEqual(1, len(replica_uid_box)) +        self.assertEqual(db._replica_uid, replica_uid_box[0]) +        self.assertGetEncryptedDoc( +            db, 'doc-here', 'replica:1', '{"value": "here"}', False) + +    def test_sync_exchange_in_stream_error(self): +        # we bypass this test because our sync_exchange process does not +        # return u1db error 503 "unavailable" for now. +        pass + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_sync`. 
+#----------------------------------------------------------------------------- + +target_scenarios = [ +    ('token_leap', {'create_db_and_target': +                    make_local_db_and_token_soledad_target, +                    'make_app_with_state': make_soledad_app}), +] + + +class SoledadDatabaseSyncTargetTests( +        SoledadWithCouchServerMixin, test_sync.DatabaseSyncTargetTests): + +    scenarios = ( +        tests.multiply_scenarios( +            tests.DatabaseBaseTests.scenarios, +            target_scenarios)) + +    whitebox = False + +    def setUp(self): +        self.main_test_class = test_sync.DatabaseSyncTargetTests +        SoledadWithCouchServerMixin.setUp(self) + +    def test_sync_exchange(self): +        """ +        Test sync exchange. + +        This test was adapted to decrypt remote content before assert. +        """ +        sol, _ = make_local_db_and_soledad_target(self) +        docs_by_gen = [ +            (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, +             'T-sid')] +        new_gen, trans_id = self.st.sync_exchange( +            docs_by_gen, 'replica', last_known_generation=0, +            last_known_trans_id=None, return_doc_cb=self.receive_doc, +            defer_decryption=False) +        self.assertGetEncryptedDoc( +            self.db, 'doc-id', 'replica:1', tests.simple_doc, False) +        self.assertTransactionLog(['doc-id'], self.db) +        last_trans_id = self.getLastTransId(self.db) +        self.assertEqual(([], 1, last_trans_id), +                         (self.other_changes, new_gen, last_trans_id)) +        self.assertEqual(10, self.st.get_sync_info('replica')[3]) +        sol.close() + +    def test_sync_exchange_push_many(self): +        """ +        Test sync exchange. + +        This test was adapted to decrypt remote content before assert. 
+        """ +        docs_by_gen = [ +            (self.make_document( +                'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'), +            (self.make_document( +                'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')] +        new_gen, trans_id = self.st.sync_exchange( +            docs_by_gen, 'replica', last_known_generation=0, +            last_known_trans_id=None, return_doc_cb=self.receive_doc, +            defer_decryption=False) +        self.assertGetEncryptedDoc( +            self.db, 'doc-id', 'replica:1', tests.simple_doc, False) +        self.assertGetEncryptedDoc( +            self.db, 'doc-id2', 'replica:1', tests.nested_doc, False) +        self.assertTransactionLog(['doc-id', 'doc-id2'], self.db) +        last_trans_id = self.getLastTransId(self.db) +        self.assertEqual(([], 2, last_trans_id), +                         (self.other_changes, new_gen, trans_id)) +        self.assertEqual(11, self.st.get_sync_info('replica')[3]) + +    def test_sync_exchange_returns_many_new_docs(self): +        """ +        Test sync exchange. + +        This test was adapted to avoid JSON serialization comparison as local +        and remote representations might differ. It looks directly at the +        doc's contents instead. 
+        """ +        doc = self.db.create_doc_from_json(tests.simple_doc) +        doc2 = self.db.create_doc_from_json(tests.nested_doc) +        self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) +        new_gen, _ = self.st.sync_exchange( +            [], 'other-replica', last_known_generation=0, +            last_known_trans_id=None, return_doc_cb=self.receive_doc, +            defer_decryption=False) +        self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) +        self.assertEqual(2, new_gen) +        self.assertEqual( +            [(doc.doc_id, doc.rev, 1), +             (doc2.doc_id, doc2.rev, 2)], +            [c[:-3] + c[-2:-1] for c in self.other_changes]) +        self.assertEqual( +            json.loads(tests.simple_doc), +            json.loads(self.other_changes[0][2])) +        self.assertEqual( +            json.loads(tests.nested_doc), +            json.loads(self.other_changes[1][2])) +        if self.whitebox: +            self.assertEqual( +                self.db._last_exchange_log['return'], +                {'last_gen': 2, 'docs': +                 [(doc.doc_id, doc.rev), (doc2.doc_id, doc2.rev)]}) + + +class TestSoledadDbSync( +        SoledadWithCouchServerMixin, test_sync.TestDbSync): +    """Test db.sync remote sync shortcut""" + +    scenarios = [ +        ('py-token-http', { +            'create_db_and_target': make_local_db_and_token_soledad_target, +            'make_app_with_state': make_token_soledad_app, +            'make_database_for_test': make_sqlcipher_database_for_test, +            'token': True +        }), +    ] + +    oauth = False +    token = False + +    def setUp(self): +        self.main_test_class = test_sync.TestDbSync +        SoledadWithCouchServerMixin.setUp(self) + +    def do_sync(self, target_name): +        """ +        Perform sync using SoledadSynchronizer, SoledadSyncTarget +        and Token auth. 
+        """ +        if self.token: +            extra = dict(creds={'token': { +                'uuid': 'user-uuid', +                'token': 'auth-token', +            }}) +            target_url = self.getURL(target_name) +            return sync.SoledadSynchronizer( +                self.db, +                target.SoledadSyncTarget( +                    target_url, +                    crypto=self._soledad._crypto, +                    **extra)).sync(autocreate=True, +                                   defer_decryption=False) +        else: +            return test_sync.TestDbSync.do_sync(self, target_name) + +    def test_db_sync(self): +        """ +        Test sync. + +        Adapted to check for encrypted content. +        """ +        doc1 = self.db.create_doc_from_json(tests.simple_doc) +        doc2 = self.db2.create_doc_from_json(tests.nested_doc) +        local_gen_before_sync = self.do_sync('test2') +        gen, _, changes = self.db.whats_changed(local_gen_before_sync) +        self.assertEqual(1, len(changes)) +        self.assertEqual(doc2.doc_id, changes[0][0]) +        self.assertEqual(1, gen - local_gen_before_sync) +        self.assertGetEncryptedDoc( +            self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False) +        self.assertGetEncryptedDoc( +            self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False) + +    def test_db_sync_autocreate(self): +        """ +        We bypass this test because we never need to autocreate databases. 
+        """ +        pass + + +load_tests = tests.load_with_scenarios diff --git a/common/src/leap/soledad/common/tests/test_target.py b/common/src/leap/soledad/common/tests/test_target.py index 3457a3e1..6242099d 100644 --- a/common/src/leap/soledad/common/tests/test_target.py +++ b/common/src/leap/soledad/common/tests/test_target.py @@ -437,13 +437,17 @@ class TestSoledadSyncTarget(          def bomb_put_doc_if_newer(self, doc, save_conflict,                                    replica_uid=None, replica_gen=None, -                                  replica_trans_id=None): +                                  replica_trans_id=None, number_of_docs=None, +                                  doc_idx=None, sync_id=None):              if doc.doc_id in trigger_ids:                  raise Exception              return _put_doc_if_newer(doc, save_conflict=save_conflict,                                       replica_uid=replica_uid,                                       replica_gen=replica_gen, -                                     replica_trans_id=replica_trans_id) +                                     replica_trans_id=replica_trans_id, +                                     number_of_docs=number_of_docs, +                                     doc_idx=doc_idx, +                                     sync_id=sync_id)          from leap.soledad.common.tests.test_couch import IndexedCouchDatabase          self.patch(              IndexedCouchDatabase, '_put_doc_if_newer', bomb_put_doc_if_newer) @@ -457,9 +461,8 @@ class TestSoledadSyncTarget(          doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')          doc2 = self.make_document('doc-here2', 'replica:1',                                    '{"value": "here2"}') -        self.assertRaises( -            u1db.errors.HTTPError, -            remote_target.sync_exchange, +        # We do not expect an exception here because the sync fails gracefully +        remote_target.sync_exchange(              [(doc1, 10, 
'T-sid'), (doc2, 11, 'T-sud')],              'replica', last_known_generation=0, last_known_trans_id=None,              return_doc_cb=receive_doc) @@ -480,11 +483,9 @@ class TestSoledadSyncTarget(          self.assertEqual(              (11, 'T-sud'), db._get_replica_gen_and_trans_id('replica'))          self.assertEqual(2, new_gen) -        # we do not expect the document to be bounced back because soledad has -        # stateful sync -        #self.assertEqual( -        #    ('doc-here', 'replica:1', '{"value": "here"}', 1), -        #    other_changes[0][:-1]) +        self.assertEqual( +            ('doc-here', 'replica:1', '{"value": "here"}', 1), +            other_changes[0][:-1])      def test_sync_exchange_send_ensure_callback(self):          """ diff --git a/common/src/leap/soledad/common/tests/test_target_soledad.py b/common/src/leap/soledad/common/tests/test_target_soledad.py new file mode 100644 index 00000000..899203b8 --- /dev/null +++ b/common/src/leap/soledad/common/tests/test_target_soledad.py @@ -0,0 +1,102 @@ +from u1db.remote import ( +    http_database, +) + +from leap.soledad.client import ( +    auth, +    VerifiedHTTPSConnection, +) +from leap.soledad.common.document import SoledadDocument +from leap.soledad.server import SoledadApp +from leap.soledad.server.auth import SoledadTokenAuthMiddleware + + +from leap.soledad.common.tests import u1db_tests as tests +from leap.soledad.common.tests import BaseSoledadTest +from leap.soledad.common.tests.u1db_tests import test_backends + + +#----------------------------------------------------------------------------- +# The following tests come from `u1db.tests.test_backends`. 
+#----------------------------------------------------------------------------- + +def make_leap_document_for_test(test, doc_id, rev, content, +                                has_conflicts=False): +    return SoledadDocument( +        doc_id, rev, content, has_conflicts=has_conflicts) + + +def make_soledad_app(state): +    return SoledadApp(state) + + +def make_token_soledad_app(state): +    app = SoledadApp(state) + +    def _verify_authentication_data(uuid, auth_data): +        if uuid == 'user-uuid' and auth_data == 'auth-token': +            return True +        return False + +    # we test for action authorization in leap.soledad.common.tests.test_server +    def _verify_authorization(uuid, environ): +        return True + +    application = SoledadTokenAuthMiddleware(app) +    application._verify_authentication_data = _verify_authentication_data +    application._verify_authorization = _verify_authorization +    return application + + +LEAP_SCENARIOS = [ +    ('http', { +        'make_database_for_test': test_backends.make_http_database_for_test, +        'copy_database_for_test': test_backends.copy_http_database_for_test, +        'make_document_for_test': make_leap_document_for_test, +        'make_app_with_state': make_soledad_app}), +] + + +def make_token_http_database_for_test(test, replica_uid): +    test.startServer() +    test.request_state._create_database(replica_uid) + +    class _HTTPDatabaseWithToken( +            http_database.HTTPDatabase, auth.TokenBasedAuth): + +        def set_token_credentials(self, uuid, token): +            auth.TokenBasedAuth.set_token_credentials(self, uuid, token) + +        def _sign_request(self, method, url_query, params): +            return auth.TokenBasedAuth._sign_request( +                self, method, url_query, params) + +    http_db = _HTTPDatabaseWithToken(test.getURL('test')) +    http_db.set_token_credentials('user-uuid', 'auth-token') +    return http_db + + +def copy_token_http_database_for_test(test, 
db):
+    # DO NOT COPY OR REUSE THIS CODE OUTSIDE TESTS: COPYING U1DB DATABASES IS
+    # THE WRONG THING TO DO, THE ONLY REASON WE DO SO HERE IS TO TEST THAT WE
+    # CORRECTLY DETECT IT HAPPENING SO THAT WE CAN RAISE ERRORS RATHER THAN
+    # CORRUPT USER DATA. USE SYNC INSTEAD, OR WE WILL SEND NINJA TO YOUR
+    # HOUSE.
+    http_db = test.request_state._copy_database(db)
+    http_db.set_token_credentials('user-uuid', 'auth-token')
+    return http_db
+
+
+class SoledadTests(test_backends.AllDatabaseTests, BaseSoledadTest):
+
+    scenarios = LEAP_SCENARIOS + [
+        ('token_http', {'make_database_for_test':
+                        make_token_http_database_for_test,
+                        'copy_database_for_test':
+                        copy_token_http_database_for_test,
+                        'make_document_for_test': make_leap_document_for_test,
+                        'make_app_with_state': make_token_soledad_app,
+                        })
+    ]
+
+load_tests = tests.load_with_scenarios
diff --git a/common/src/leap/soledad/common/tests/u1db_tests/__init__.py b/common/src/leap/soledad/common/tests/u1db_tests/__init__.py
index 99ff77b4..ad66fb06 100644
--- a/common/src/leap/soledad/common/tests/u1db_tests/__init__.py
+++ b/common/src/leap/soledad/common/tests/u1db_tests/__init__.py
@@ -13,8 +13,9 @@
 #
 # You should have received a copy of the GNU Lesser General Public License
 # along with u1db.  If not, see <http://www.gnu.org/licenses/>.
- -"""Test infrastructure for U1DB""" +""" +Test infrastructure for U1DB +"""  import copy  import shutil diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py b/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py index c0a7e1f7..86e76fad 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_backends.py @@ -41,7 +41,7 @@ from u1db.remote import (  ) -def make_http_database_for_test(test, replica_uid, path='test'): +def make_http_database_for_test(test, replica_uid, path='test', *args):      test.startServer()      test.request_state._create_database(replica_uid)      return http_database.HTTPDatabase(test.getURL(path)) diff --git a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py index 633fd8dd..5e2bec86 100644 --- a/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py +++ b/common/src/leap/soledad/common/tests/u1db_tests/test_sync.py @@ -1151,6 +1151,9 @@ class TestDbSync(tests.TestCaseWithServer):          target_url = self.getURL(path)          return self.db.sync(target_url, **extra) +    def sync(self, callback=None, autocreate=False, defer_decryption=False): +        return super(TestDbSync, self).sync(callback, autocreate) +      def setUp(self):          super(TestDbSync, self).setUp()          self.startServer() diff --git a/debian/changelog b/debian/changelog index 212f4309..f2f5411c 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +soledad (0.6.0) unstable; urgency=low + +  * Update to 0.6.0 release. 
+ + -- db <db@leap.se>  Wed, 01 Oct 2014 15:39:38 -0300 +  soledad (0.5.2.1) unstable; urgency=medium    * Update soledad-client version dependency on python-pycryptopp diff --git a/scripts/profiling/doc_put_memory_usage/get-soledad-and-couch-mem.py b/scripts/profiling/doc_put_memory_usage/get-soledad-and-couch-mem.py new file mode 100755 index 00000000..b2b35d30 --- /dev/null +++ b/scripts/profiling/doc_put_memory_usage/get-soledad-and-couch-mem.py @@ -0,0 +1,46 @@ +#!/usr/bin/python + + +import logging +import argparse +import psutil +import time + + +def find_procs(procs): +    result = [] +    for name, executable in procs: +        found = filter( +            lambda p: executable == p.name, +            psutil.process_iter()) +        if len(found) == 1: +            result.append(found[0]) +    return result + + +def log_memory(soledad, bigcouch): +    while True: +        print "%f %f" % \ +            (soledad.get_memory_percent(), bigcouch.get_memory_percent()) +        time.sleep(1) + + +if __name__ == '__main__': +     +    # configure logger +    logger = logging.getLogger(__name__) +    LOG_FORMAT = '%(asctime)s %(message)s' +    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + + +    # parse command line +    parser = argparse.ArgumentParser() +    parser.add_argument( +        '-l', dest='logfile', +        help='log output to file') +    args = parser.parse_args() + +    log_memory(*find_procs([ +        ('Soledad', 'twistd'), +        ('Bigcouch', 'beam.smp')])) +     diff --git a/scripts/profiling/doc_put_memory_usage/profile-procs.py b/scripts/profiling/doc_put_memory_usage/profile-procs.py new file mode 100755 index 00000000..53f5977b --- /dev/null +++ b/scripts/profiling/doc_put_memory_usage/profile-procs.py @@ -0,0 +1,54 @@ +#!/usr/bin/python + + +import logging +import argparse +import psutil +import time + + +def find_procs(procs): +    result = [] +    for name, executable in procs: +        found = filter( +            lambda 
p: executable == p.name, +            psutil.process_iter()) +        if len(found) == 1: +            result.append(found[0]) +    return result + + +def log_usage(procs, logger): +    names = [proc.name for proc in procs] +    logger.info("Logging cpu and memory for: %s" % names) +    while True: +        s = '%f %f' %\ +            (psutil.cpu_percent(), psutil.phymem_usage().percent) +        for proc in procs: +            s += ' %f %f' % \ +                 (proc.get_cpu_percent(), proc.get_memory_percent()) +        logger.info(s) +        time.sleep(1) + + +if __name__ == '__main__': +    # parse command line +    parser = argparse.ArgumentParser() +    parser.add_argument( +        '-l', dest='logfile', +        help='log output to file') +    args = parser.parse_args() + +    # configure logger +    logger = logging.getLogger(__name__) +    LOG_FORMAT = '%(asctime)s %(message)s' +    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + +    if args.logfile is not None: +        handler = logging.FileHandler(args.logfile, mode='a') +        handler.setFormatter(logging.Formatter(fmt=LOG_FORMAT)) +        logger.addHandler(handler) + +    log_usage(find_procs([ +        ('Soledad', 'twistd'), +        ('Bigcouch', 'beam.smp')]), logger) diff --git a/scripts/profiling/spam.sh b/scripts/profiling/spam.sh new file mode 100755 index 00000000..a4f2b8ef --- /dev/null +++ b/scripts/profiling/spam.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +if [ $# -ne 2 ]; then +  echo "Usage: ${0} <target_address> <number_of_messages>" +  exit 1 +fi + +target_address=${1} +missing=${2} +echo "Will send ${missing} messages to ${target_address}..." + +while [[ ${success} -eq 0 && ${missing} -gt 0 ]]; do +  echo "  missing: ${missing}" +  swaks -S                       \ +    -f ${target_address}         \ +    -t ${target_address}         \ +    -s chipmonk.cdev.bitmask.net \ +    -tlsc +  if [ $? 
-eq 0 ]; then +    missing=`expr ${missing} - 1` +  else +    echo "  error, retrying..." +  fi +done diff --git a/scripts/profiling/sync/client_side_db.py b/scripts/profiling/sync/client_side_db.py new file mode 120000 index 00000000..9e49a7f0 --- /dev/null +++ b/scripts/profiling/sync/client_side_db.py @@ -0,0 +1 @@ +../../db_access/client_side_db.py
\ No newline at end of file diff --git a/scripts/profiling/sync/plot.py b/scripts/profiling/sync/plot.py new file mode 100755 index 00000000..8e3e1c15 --- /dev/null +++ b/scripts/profiling/sync/plot.py @@ -0,0 +1,93 @@ +#!/usr/bin/python + + +import argparse +from matplotlib import pyplot as plt +from movingaverage import movingaverage +from scipy.interpolate import interp1d +from numpy import linspace + + +def smooth(l): +    return movingaverage(l, 3, data_is_list=True, avoid_fp_drift=False) + + +def plot(filename, subtitle=''): + +    # config the plot +    plt.xlabel('time (s)') +    plt.ylabel('usage (%)') +    title = 'soledad sync' +    if subtitle != '': +        title += '- %s' % subtitle +    plt.title(title) + +    x = [] +    ycpu = [] +    ymem = [] +    ypcpu = [] +    ypmem = [] + +    ys = [ +        (ycpu, 'total cpu', 'r'), +        (ymem, 'total mem', 'b'), +        (ypcpu, 'proc cpu', 'm'), +        (ypmem, 'proc mem', 'g'), +    ] + +    # read data from file +    with open(filename, 'r') as f: +        line = f.readline() +        while True: +            line = f.readline() +            if line.startswith('#'): +                continue +            if line == '' or line is None: +                break +            time, cpu, mem, pcpu, pmem = tuple(line.strip().split(' ')) +            x.append(float(time)) +            ycpu.append(float(cpu)) +            ymem.append(float(mem)) +            ypcpu.append(float(pcpu)) +            ypmem.append(float(pmem)) + +    smoothx = [n for n in smooth(x)] +    #xnew = linspace(0.01, 19, 100) + +    for y in ys: +        kwargs = { +            'linewidth': 1.0, +            'linestyle': '-', +        #    'marker': '.', +            'color': y[2], +        } +        #f = interp1d(x, y[0], kind='cubic') +        plt.plot( +            smoothx, +            [n for n in smooth(y[0])], +            #xnew, +            #f(xnew), +            label=y[1], **kwargs) + +    
#plt.axes().get_xaxis().set_ticks(x) +    #plt.axes().get_xaxis().set_ticklabels(x) + +    # annotate max and min values +    plt.xlim(0, 20) +    plt.ylim(0, 100) +    plt.grid() +    plt.legend() +    plt.show() + + +if __name__ == '__main__': +    # parse command line +    parser = argparse.ArgumentParser() +    parser.add_argument( +        '-d', dest='datafile', required=False, default='/tmp/profile.log', +        help='the data file to plot') +    parser.add_argument( +        '-s', dest='subtitle', required=False, default='', +        help='a subtitle for the plot') +    args = parser.parse_args() +    plot(args.datafile, args.subtitle) diff --git a/scripts/profiling/sync/profile-sync.py b/scripts/profiling/sync/profile-sync.py new file mode 100644 index 00000000..fdd5b5a6 --- /dev/null +++ b/scripts/profiling/sync/profile-sync.py @@ -0,0 +1,62 @@ +#!/usr/bin/python + + +import argparse +import logging + + +from util import StatsLogger, ValidateUserHandle +from client_side_db import get_soledad_instance +#from plot import plot + + +# create a logger +logger = logging.getLogger(__name__) +LOG_FORMAT = '%(asctime)s %(message)s' +logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + + +# main program + +if __name__ == '__main__': + +    # parse command line +    parser = argparse.ArgumentParser() +    parser.add_argument( +        'user@provider', action=ValidateUserHandle, help='the user handle') +    parser.add_argument( +        '-b', dest='basedir', required=False, default=None, +        help='soledad base directory') +    parser.add_argument( +        '-p', dest='passphrase', required=False, default=None, +        help='the user passphrase') +    parser.add_argument( +        '-l', dest='logfile', required=False, default='/tmp/profile.log', +        help='the file to which write the log') +    args = parser.parse_args() + +    # get the password +    passphrase = args.passphrase +    if passphrase is None: +        passphrase = getpass.getpass( +    
        'Password for %s@%s: ' % (args.username, args.provider)) + +    # get the basedir +    basedir = args.basedir +    if basedir is None: +        basedir = tempfile.mkdtemp() +    logger.info('Using %s as base directory.' % basedir) + +    # get the soledad instance +    s = get_soledad_instance( +        args.username, args.provider, passphrase, basedir) +    for i in xrange(10): +        s.create_doc({}) + +    sl = StatsLogger( +        "soledad-sync", args.logfile, procs=["python"], interval=0.001) +    sl.start() +    s.sync() +    sl.stop() + +    #plot(args.logfile) diff --git a/scripts/profiling/sync/util.py b/scripts/profiling/sync/util.py new file mode 120000 index 00000000..7f16d684 --- /dev/null +++ b/scripts/profiling/sync/util.py @@ -0,0 +1 @@ +../util.py
\ No newline at end of file diff --git a/scripts/profiling/util.py b/scripts/profiling/util.py new file mode 100644 index 00000000..adf1de8c --- /dev/null +++ b/scripts/profiling/util.py @@ -0,0 +1,75 @@ +import re +import psutil +import time +import threading +import argparse +import pytz +import datetime + + +class ValidateUserHandle(argparse.Action): +    def __call__(self, parser, namespace, values, option_string=None): +        m = re.compile('^([^@]+)@([^@]+\.[^@]+)$') +        res = m.match(values) +        if res == None: +            parser.error('User handle should have the form user@provider.') +        setattr(namespace, 'username', res.groups()[0]) +        setattr(namespace, 'provider', res.groups()[1]) + + +class StatsLogger(threading.Thread): + +    def __init__(self, name, fname, procs=[], interval=0.01): +        threading.Thread.__init__(self) +        self._stopped = True +        self._name = name +        self._fname = fname +        self._procs = self._find_procs(procs) +        self._interval = interval + +    def _find_procs(self, procs): +        return filter(lambda p: p.name in procs, psutil.process_iter()) + +    def run(self): +        self._stopped = False +        with open(self._fname, 'w') as f: +            self._start = time.time() +            f.write(self._make_header()) +            while self._stopped is False: +                f.write('%s %s\n' % +                    (self._make_general_stats(), self._make_proc_stats())) +                time.sleep(self._interval) +            f.write(self._make_footer()) + +    def _make_general_stats(self): +        now = time.time() +        stats = [] +        stats.append("%f" % (now - self._start))   # elapsed time +        stats.append("%f" % psutil.cpu_percent())  # total cpu +        stats.append("%f" % psutil.phymem_usage().percent)  # total memory +        return ' '.join(stats) + +    def _make_proc_stats(self): +        stats = [] +        for p in self._procs: +            
stats.append('%f' % p.get_cpu_percent())     # proc cpu +            stats.append('%f' % p.get_memory_percent())  # proc memory +        return ' '.join(stats) + +    def _make_header(self): +        header = [] +        header.append('# test_name: %s' % self._name) +        header.append('# start_time: %s' %  datetime.datetime.now(pytz.utc)) +        header.append( +            '# elapsed_time total_cpu total_memory proc_cpu proc_memory ') +        return '\n'.join(header) + '\n' + +    def _make_footer(self): +        footer = [] +        footer.append('# end_time: %s' % datetime.datetime.now(pytz.utc)) +        return '\n'.join(footer) + +    def stop(self): +        self._stopped = True + + diff --git a/server/pkg/requirements.pip b/server/pkg/requirements.pip index 7cbca401..28717664 100644 --- a/server/pkg/requirements.pip +++ b/server/pkg/requirements.pip @@ -3,13 +3,13 @@ couchdb  simplejson  u1db  routes -PyOpenSSL +PyOpenSSL<0.14  # TODO: maybe we just want twisted-web?  twisted>=12.0.0  # leap deps -- bump me! -leap.soledad.common>=0.3.0 +leap.soledad.common>=0.6.0  #  # Things yet to fix: diff --git a/server/src/leap/soledad/server/_version.py b/server/src/leap/soledad/server/_version.py index a3227cde..cf4e6706 100644 --- a/server/src/leap/soledad/server/_version.py +++ b/server/src/leap/soledad/server/_version.py @@ -5,8 +5,8 @@  # unpacked source archive. Distribution tarballs contain a pre-generated copy  # of this file. 
-version_version = '0.5.2' -version_full = 'fd2a9adfc9148e47b85add78d8f3c981e4fad885' +version_version = '0.6.0' +version_full = 'f71da83b530abe5b7e88b812797e235833985b6c'  def get_versions(default={}, verbose=False): diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index c6928aaa..6dc99b5a 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -210,6 +210,8 @@ class SyncExchange(sync.SyncExchange):          :param last_known_generation: The last target replica generation the                                        source replica knows about.          :type last_known_generation: int +        :param sync_id: The id of the current sync session. +        :type sync_id: str          """          self._db = db          self.source_replica_uid = source_replica_uid @@ -284,7 +286,8 @@ class SyncExchange(sync.SyncExchange):              doc = self._db.get_doc(changed_doc_id, include_deleted=True)              return_doc_cb(doc, gen, trans_id) -    def insert_doc_from_source(self, doc, source_gen, trans_id): +    def insert_doc_from_source(self, doc, source_gen, trans_id, +            number_of_docs=None, doc_idx=None, sync_id=None):          """Try to insert synced document from source.          Conflicting documents are not inserted but will be sent over @@ -302,10 +305,18 @@ class SyncExchange(sync.SyncExchange):          :type source_gen: int          :param trans_id: The transaction id of that document change.          :type trans_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. +        :type number_of_docs: int +        :param doc_idx: The index of the current document. +        :type doc_idx: int +        :param sync_id: The id of the current sync session. 
+        :type sync_id: str          """          state, at_gen = self._db._put_doc_if_newer(              doc, save_conflict=False, replica_uid=self.source_replica_uid, -            replica_gen=source_gen, replica_trans_id=trans_id) +            replica_gen=source_gen, replica_trans_id=trans_id, +            number_of_docs=number_of_docs, doc_idx=doc_idx, sync_id=sync_id)          if state == 'inserted':              self._sync_state.put_seen_id(doc.doc_id, at_gen)          elif state == 'converged': @@ -340,6 +351,8 @@ class SyncResource(http_app.SyncResource):          :param last_known_trans_id: The last server replica transaction_id the                                      client knows about.          :type last_known_trans_id: str +        :param sync_id: The id of the current sync session. +        :type sync_id: str          :param ensure: Whether the server replica should be created if it does                         not already exist.          :type ensure: bool @@ -355,9 +368,11 @@ class SyncResource(http_app.SyncResource):          # get a sync exchange object          self.sync_exch = self.sync_exchange_class(              db, self.source_replica_uid, last_known_generation, sync_id) +        self._sync_id = sync_id      @http_app.http_method(content_as_args=True) -    def post_put(self, id, rev, content, gen, trans_id): +    def post_put(self, id, rev, content, gen, trans_id, number_of_docs, +            doc_idx):          """          Put one incoming document into the server replica. @@ -373,9 +388,16 @@ class SyncResource(http_app.SyncResource):          :param trans_id: The source replica transaction id corresponding to                           the revision of the incoming document.          :type trans_id: str +        :param number_of_docs: The total amount of documents sent on this sync +                               session. +        :type number_of_docs: int +        :param doc_idx: The index of the current document. 
+        :type doc_idx: int          """          doc = Document(id, rev, content) -        self.sync_exch.insert_doc_from_source(doc, gen, trans_id) +        self.sync_exch.insert_doc_from_source( +            doc, gen, trans_id, number_of_docs=number_of_docs, +            doc_idx=doc_idx, sync_id=self._sync_id)      @http_app.http_method(received=int, content_as_args=True)      def post_get(self, received): | 
