diff options
| -rw-r--r-- | client/pkg/requirements.pip | 5 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/__init__.py | 71 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/crypto.py | 1006 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 246 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/sync.py | 156 | 
5 files changed, 1312 insertions, 172 deletions
| diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip index aefb8653..ae8d2dac 100644 --- a/client/pkg/requirements.pip +++ b/client/pkg/requirements.pip @@ -3,6 +3,9 @@ simplejson  u1db  scrypt  pycryptopp +cchardet +taskthread +zope.proxy  #  # leap deps @@ -20,5 +23,3 @@ oauth  # pysqlite should not be a dep, see #2945  pysqlite - -cchardet diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 0d3a21fd..e06335f0 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -223,33 +223,48 @@ class Soledad(object):      """      def __init__(self, uuid, passphrase, secrets_path, local_db_path, -                 server_url, cert_file, auth_token=None, secret_id=None): +                 server_url, cert_file, +                 auth_token=None, secret_id=None, defer_encryption=False):          """          Initialize configuration, cryptographic keys and dbs.          :param uuid: User's uuid.          :type uuid: str +          :param passphrase: The passphrase for locking and unlocking encryption                             secrets for local and remote storage.          :type passphrase: unicode +          :param secrets_path: Path for storing encrypted key used for                               symmetric encryption.          :type secrets_path: str +          :param local_db_path: Path for local encrypted storage db.          :type local_db_path: str +          :param server_url: URL for Soledad server. This is used either to sync -            with the user's remote db and to interact with the shared recovery -            database. +                           with the user's remote db and to interact with the +                           shared recovery database.          :type server_url: str +          :param cert_file: Path to the certificate of the ca used                            to validate the SSL certificate used by the remote                            soledad server.          :type cert_file: str +          :param auth_token: Authorization token for accessing remote databases.          :type auth_token: str +        :param secret_id: The id of the storage secret to be used. +        :type secret_id: str + +        :param defer_encryption: Whether to defer encryption/decryption of +                                 documents, or do it inline while syncing. +        :type defer_encryption: bool +          :raise BootstrapSequenceError: Raised when the secret generation and -            storage on server sequence has failed for some reason. +                                       storage on server sequence has failed +                                       for some reason.          """          # get config params          self._uuid = uuid @@ -258,8 +273,10 @@ class Soledad(object):          # init crypto variables          self._secrets = {}          self._secret_id = secret_id -        # init config (possibly with default values) +        self._defer_encryption = defer_encryption +          self._init_config(secrets_path, local_db_path, server_url) +          self._set_token(auth_token)          self._shared_db_instance = None          # configure SSL certificate @@ -469,24 +486,19 @@ class Soledad(object):              create=True,              document_factory=SoledadDocument,              crypto=self._crypto, -            raw_key=True) +            raw_key=True, +            defer_encryption=self._defer_encryption)      def close(self):          """          Close underlying U1DB database.          """ +        logger.debug("Closing soledad")          if hasattr(self, '_db') and isinstance(                  self._db,                  SQLCipherDatabase):              self._db.close() -    def __del__(self): -        """ -        Make sure local database is closed when object is destroyed. -        """ -        # Watch out! We have no guarantees  that this is properly called. -        self.close() -      #      # Management of secret for symmetric encryption.      # @@ -520,6 +532,9 @@ class Soledad(object):          Define the id of the storage secret to be used.          This method will also replace the secret in the crypto object. + +        :param secret_id: The id of the storage secret to be used. +        :type secret_id: str          """          self._secret_id = secret_id @@ -881,7 +896,7 @@ class Soledad(object):          :type json: str          :param doc_id: An optional identifier specifying the document id.          :type doc_id: -        :return: The new cocument +        :return: The new document          :rtype: SoledadDocument          """          return self._db.create_doc_from_json(json, doc_id=doc_id) @@ -1041,7 +1056,7 @@ class Soledad(object):          if self._db:              return self._db.resolve_doc(doc, conflicted_doc_revs) -    def sync(self): +    def sync(self, defer_decryption=True):          """          Synchronize the local encrypted replica with a remote replica. @@ -1051,16 +1066,25 @@ class Soledad(object):          :param url: the url of the target replica to sync with          :type url: str -        :return: the local generation before the synchronisation was -            performed. +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool + +        :return: The local generation before the synchronisation was +                 performed.          :rtype: str          """          if self._db: +            try:                  local_gen = self._db.sync(                      urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), -                    creds=self._creds, autocreate=False) +                    creds=self._creds, autocreate=False, +                    defer_decryption=defer_decryption)                  signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)                  return local_gen +            except Exception as e: +                logger.error("Soledad exception when syncing: %s" % str(e))      def stop_sync(self):          """ @@ -1079,7 +1103,9 @@ class Soledad(object):          :return: Whether remote replica and local replica differ.          :rtype: bool          """ -        target = SoledadSyncTarget(url, creds=self._creds, crypto=self._crypto) +        target = SoledadSyncTarget( +            url, self._db._get_replica_uid(), creds=self._creds, +            crypto=self._crypto)          info = target.get_sync_info(self._db._get_replica_uid())          # compare source generation with target's last known source generation          if self._db._get_generation() != info[4]: @@ -1087,6 +1113,13 @@ class Soledad(object):              return True          return False +    @property +    def syncing(self): +        """ +        Property, True if the syncer is syncing. +        """ +        return self._db.syncing +      def _set_token(self, token):          """          Set the authentication token for remote database access. diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index a6372107..7133f804 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # crypto.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -14,27 +14,45 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  Cryptographic utilities for Soledad.  """ - -  import os  import binascii  import hmac  import hashlib - +import json +import logging +import multiprocessing +import threading  from pycryptopp.cipher.aes import AES  from pycryptopp.cipher.xsalsa20 import XSalsa20 +from zope.proxy import sameProxiedObjects + +from leap.soledad.common import soledad_assert +from leap.soledad.common import soledad_assert_type +from leap.soledad.common.document import SoledadDocument + + +from leap.soledad.common.crypto import ( +    EncryptionSchemes, +    UnknownEncryptionScheme, +    MacMethods, +    UnknownMacMethod, +    WrongMac, +    ENC_JSON_KEY, +    ENC_SCHEME_KEY, +    ENC_METHOD_KEY, +    ENC_IV_KEY, +    MAC_KEY, +    MAC_METHOD_KEY, +) +logger = logging.getLogger(__name__) -from leap.soledad.common import ( -    soledad_assert, -    soledad_assert_type, -) + +MAC_KEY_LENGTH = 64  class EncryptionMethods(object): @@ -45,6 +63,17 @@ class EncryptionMethods(object):      AES_256_CTR = 'aes-256-ctr'      XSALSA20 = 'xsalsa20' +# +# Exceptions +# + + +class DocumentNotEncrypted(Exception): +    """ +    Raised for failures in document encryption. +    """ +    pass +  class UnknownEncryptionMethod(Exception):      """ @@ -59,13 +88,116 @@ class NoSymmetricSecret(Exception):      """ -class SoledadCrypto(object): +def encrypt_sym(data, key, method):      """ -    General cryptographic functionality. +    Encrypt C{data} using a {password}. + +    Currently, the only encryption methods supported are AES-256 in CTR +    mode and XSalsa20. + +    :param data: The data to be encrypted. +    :type data: str +    :param key: The key used to encrypt C{data} (must be 256 bits long). +    :type key: str +    :param method: The encryption method to use. +    :type method: str + +    :return: A tuple with the initial value and the encrypted data. +    :rtype: (long, str) +    """ +    soledad_assert_type(key, str) + +    soledad_assert( +        len(key) == 32,  # 32 x 8 = 256 bits. +        'Wrong key size: %s bits (must be 256 bits long).' % +        (len(key) * 8)) +    iv = None +    # AES-256 in CTR mode +    if method == EncryptionMethods.AES_256_CTR: +        iv = os.urandom(16) +        ciphertext = AES(key=key, iv=iv).process(data) +    # XSalsa20 +    elif method == EncryptionMethods.XSALSA20: +        iv = os.urandom(24) +        ciphertext = XSalsa20(key=key, iv=iv).process(data) +    else: +        # raise if method is unknown +        raise UnknownEncryptionMethod('Unkwnown method: %s' % method) +    return binascii.b2a_base64(iv), ciphertext + + +def decrypt_sym(data, key, method, **kwargs):      """ +    Decrypt data using symmetric secret. + +    Currently, the only encryption method supported is AES-256 CTR mode. -    MAC_KEY_LENGTH = 64 +    :param data: The data to be decrypted. +    :type data: str +    :param key: The key used to decrypt C{data} (must be 256 bits long). +    :type key: str +    :param method: The encryption method to use. +    :type method: str +    :param kwargs: Other parameters specific to each encryption method. +    :type kwargs: dict + +    :return: The decrypted data. +    :rtype: str +    """ +    soledad_assert_type(key, str) +    # assert params +    soledad_assert( +        len(key) == 32,  # 32 x 8 = 256 bits. +        'Wrong key size: %s (must be 256 bits long).' % len(key)) +    soledad_assert( +        'iv' in kwargs, +        '%s needs an initial value.' % method) +    # AES-256 in CTR mode +    if method == EncryptionMethods.AES_256_CTR: +        return AES( +            key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) +    elif method == EncryptionMethods.XSALSA20: +        return XSalsa20( +            key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) + +    # raise if method is unknown +    raise UnknownEncryptionMethod('Unkwnown method: %s' % method) + + +def doc_mac_key(doc_id, secret): +    """ +    Generate a key for calculating a MAC for a document whose id is +    C{doc_id}. +    The key is derived using HMAC having sha256 as underlying hash +    function. The key used for HMAC is the first MAC_KEY_LENGTH characters +    of Soledad's storage secret. The HMAC message is C{doc_id}. + +    :param doc_id: The id of the document. +    :type doc_id: str + +    :param secret: soledad secret storage +    :type secret: Soledad.storage_secret + +    :return: The key. +    :rtype: str + +    :raise NoSymmetricSecret: if no symmetric secret was supplied. +    """ +    if secret is None: +        raise NoSymmetricSecret() + +    return hmac.new( +        secret[:MAC_KEY_LENGTH], +        doc_id, +        hashlib.sha256).digest() + + +class SoledadCrypto(object): +    """ +    General cryptographic functionality encapsulated in a +    object that can be passed along. +    """      def __init__(self, soledad):          """          Initialize the crypto object. @@ -77,78 +209,14 @@ class SoledadCrypto(object):      def encrypt_sym(self, data, key,                      method=EncryptionMethods.AES_256_CTR): -        """ -        Encrypt C{data} using a {password}. - -        Currently, the only  encryption method supported is AES-256 CTR mode. - -        :param data: The data to be encrypted. -        :type data: str -        :param key: The key used to encrypt C{data} (must be 256 bits long). -        :type key: str -        :param method: The encryption method to use. -        :type method: str - -        :return: A tuple with the initial value and the encrypted data. -        :rtype: (long, str) -        """ -        soledad_assert_type(key, str) - -        soledad_assert( -            len(key) == 32,  # 32 x 8 = 256 bits. -            'Wrong key size: %s bits (must be 256 bits long).' % -            (len(key) * 8)) -        iv = None -        # AES-256 in CTR mode -        if method == EncryptionMethods.AES_256_CTR: -            iv = os.urandom(16) -            ciphertext = AES(key=key, iv=iv).process(data) -        # XSalsa20 -        elif method == EncryptionMethods.XSALSA20: -            iv = os.urandom(24) -            ciphertext = XSalsa20(key=key, iv=iv).process(data) -        else: -            # raise if method is unknown -            raise UnknownEncryptionMethod('Unkwnown method: %s' % method) -        return binascii.b2a_base64(iv), ciphertext +        return encrypt_sym(data, key, method)      def decrypt_sym(self, data, key,                      method=EncryptionMethods.AES_256_CTR, **kwargs): -        """ -        Decrypt data using symmetric secret. +        return decrypt_sym(data, key, method, **kwargs) -        Currently, the only encryption method supported is AES-256 CTR mode. - -        :param data: The data to be decrypted. -        :type data: str -        :param key: The key used to decrypt C{data} (must be 256 bits long). -        :type key: str -        :param method: The encryption method to use. -        :type method: str -        :param kwargs: Other parameters specific to each encryption method. -        :type kwargs: dict - -        :return: The decrypted data. -        :rtype: str -        """ -        soledad_assert_type(key, str) -        # assert params -        soledad_assert( -            len(key) == 32,  # 32 x 8 = 256 bits. -            'Wrong key size: %s (must be 256 bits long).' % len(key)) -        soledad_assert( -            'iv' in kwargs, -            '%s needs an initial value.' % method) -        # AES-256 in CTR mode -        if method == EncryptionMethods.AES_256_CTR: -            return AES( -                key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) -        elif method == EncryptionMethods.XSALSA20: -            return XSalsa20( -                key=key, iv=binascii.a2b_base64(kwargs['iv'])).process(data) - -        # raise if method is unknown -        raise UnknownEncryptionMethod('Unkwnown method: %s' % method) +    def doc_mac_key(self, doc_id, secret): +        return doc_mac_key(doc_id, self.secret)      def doc_passphrase(self, doc_id):          """ @@ -173,41 +241,769 @@ class SoledadCrypto(object):              raise NoSymmetricSecret()          return hmac.new(              self.secret[ -                self.MAC_KEY_LENGTH: +                MAC_KEY_LENGTH:                  self._soledad.REMOTE_STORAGE_SECRET_LENGTH],              doc_id,              hashlib.sha256).digest() -    def doc_mac_key(self, doc_id): +    # +    # secret setters/getters +    # + +    def _get_secret(self): +        return self._soledad.storage_secret + +    secret = property( +        _get_secret, doc='The secret used for symmetric encryption') + +# +# Crypto utilities for a SoledadDocument. +# + + +def mac_doc(doc_id, doc_rev, ciphertext, mac_method, secret): +    """ +    Calculate a MAC for C{doc} using C{ciphertext}. + +    Current MAC method used is HMAC, with the following parameters: + +        * key: sha256(storage_secret, doc_id) +        * msg: doc_id + doc_rev + ciphertext +        * digestmod: sha256 + +    :param doc_id: The id of the document. +    :type doc_id: str +    :param doc_rev: The revision of the document. +    :type doc_rev: str +    :param ciphertext: The content of the document. +    :type ciphertext: str +    :param mac_method: The MAC method to use. +    :type mac_method: str +    :param secret: soledad secret +    :type secret: Soledad.secret_storage + +    :return: The calculated MAC. +    :rtype: str +    """ +    if mac_method == MacMethods.HMAC: +        return hmac.new( +            doc_mac_key(doc_id, secret), +            str(doc_id) + str(doc_rev) + ciphertext, +            hashlib.sha256).digest() +    # raise if we do not know how to handle this MAC method +    raise UnknownMacMethod('Unknown MAC method: %s.' % mac_method) + + +def encrypt_doc(crypto, doc): +    """ +    Wrapper around encrypt_docstr that accepts a crypto object and the document +    as arguments. + +    :param crypto: a soledad crypto object. +    :type crypto: SoledadCrypto +    :param doc: the document. +    :type doc: SoledadDocument +    """ +    key = crypto.doc_passphrase(doc.doc_id) +    secret = crypto.secret + +    return encrypt_docstr( +        doc.get_json(), doc.doc_id, doc.rev, key, secret) + + +def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): +    """ +    Encrypt C{doc}'s content. + +    Encrypt doc's contents using AES-256 CTR mode and return a valid JSON +    string representing the following: + +        { +            ENC_JSON_KEY: '<encrypted doc JSON string>', +            ENC_SCHEME_KEY: 'symkey', +            ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, +            ENC_IV_KEY: '<the initial value used to encrypt>', +            MAC_KEY: '<mac>' +            MAC_METHOD_KEY: 'hmac' +        } + +    :param docstr: A representation of the document to be encrypted. +    :type docstr: str or unicode. + +    :param doc_id: The document id. +    :type doc_id: str + +    :param doc_rev: The document revision. +    :type doc_rev: str + +    :param key: The key used to encrypt ``data`` (must be 256 bits long). +    :type key: str + +    :param secret: The Soledad secret (used for MAC auth). +    :type secret: str + +    :return: The JSON serialization of the dict representing the encrypted +             content. +    :rtype: str +    """ +    # encrypt content using AES-256 CTR mode +    iv, ciphertext = encrypt_sym( +        str(docstr),  # encryption/decryption routines expect str +        key, method=EncryptionMethods.AES_256_CTR) +    # Return a representation for the encrypted content. In the following, we +    # convert binary data to hexadecimal representation so the JSON +    # serialization does not complain about what it tries to serialize. +    hex_ciphertext = binascii.b2a_hex(ciphertext) +    return json.dumps({ +        ENC_JSON_KEY: hex_ciphertext, +        ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, +        ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, +        ENC_IV_KEY: iv, +        MAC_KEY: binascii.b2a_hex(mac_doc(  # store the mac as hex. +            doc_id, doc_rev, ciphertext, +            MacMethods.HMAC, secret)), +        MAC_METHOD_KEY: MacMethods.HMAC, +    }) + + +def decrypt_doc(crypto, doc): +    """ +    Wrapper around decrypt_doc_dict that accepts a crypto object and the +    document as arguments. + +    :param crypto: a soledad crypto object. +    :type crypto: SoledadCrypto +    :param doc: the document. +    :type doc: SoledadDocument + +    :return: json string with the decrypted document +    :rtype: str +    """ +    key = crypto.doc_passphrase(doc.doc_id) +    secret = crypto.secret +    return decrypt_doc_dict(doc.content, doc.doc_id, doc.rev, key, secret) + + +def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): +    """ +    Decrypt C{doc}'s content. + +    Return the JSON string representation of the document's decrypted content. + +    The passed doc_dict argument should have the following structure: + +        { +            ENC_JSON_KEY: '<enc_blob>', +            ENC_SCHEME_KEY: '<enc_scheme>', +            ENC_METHOD_KEY: '<enc_method>', +            ENC_IV_KEY: '<initial value used to encrypt>',  # (optional) +            MAC_KEY: '<mac>' +            MAC_METHOD_KEY: 'hmac' +        } + +    C{enc_blob} is the encryption of the JSON serialization of the document's +    content. For now Soledad just deals with documents whose C{enc_scheme} is +    EncryptionSchemes.SYMKEY and C{enc_method} is +    EncryptionMethods.AES_256_CTR. + +    :param doc_dict: The content of the document to be decrypted. +    :type doc_dict: dict + +    :param doc_id: The document id. +    :type doc_id: str + +    :param doc_rev: The document revision. +    :type doc_rev: str + +    :param key: The key used to encrypt ``data`` (must be 256 bits long). +    :type key: str + +    :param secret: +    :type secret: + +    :return: The JSON serialization of the decrypted content. +    :rtype: str +    """ +    soledad_assert(ENC_JSON_KEY in doc_dict) +    soledad_assert(ENC_SCHEME_KEY in doc_dict) +    soledad_assert(ENC_METHOD_KEY in doc_dict) +    soledad_assert(MAC_KEY in doc_dict) +    soledad_assert(MAC_METHOD_KEY in doc_dict) + +    # verify MAC +    ciphertext = binascii.a2b_hex(  # content is stored as hex. +        doc_dict[ENC_JSON_KEY]) +    mac = mac_doc( +        doc_id, doc_rev, +        ciphertext, +        doc_dict[MAC_METHOD_KEY], secret) +    # we compare mac's hashes to avoid possible timing attacks that might +    # exploit python's builtin comparison operator behaviour, which fails +    # immediatelly when non-matching bytes are found. +    doc_mac_hash = hashlib.sha256( +        binascii.a2b_hex(  # the mac is stored as hex +            doc_dict[MAC_KEY])).digest() +    calculated_mac_hash = hashlib.sha256(mac).digest() + +    if doc_mac_hash != calculated_mac_hash: +        logger.warning("Wrong MAC while decrypting doc...") +        raise WrongMac('Could not authenticate document\'s contents.') +    # decrypt doc's content +    enc_scheme = doc_dict[ENC_SCHEME_KEY] +    plainjson = None +    if enc_scheme == EncryptionSchemes.SYMKEY: +        enc_method = doc_dict[ENC_METHOD_KEY] +        if enc_method == EncryptionMethods.AES_256_CTR: +            soledad_assert(ENC_IV_KEY in doc_dict) +            plainjson = decrypt_sym( +                ciphertext, key, +                method=enc_method, +                iv=doc_dict[ENC_IV_KEY]) +        else: +            raise UnknownEncryptionMethod(enc_method) +    else: +        raise UnknownEncryptionScheme(enc_scheme) + +    return plainjson + + +def is_symmetrically_encrypted(doc): +    """ +    Return True if the document was symmetrically encrypted. + +    :param doc: The document to check. +    :type doc: SoledadDocument + +    :rtype: bool +    """ +    if doc.content and ENC_SCHEME_KEY in doc.content: +        if doc.content[ENC_SCHEME_KEY] == EncryptionSchemes.SYMKEY: +            return True +    return False + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): +    """ +    Base class for encrypter/decrypter pools. +    """ +    WORKERS = 5 + +    def __init__(self, crypto, sync_db, write_lock): +        """ +        Initialize the pool of encryption-workers. + +        :param crypto: A SoledadCryto instance to perform the encryption. +        :type crypto: leap.soledad.crypto.SoledadCrypto + +        :param sync_db: a database connection handle +        :type sync_db: handle + +        :param write_lock: a write lock for controlling concurrent access +                           to the sync_db +        :type write_lock: threading.Lock +        """ +        self._pool = multiprocessing.Pool(self.WORKERS) +        self._crypto = crypto +        self._sync_db = sync_db +        self._sync_db_write_lock = write_lock + +    def close(self): +        """ +        Cleanly close the pool of workers. +        """ +        logger.debug("Closing %s" % (self.__class__.__name__,)) +        self._pool.close() +        try: +            self._pool.join() +        except Exception: +            pass + +    def terminate(self): +        """ +        Terminate the pool of workers. +        """ +        logger.debug("Terminating %s" % (self.__class__.__name__,)) +        self._pool.terminate() + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): +    """ +    Encrypt the content of the given document. + +    :param doc_id: The document id. +    :type doc_id: str +    :param doc_rev: The document revision. +    :type doc_rev: str +    :param content: The serialized content of the document. +    :type content: str +    :param key: The encryption key. +    :type key: str +    :param secret: The Soledad secret (used for MAC auth). +    :type secret: str + +    :return: A tuple containing the doc id, revision and encrypted content. +    :rtype: tuple(str, str, str) +    """ +    encrypted_content = encrypt_docstr( +        content, doc_id, doc_rev, key, secret) +    return doc_id, doc_rev, encrypted_content + + +class SyncEncrypterPool(SyncEncryptDecryptPool): +    """ +    Pool of workers that spawn subprocesses to execute the symmetric encryption +    of documents to be synced. +    """ +    # TODO implement throttling to reduce cpu usage?? +    WORKERS = 5 +    TABLE_NAME = "docs_tosync" +    FIELD_NAMES = "doc_id, rev, content" + +    def encrypt_doc(self, doc, workers=True): +        """ +        Symmetrically encrypt a document. + +        :param doc: The document with contents to be encrypted. +        :type doc: SoledadDocument + +        :param workers: Whether to defer the decryption to the multiprocess +                        pool of workers. Useful for debugging purposes. +        :type workers: bool +        """ +        soledad_assert(self._crypto is not None, "need a crypto object") +        docstr = doc.get_json() +        key = self._crypto.doc_passphrase(doc.doc_id) +        secret = self._crypto.secret +        args = doc.doc_id, doc.rev, docstr, key, secret + +        try: +            if workers: +                res = self._pool.apply_async( +                    encrypt_doc_task, args, +                    callback=self.encrypt_doc_cb) +            else: +                # encrypt inline +                res = encrypt_doc_task(*args) +                self.encrypt_doc_cb(res) + +        except Exception as exc: +            logger.exception(exc) + +    def encrypt_doc_cb(self, result):          """ -        Generate a key for calculating a MAC for a document whose id is -        C{doc_id}. +        Insert results of encryption routine into the local sync database. -        The key is derived using HMAC having sha256 as underlying hash -        function. The key used for HMAC is the first MAC_KEY_LENGTH characters -        of Soledad's storage secret. The HMAC message is C{doc_id}. +        :param result: A tuple containing the doc id, revision and encrypted +                       content. +        :type result: tuple(str, str, str) +        """ +        doc_id, doc_rev, content = result +        self.insert_encrypted_local_doc(doc_id, doc_rev, content) -        :param doc_id: The id of the document. +    def insert_encrypted_local_doc(self, doc_id, doc_rev, content): +        """ +        Insert the contents of the encrypted doc into the local sync +        database. + +        :param doc_id: The document id.          :type doc_id: str +        :param doc_rev: The document revision. +        :type doc_rev: str +        :param content: The serialized content of the document. +        :type content: str +        :param content: The encrypted document. +        :type content: str +        """ +        sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) +        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) -        :return: The key. -        :rtype: str +        con = self._sync_db +        with self._sync_db_write_lock: +            with con: +                con.execute(sql_del, (doc_id, )) +                con.execute(sql_ins, (doc_id, doc_rev, content)) -        :raise NoSymmetricSecret: if no symmetric secret was supplied. + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): +    """ +    Decrypt the content of the given document. + +    :param doc_id: The document id. +    :type doc_id: str +    :param doc_rev: The document revision. +    :type doc_rev: str +    :param content: The encrypted content of the document. +    :type content: str +    :param gen: The generation corresponding to the modification of that +                document. +    :type gen: int +    :param trans_id: The transaction id corresponding to the modification of +                     that document. +    :type trans_id: str +    :param key: The encryption key. +    :type key: str +    :param secret: The Soledad secret (used for MAC auth). +    :type secret: str + +    :return: A tuple containing the doc id, revision and encrypted content. +    :rtype: tuple(str, str, str) +    """ +    decrypted_content = decrypt_doc_dict( +        content, doc_id, doc_rev, key, secret) +    return doc_id, doc_rev, decrypted_content, gen, trans_id + + +def get_insertable_docs_by_gen(expected, got): +    """ +    Return a list of documents ready to be inserted. This list is computed +    by aligning the expected list with the already gotten docs, and returning +    the maximum number of docs that can be processed in the expected order +    before finding a gap. + +    :param expected: A list of generations to be inserted. +    :type expected: list + +    :param got: A dictionary whose values are the docs to be inserted. +    :type got: dict +    """ +    ordered = [got.get(i) for i in expected] +    if None in ordered: +        return ordered[:ordered.index(None)] +    else: +        return ordered + + +class SyncDecrypterPool(SyncEncryptDecryptPool): +    """ +    Pool of workers that spawn subprocesses to execute the symmetric decryption +    of documents that were received. + +    The decryption of the received documents is done in two steps: + +        1. All the encrypted docs are collected, together with their generation +           and transaction-id +        2. The docs are enqueued for decryption. When completed, they are +           inserted following the generation order. +    """ +    # TODO implement throttling to reduce cpu usage?? +    TABLE_NAME = "docs_received" +    FIELD_NAMES = "doc_id, rev, content, gen, trans_id" + +    write_encrypted_lock = threading.Lock() + +    def __init__(self, *args, **kwargs):          """ -        if self.secret is None: -            raise NoSymmetricSecret() -        return hmac.new( -            self.secret[:self.MAC_KEY_LENGTH], -            doc_id, -            hashlib.sha256).digest() +        Initialize the decrypter pool, and setup a dict for putting the +        results of the decrypted docs until they are picked by the insert +        routine that gets them in order. +        """ +        self._insert_doc_cb = kwargs.pop("insert_doc_cb") +        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) +        self.decrypted_docs = {} +        self.source_replica_uid = None -    # -    # secret setters/getters -    # +    def set_source_replica_uid(self, source_replica_uid): +        """ +        Set the source replica uid for this decrypter pool instance. -    def _get_secret(self): -        return self._soledad.storage_secret +        :param source_replica_uid: The uid of the source replica. +        :type source_replica_uid: str +        """ +        self.source_replica_uid = source_replica_uid -    secret = property( -        _get_secret, doc='The secret used for symmetric encryption') +    def insert_encrypted_received_doc(self, doc_id, doc_rev, content, +                                      gen, trans_id): +        """ +        Insert a received message with encrypted content, to be decrypted later +        on. + +        :param doc_id: The Document ID. +        :type doc_id: str +        :param doc_rev: The Document Revision +        :param doc_rev: str +        :param content: the Content of the document +        :type content: str +        :param gen: the Document Generation +        :type gen: int +        :param trans_id: Transaction ID +        :type trans_id: str +        """ +        docstr = json.dumps(content) +        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( +            self.TABLE_NAME,) + +        con = self._sync_db +        with self._sync_db_write_lock: +            with con: +                con.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id)) + +    def insert_marker_for_received_doc(self, doc_id, doc_rev, gen): +        """ +        Insert a marker with the document id, revision and generation on the +        sync db. This document does not have an encrypted payload, so the +        content has already been inserted into the decrypted_docs dictionary +        from where it can be picked following generation order. +        We need to leave here the marker to be able to calculate the expected +        insertion order for a synchronization batch. + +        :param doc_id: The Document ID. +        :type doc_id: str +        :param doc_rev: The Document Revision +        :param doc_rev: str +        :param gen: the Document Generation +        :type gen: int +        """ +        sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( +            self.TABLE_NAME,) +        con = self._sync_db +        with self._sync_db_write_lock: +            with con: +                con.execute(sql_ins, (doc_id, doc_rev, '', gen, '')) + +    def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): +        """ +        Insert a document that is not symmetrically encrypted. +        We store it in the staging area (the decrypted_docs dictionary) to be +        picked up in order as the preceding documents are decrypted. + +        :param doc_id: The Document ID. +        :type doc_id: str +        :param doc_rev: The Document Revision +        :param doc_rev: str +        :param content: the Content of the document +        :type content: str +        :param gen: the Document Generation +        :type gen: int +        :param trans_id: Transaction ID +        :type trans_id: str +        """ +        # XXX this need a deeper review / testing. +        # I believe that what I'm doing here is prone to problems +        # if the sync is interrupted (ie, client crash) in the worst possible +        # moment. We would need a recover strategy in that case +        # (or, insert the document in the table all the same, but with a flag +        # saying if the document is sym-encrypted or not), +        content = json.dumps(content) +        result = doc_id, doc_rev, content, gen, trans_id +        self.decrypted_docs[gen] = result +        self.insert_marker_for_received_doc(doc_id, doc_rev, gen) + +    def delete_encrypted_received_doc(self, doc_id, doc_rev): +        """ +        Delete a encrypted received doc after it was inserted into the local +        db. + +        :param doc_id: Document ID. +        :type doc_id: str +        :param doc_rev: Document revision. +        :type doc_rev: str +        """ +        sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( +            self.TABLE_NAME,) +        con = self._sync_db +        with self._sync_db_write_lock: +            with con: +                con.execute(sql_del, (doc_id, doc_rev)) + +    def decrypt_doc(self, doc_id, rev, source_replica_uid, workers=True): +        """ +        Symmetrically decrypt a document. + +        :param doc_id: The ID for the document with contents to be encrypted. +        :type doc: str +        :param rev: The revision of the document. +        :type rev: str +        :param source_replica_uid: +        :type source_replica_uid: str + +        :param workers: Whether to defer the decryption to the multiprocess +                        pool of workers. Useful for debugging purposes. +        :type workers: bool +        """ +        self.source_replica_uid = source_replica_uid + +        # insert_doc_cb is a proxy object that gets updated with the right +        # insert function only when the sync_target invokes the sync_exchange +        # method. so, if we don't still have a non-empty callback, we refuse +        # to proceed. +        if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), +                              None): +            logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") +            return + +        # XXX move to get_doc function... +        c = self._sync_db.cursor() +        sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % ( +            self.TABLE_NAME,) +        try: +            c.execute(sql, (doc_id, rev)) +            res = c.fetchone() +        except Exception as exc: +            logger.warning("Error getting docs from syncdb: %r" % (exc,)) +            return +        if res is None: +            logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev)) +            return + +        soledad_assert(self._crypto is not None, "need a crypto object") +        try: +            doc_id, rev, docstr, gen, trans_id = res +        except ValueError: +            logger.warning("Wrong entry in sync db") +            return + +        if len(docstr) == 0: +            # not encrypted payload +            return + +        try: +            content = json.loads(docstr) +        except TypeError: +            logger.warning("Wrong type while decoding json: %s" % repr(docstr)) +            return + +        key = self._crypto.doc_passphrase(doc_id) +        secret = self._crypto.secret +        args = doc_id, rev, content, gen, trans_id, key, secret + +        try: +            if workers: +                # Ouch. This is sent to the workers asynchronously, so +                # we have no way of logging errors. We'd have to inspect +                # lingering results by querying successful / get() over them... +                # Or move the heck out of it to twisted. +                res = self._pool.apply_async( +                    decrypt_doc_task, args, +                    callback=self.decrypt_doc_cb) +            else: +                # decrypt inline +                res = decrypt_doc_task(*args) +                self.decrypt_doc_cb(res) + +        except Exception as exc: +            logger.exception(exc) + +    def decrypt_doc_cb(self, result): +        """ +        Temporarily store the decryption result in a dictionary where it will +        be picked by process_decrypted. + +        :param result: A tuple containing the doc id, revision and encrypted +        content. +        :type result: tuple(str, str, str) +        """ +        doc_id, rev, content, gen, trans_id = result +        logger.debug("Sync decrypter pool: decrypted doc %s: %s %s" % (doc_id, rev, gen)) +        self.decrypted_docs[gen] = result + +    def get_docs_by_generation(self): +        """ +        Get all documents in the received table from the sync db, +        ordered by generation. + +        :return: list of doc_id, rev, generation +        """ +        c = self._sync_db.cursor() +        sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % ( +            self.TABLE_NAME,) +        c.execute(sql) +        return c.fetchall() + +    def count_received_encrypted_docs(self): +        """ +        Count how many documents we have in the table for received and +        encrypted docs. + +        :return: The count of documents. +        :rtype: int +        """ +        if self._sync_db is None: +            logger.warning("cannot return count with null sync_db") +            return +        c = self._sync_db.cursor() +        sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) +        c.execute(sql) +        res = c.fetchone() +        if res is not None: +            return res[0] +        else: +            return 0 + +    def decrypt_received_docs(self): +        """ +        Get all the encrypted documents from the sync database and dispatch a +        decrypt worker to decrypt each one of them. +        """ +        docs_by_generation = self.get_docs_by_generation() +        logger.debug("Sync decrypter pool: There are %d documents to " \ +                     "decrypt." % len(docs_by_generation)) +        for doc_id, rev, gen in filter(None, docs_by_generation): +            self.decrypt_doc(doc_id, rev, self.source_replica_uid) + +    def process_decrypted(self): +        """ +        Process the already decrypted documents, and insert as many documents +        as can be taken from the expected order without finding a gap. + +        :return: Whether we have processed all the pending docs. +        :rtype: bool +        """ +        # Acquire the lock to avoid processing while we're still +        # getting data from the syncing stream, to avoid InvalidGeneration +        # problems. +        with self.write_encrypted_lock: +            already_decrypted = self.decrypted_docs +            docs = self.get_docs_by_generation() +            docs = filter(lambda entry: len(entry) > 0, docs) +            expected = [gen for doc_id, rev, gen in docs] +            docs_to_insert = get_insertable_docs_by_gen( +                expected, already_decrypted) +            for doc_fields in docs_to_insert: +                self.insert_decrypted_local_doc(*doc_fields) +        remaining = self.count_received_encrypted_docs() +        return remaining == 0 + +    def insert_decrypted_local_doc(self, doc_id, doc_rev, content, +                                   gen, trans_id): +        """ +        Insert the decrypted document into the local sqlcipher database. +        Makes use of the passed callback `return_doc_cb` passed to the caller +        by u1db sync. + +        :param doc_id: The document id. +        :type doc_id: str +        :param doc_rev: The document revision. +        :type doc_rev: str +        :param content: The serialized content of the document. +        :type content: str +        :param gen: The generation corresponding to the modification of that +                    document. +        :type gen: int +        :param trans_id: The transaction id corresponding to the modification +                         of that document. +        :type trans_id: str +        """ +        # could pass source_replica in params for callback chain +        insert_fun = self._insert_doc_cb[self.source_replica_uid] +        logger.debug("Sync decrypter pool: inserting doc in local db: " \ +                     "%s:%s %s" % (doc_id, doc_rev, gen)) +        try: +            # convert deleted documents to avoid error on document creation +            if content == 'null': +                content = None +            doc = SoledadDocument(doc_id, doc_rev, content) +            insert_fun(doc, int(gen), trans_id) +        except Exception as exc: +            logger.error("Sync decrypter pool: error while inserting " +                         "decrypted doc into local db.") +            logger.exception(exc) + +        else: +            # If no errors found, remove it from the local temporary dict +            # and from the received database. +            self.decrypted_docs.pop(gen) +            self.delete_encrypted_received_doc(doc_id, doc_rev) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 5ffa9c7e..46ceca42 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # sqlcipher.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -14,8 +14,6 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  A U1DB backend that uses SQLCipher as its persistence layer. @@ -40,11 +38,13 @@ cipher_default_use_hmac can be used to globally alter the default use of HMAC  when opening a database.  So, as the statements above were introduced for backwards compatibility with -SLCipher 1.1 databases, we do not implement them as all SQLCipher databases +SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases  handled by Soledad should be created by SQLCipher >= 2.0.  """  import logging +import multiprocessing  import os +import sqlite3  import string  import threading  import time @@ -57,9 +57,12 @@ from collections import defaultdict  from pysqlcipher import dbapi2  from u1db.backends import sqlite_backend  from u1db import errors as u1db_errors +from taskthread import TimerTask -from leap.soledad.client.sync import Synchronizer +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool  from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.client.target import PendingReceivedDocsSyncError +from leap.soledad.client.sync import SoledadSynchronizer  from leap.soledad.common.document import SoledadDocument @@ -88,7 +91,7 @@ SQLITE_ISOLATION_LEVEL = None  def open(path, password, create=True, document_factory=None, crypto=None,           raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, -         cipher_page_size=1024): +         cipher_page_size=1024, defer_encryption=False):      """Open a database at the given location.      Will raise u1db.errors.DatabaseDoesNotExist if create=False and the @@ -114,6 +117,9 @@ def open(path, password, create=True, document_factory=None, crypto=None,      :type kdf_iter: int      :param cipher_page_size: The page size.      :type cipher_page_size: int +    :param defer_encryption: Whether to defer encryption/decryption of +                             documents, or do it inline while syncing. +    :type defer_encryption: bool      :return: An instance of Database.      :rtype SQLCipherDatabase @@ -121,7 +127,7 @@ def open(path, password, create=True, document_factory=None, crypto=None,      return SQLCipherDatabase.open_database(          path, password, create=create, document_factory=document_factory,          crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, -        cipher_page_size=cipher_page_size) +        cipher_page_size=cipher_page_size, defer_encryption=defer_encryption)  # @@ -147,19 +153,40 @@ class NotAnHexString(Exception):  #  class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): -    """A U1DB implementation that uses SQLCipher as its persistence layer.""" +    """ +    A U1DB implementation that uses SQLCipher as its persistence layer. +    """ +    defer_encryption = False      _index_storage_value = 'expand referenced encrypted'      k_lock = threading.Lock()      create_doc_lock = threading.Lock()      update_indexes_lock = threading.Lock() +    _sync_watcher = None +    _sync_enc_pool = None + +    """ +    The name of the local symmetrically encrypted documents to +    sync database file. +    """ +    LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' -    syncing_lock = defaultdict(threading.Lock)      """      A dictionary that hold locks which avoid multiple sync attempts from the      same database replica.      """ +    encrypting_lock = threading.Lock() + +    """ +    Period or recurrence of the periodic encrypting task, in seconds. +    """ +    ENCRYPT_TASK_PERIOD = 1 +    syncing_lock = defaultdict(threading.Lock) +    """ +    A dictionary that hold locks which avoid multiple sync attempts from the +    same database replica. +    """      def __init__(self, sqlcipher_file, password, document_factory=None,                   crypto=None, raw_key=False, cipher='aes-256-cbc', @@ -194,7 +221,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              self.assert_db_is_encrypted(                  sqlcipher_file, password, raw_key, cipher, kdf_iter,                  cipher_page_size) -        # connect to the database + +        # connect to the sqlcipher database          with self.k_lock:              self._db_handle = dbapi2.connect(                  sqlcipher_file, @@ -215,6 +243,26 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              self._ensure_schema()              self._crypto = crypto +        self._sync_db = None +        self._sync_db_write_lock = None +        self._sync_enc_pool = None + +        if self.defer_encryption: +            if sqlcipher_file != ":memory:": +                self._sync_db_path = "%s-sync" % sqlcipher_file +            else: +                self._sync_db_path = ":memory:" + +            # initialize sync db +            self._init_sync_db() + +            # initialize syncing queue encryption pool +            self._sync_enc_pool = SyncEncrypterPool( +                self._crypto, self._sync_db, self._sync_db_write_lock) +            self._sync_watcher = TimerTask(self._encrypt_syncing_docs, +                                           self.ENCRYPT_TASK_PERIOD) +            self._sync_watcher.start() +          def factory(doc_id=None, rev=None, json='{}', has_conflicts=False,                      syncable=True):              return SoledadDocument(doc_id=doc_id, rev=rev, json=json, @@ -226,7 +274,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):      @classmethod      def _open_database(cls, sqlcipher_file, password, document_factory=None,                         crypto=None, raw_key=False, cipher='aes-256-cbc', -                       kdf_iter=4000, cipher_page_size=1024): +                       kdf_iter=4000, cipher_page_size=1024, +                       defer_encryption=False):          """          Open a SQLCipher database. @@ -249,10 +298,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          :type kdf_iter: int          :param cipher_page_size: The page size.          :type cipher_page_size: int +        :param defer_encryption: Whether to defer encryption/decryption of +                                 documents, or do it inline while syncing. +        :type defer_encryption: bool          :return: The database object.          :rtype: SQLCipherDatabase          """ +        cls.defer_encryption = defer_encryption          if not os.path.isfile(sqlcipher_file):              raise u1db_errors.DatabaseDoesNotExist() @@ -298,43 +351,59 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):      def open_database(cls, sqlcipher_file, password, create, backend_cls=None,                        document_factory=None, crypto=None, raw_key=False,                        cipher='aes-256-cbc', kdf_iter=4000, -                      cipher_page_size=1024): +                      cipher_page_size=1024, defer_encryption=False):          """          Open a SQLCipher database.          :param sqlcipher_file: The path for the SQLCipher file.          :type sqlcipher_file: str +          :param password: The password that protects the SQLCipher db.          :type password: str +          :param create: Should the datbase be created if it does not already -            exist? -        :type: bool +                       exist? +        :type create: bool +          :param backend_cls: A class to use as backend.          :type backend_cls: type +          :param document_factory: A function that will be called with the same -            parameters as Document.__init__. +                                 parameters as Document.__init__.          :type document_factory: callable +          :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt -            document contents when syncing. +                       document contents when syncing.          :type crypto: soledad.crypto.SoledadCrypto +          :param raw_key: Whether C{password} is a raw 64-char hex string or a -            passphrase that should be hashed to obtain the encyrption key. +                        passphrase that should be hashed to obtain the +                        encyrption key.          :type raw_key: bool +          :param cipher: The cipher and mode to use.          :type cipher: str +          :param kdf_iter: The number of iterations to use.          :type kdf_iter: int +          :param cipher_page_size: The page size.          :type cipher_page_size: int +        :param defer_encryption: Whether to defer encryption/decryption of +                                 documents, or do it inline while syncing. +        :type defer_encryption: bool +          :return: The database object.          :rtype: SQLCipherDatabase          """ +        cls.defer_encryption = defer_encryption          try:              return cls._open_database(                  sqlcipher_file, password, document_factory=document_factory,                  crypto=crypto, raw_key=raw_key, cipher=cipher, -                kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) +                kdf_iter=kdf_iter, cipher_page_size=cipher_page_size, +                defer_encryption=defer_encryption)          except u1db_errors.DatabaseDoesNotExist:              if not create:                  raise @@ -347,7 +416,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):                  crypto=crypto, raw_key=raw_key, cipher=cipher,                  kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) -    def sync(self, url, creds=None, autocreate=True): +    def sync(self, url, creds=None, autocreate=True, defer_decryption=True):          """          Synchronize documents with remote replica exposed at url. @@ -362,6 +431,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          :type creds: dict          :param autocreate: Ask the target to create the db if non-existent.          :type autocreate: bool +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool          :return: The local generation before the synchronisation was performed.          :rtype: int @@ -370,7 +443,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          # the following context manager blocks until the syncing lock can be          # acquired.          with self.syncer(url, creds=creds) as syncer: -            res = syncer.sync(autocreate=autocreate) + +            # XXX could mark the critical section here... +            try: +                res = syncer.sync(autocreate=autocreate, +                                  defer_decryption=defer_decryption) + +            except PendingReceivedDocsSyncError: +                logger.warning("Local sync db is not clear, skipping sync...") +                return +          return res      def stop_sync(self): @@ -394,7 +476,18 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]:              syncer = self._get_syncer(url, creds=creds)              yield syncer -            syncer.sync_target.close() +            #syncer.sync_target.close() + +    @property +    def syncing(self): +        syncing = False +        for url in self._syncers: +            _, _, lock = self._syncers[url] +            is_not_locked = lock.acquire(blocking=False) +            if is_not_locked is False: +                return True +            lock.release() +        return False      def _get_syncer(self, url, creds=None):          """ @@ -415,11 +508,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          h = sha256(json.dumps([url, creds])).hexdigest()          cur_h, syncer = self._syncers.get(url, (None, None))          if syncer is None or h != cur_h: -            syncer = Synchronizer( +            wlock = self._sync_db_write_lock +            syncer = SoledadSynchronizer(                  self,                  SoledadSyncTarget(url, +                                  self._replica_uid,                                    creds=creds, -                                  crypto=self._crypto)) +                                  crypto=self._crypto, +                                  sync_db=self._sync_db, +                                  sync_db_write_lock=wlock))              self._syncers[url] = (h, syncer)          # in order to reuse the same synchronizer multiple times we have to          # reset its state (i.e. the number of documents received from target @@ -442,21 +539,85 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              'ALTER TABLE document '              'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') -    def create_doc(self, content, doc_id=None): +    def _init_sync_db(self): +        """ +        Initialize the Symmetrically-Encrypted document to be synced database, +        and the queue to communicate with subprocess workers.          """ -        Create a new document in the local encrypted database. +        self._sync_db = sqlite3.connect(self._sync_db_path, +                                        check_same_thread=False) -        :param content: the contents of the new document -        :type content: dict -        :param doc_id: an optional identifier specifying the document id -        :type doc_id: str +        self._sync_db_write_lock = threading.Lock() +        self._create_sync_db_tables() +        self.sync_queue = multiprocessing.Queue() + +    def _create_sync_db_tables(self): +        """ +        Create tables for the local sync documents db if needed. +        """ +        encr = SyncEncrypterPool +        decr = SyncDecrypterPool +        sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( +            encr.TABLE_NAME, encr.FIELD_NAMES)) +        sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( +            decr.TABLE_NAME, decr.FIELD_NAMES)) + +        with self._sync_db_write_lock: +            with self._sync_db: +                self._sync_db.execute(sql_encr) +                self._sync_db.execute(sql_decr) + +    # +    # Symmetric encryption of syncing docs +    # + +    def _encrypt_syncing_docs(self): +        """ +        Process the syncing queue and send the documents there +        to be encrypted in the sync db. They will be read by the +        SoledadSyncTarget during the sync_exchange. + +        Called periodical from the TimerTask self._sync_watcher. +        """ +        lock = self.encrypting_lock +        # optional wait flag used to avoid blocking +        if not lock.acquire(False): +            return +        else: +            queue = self.sync_queue +            try: +                while not queue.empty(): +                    doc = queue.get_nowait() +                    self._sync_enc_pool.encrypt_doc(doc) + +            except Exception as exc: +                logger.error("Error while  encrypting docs to sync") +                logger.exception(exc) +            finally: +                lock.release() + +    # +    # Document operations +    # -        :return: the new document -        :rtype: SoledadDocument +    def put_doc(self, doc):          """ -        with self.create_doc_lock: -            return sqlite_backend.SQLitePartialExpandDatabase.create_doc( -                self, content, doc_id=doc_id) +        Overwrite the put_doc method, to enqueue the modified document for +        encryption before sync. + +        :param doc: The document to be put. +        :type doc: u1db.Document + +        :return: The new document revision. +        :rtype: str +        """ +        doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc( +            self, doc) +        if self.defer_encryption: +            self.sync_queue.put_nowait(doc) +        return doc_rev + +    # indexes      def _put_and_update_indexes(self, old_doc, doc):          """ @@ -906,12 +1067,25 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          res = c.fetchall()          return res[0][0] -    def __del__(self): +    def close(self):          """ -        Closes db_handle upon object destruction. +        Close db_handle and close syncer.          """ +        logger.debug("Sqlcipher backend: closing") +        if self._sync_watcher is not None: +            self._sync_watcher.stop() +            self._sync_watcher.shutdown() +        for url in self._syncers: +            _, syncer = self._syncers[url] +            syncer.close() +        if self._sync_enc_pool is not None: +            self._sync_enc_pool.close()          if self._db_handle is not None:              self._db_handle.close() +    @property +    def replica_uid(self): +        return self._get_replica_uid() +  sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 56e63416..640e22ff 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -17,35 +17,86 @@  """ -Sync infrastructure that can be interrupted and recovered. +Soledad synchronization utilities. + + +Extend u1db Synchronizer with the ability to: + +    * Defer the update of the known replica uid until all the decryption of +      the incoming messages has been processed. + +    * Be interrupted and recovered.  """ +  import json +import logging +from threading import Lock  from u1db import errors -from u1db.sync import Synchronizer as U1DBSynchronizer +from u1db.sync import Synchronizer -class Synchronizer(U1DBSynchronizer): +logger = logging.getLogger(__name__) + + +class SoledadSynchronizer(Synchronizer):      """      Collect the state around synchronizing 2 U1DB replicas. -    Modified to allow for interrupting the synchronization process. +    Synchronization is bi-directional, in that new items in the source are sent +    to the target, and new items in the target are returned to the source. +    However, it still recognizes that one side is initiating the request. Also, +    at the moment, conflicts are only created in the source. + +    Also modified to allow for interrupting the synchronization process.      """ +    syncing_lock = Lock() +      def stop(self):          """          Stop the current sync in progress.          """          self.sync_target.stop() -    def sync(self, autocreate=False): +    def sync(self, autocreate=False, defer_decryption=True):          """          Synchronize documents between source and target. +        Differently from u1db `Synchronizer.sync` method, this one allows to +        pass a `defer_decryption` flag that will postpone the last +        step in the synchronization dance, namely, the setting of the last +        known generation and transaction id for a given remote replica. + +        This is done to allow the ongoing parallel decryption of the incoming +        docs to proceed without `InvalidGeneration` conflicts. +          :param autocreate: Whether the target replica should be created or not.          :type autocreate: bool +        :param defer_decryption: Whether to defer the decryption process using +                                 the intermediate database. If False, +                                 decryption will be done inline. +        :type defer_decryption: bool +        """ +        self.syncing_lock.acquire() +        try: +            return self._sync(autocreate=autocreate, +                              defer_decryption=defer_decryption) +        except Exception: +            # We release the lock if there was an error. +            # Otherwise, the lock should be released from the function +            # `complete_sync`. +            self.release_syncing_lock() +            # re-raising the exceptions to let syqlcipher.sync catch them +            # (and re-create the syncer instance if needed) +            raise + +    def _sync(self, autocreate=False, defer_decryption=True): +        """ +        Helper function, called from the main `sync` method. +        See `sync` docstring.          """          sync_target = self.sync_target @@ -64,6 +115,16 @@ class Synchronizer(U1DBSynchronizer):              target_gen, target_trans_id = (0, '')              target_my_gen, target_my_trans_id = (0, '') +        logger.debug( +            "Soledad target sync info:\n" +            "  target replica uid: %s\n" +            "  target generation: %d\n" +            "  target trans id: %s\n" +            "  target my gen: %d\n" +            "  target my trans_id: %s" +            % (self.target_replica_uid, target_gen, target_trans_id, +            target_my_gen, target_my_trans_id)) +          # make sure we'll have access to target replica uid once it exists          if self.target_replica_uid is None: @@ -80,6 +141,8 @@ class Synchronizer(U1DBSynchronizer):          # what's changed since that generation and this current gen          my_gen, _, changes = self.source.whats_changed(target_my_gen) +        logger.debug("Soledad sync: there are %d documents to send." \ +                     % len(changes))          # get source last-seen database generation for the target          if self.target_replica_uid is None: @@ -88,11 +151,17 @@ class Synchronizer(U1DBSynchronizer):              target_last_known_gen, target_last_known_trans_id = \                  self.source._get_replica_gen_and_trans_id(                      self.target_replica_uid) +        logger.debug( +            "Soledad source sync info:\n" +            "  source target gen: %d\n" +            "  source target trans_id: %s" +            % (target_last_known_gen, target_last_known_trans_id))          # validate transaction ids          if not changes and target_last_known_gen == target_gen:              if target_trans_id != target_last_known_trans_id:                  raise errors.InvalidTransactionId +            self.release_syncing_lock()              return my_gen          # prepare to send all the changed docs @@ -114,12 +183,79 @@ class Synchronizer(U1DBSynchronizer):          new_gen, new_trans_id = sync_target.sync_exchange(              docs_by_generation, self.source._replica_uid,              target_last_known_gen, target_last_known_trans_id, -            self._insert_doc_from_target, ensure_callback=ensure_callback) +            self._insert_doc_from_target, ensure_callback=ensure_callback, +            defer_decryption=defer_decryption) + +        logger.debug( +            "Soledad source sync info after sync exchange:\n" +            "  source target gen: %d\n" +            "  source target trans_id: %s" +            % (new_gen, new_trans_id)) + +        info = { +            "target_replica_uid": self.target_replica_uid, +            "new_gen": new_gen, +            "new_trans_id": new_trans_id, +            "my_gen": my_gen +        } +        self._syncing_info = info + +        if defer_decryption and not sync_target.has_syncdb(): +            logger.debug("Sync target has no valid sync db, " +                         "aborting defer_decryption") +            defer_decryption = False -        # record target synced-up-to generation including applying what we sent +        self.complete_sync() +        return my_gen + +    def complete_sync(self): +        """ +        Last stage of the synchronization: +            (a) record last known generation and transaction uid for the remote +            replica, and +            (b) make target aware of our current reached generation. +        """ +        logger.debug("Completing deferred last step in SYNC...") + +        # record target synced-up-to generation including applying what we +        # sent +        info = self._syncing_info          self.source._set_replica_gen_and_trans_id( -            self.target_replica_uid, new_gen, new_trans_id) +            info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) +          # if gapless record current reached generation with target -        self._record_sync_info_with_the_target(my_gen) +        self._record_sync_info_with_the_target(info["my_gen"]) +        self.syncing_lock.release() -        return my_gen +    @property +    def syncing(self): +        """ +        Return True if a sync is ongoing, False otherwise. +        :rtype: bool +        """ +        # XXX FIXME  we need some mechanism for timeout: should cleanup and +        # release if something in the syncdb-decrypt goes wrong. we could keep +        # track of the release date and cleanup unrealistic sync entries after +        # some time. +        locked = self.syncing_lock.locked() +        return locked + +    def release_syncing_lock(self): +        """ +        Release syncing lock if it's locked. +        """ +        if self.syncing_lock.locked(): +            self.syncing_lock.release() + +    def close(self): +        """ +        Close sync target pool of workers. +        """ +        self.release_syncing_lock() +        self.sync_target.close() + +    def __del__(self): +        """ +        Cleanup: release lock. +        """ +        self.release_syncing_lock() | 
