| author | drebs <drebs@leap.se> | 2016-12-22 17:36:15 -0200 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2016-12-22 17:36:15 -0200 | 
| commit | 8a463796bbaba3979234b0699d140947581421e7 (patch) | |
| tree | d1e2ea96ed91ac66c7e52a30d16246a498ae9ed6 | |
| parent | f072f18f317ea31e66c7890d672b5d2fd9f3ef14 (diff) | |
| parent | e360a3a75999503cf45bfbbad69970a452cf3d32 (diff) | |
Merge tag '0.9.2'
Tag version 0.9.2
# gpg: Signature made Thu 22 Dec 2016 05:33:30 PM BRST
# gpg:                using RSA key 0x6071E70DCACC60B2
# gpg: Good signature from "drebs (work key) <db@leap.se>" [ultimate]
# gpg:                 aka "drebs (work key) <drebs@leap.se>" [ultimate]
# Primary key fingerprint: 9F73 295B 6306 E06F 3151  99AE 6071 E70D CACC 60B2
68 files changed, 1894 insertions, 2555 deletions
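The centerpiece of this merge is the new `client/src/leap/soledad/client/_crypto.py` below, which replaces the old AES-256-CTR+HMAC scheme with AES-256-GCM and authenticates a packed preamble (magic bytes, scheme, method, timestamp, IV, doc id, rev) as associated data. As a minimal sketch of the GCM round trip that `AESWriter` wraps — using the `cryptography` library this release adds to `requirements.pip`, with illustrative key/IV/AAD values rather than Soledad's real ones:

```python
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

key = os.urandom(32)     # AES-256 requires a 32-byte key
iv = os.urandom(16)      # AESWriter likewise draws a random 16-byte IV
aad = b'doc-id/doc-rev'  # stands in for the packed preamble (authenticated,
                         # not encrypted)

encryptor = Cipher(algorithms.AES(key), modes.GCM(iv),
                   backend=default_backend()).encryptor()
encryptor.authenticate_additional_data(aad)
ciphertext = encryptor.update(b'secret payload') + encryptor.finalize()
tag = encryptor.tag      # 16-byte tag, appended to the blob

decryptor = Cipher(algorithms.AES(key), modes.GCM(iv, tag),
                   backend=default_backend()).decryptor()
decryptor.authenticate_additional_data(aad)
assert decryptor.update(ciphertext) + decryptor.finalize() == b'secret payload'
```

Tampering with the ciphertext, the associated data, or the tag makes `finalize()` raise `InvalidTag`, which is what `BlobDecryptor._end_stream` translates into `InvalidBlob` in the patch below.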
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index dd4e4605..ac2ae1f0 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,7 +1,6 @@
 stages:
   - code-check
   - tests
-  - benchmark
 
 # Cache tox envs between builds
 cache:
@@ -22,15 +21,5 @@ tests:
   script:
     - cd testing
     - tox -- --couch-url http://couchdb:5984
-
-benchmark:
-  stage: benchmark
-  image: leapcode/soledad:latest
-  services:
-    - couchdb
-  script:
-    - cd testing
-    - tox -e perf -- --couch-url http://couchdb:5984
   tags:
-    - docker
-    - benchmark
+    - couchdb
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 12cb56ab..f47749d1 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -1,3 +1,26 @@
+0.9.2 - 22 December, 2016
++++++++++++++++++++++++++
+
+Performance improvements
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+- use AES 256 GCM mode instead of CTR+HMAC.
+- streaming encryption/decryption and data transfer.
+
+Server
+~~~~~~
+
+- move server to a twisted resource entrypoint.
+
+Client
+~~~~~~
+
+- use twisted http agent in the client.
+- maintain backwards compatibility with old crypto scheme (AES 256 CTR+HMAC).
+  No migration for now, only in 0.10.
+- remove the encryption/decryption pools, replace with inline streaming crypto.
+- use sqlcipher transactions on sync.
+
 0.9.1 - 27 November, 2016
 +++++++++++++++++++++++++
 
diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip
index 2ae844e1..24b168b4 100644
--- a/client/pkg/requirements.pip
+++ b/client/pkg/requirements.pip
@@ -2,3 +2,4 @@ pysqlcipher>2.6.3
 scrypt
 zope.proxy
 twisted
+cryptography
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 245a8971..3a114021 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -21,6 +21,7 @@ from leap.soledad.client.api import Soledad
 from leap.soledad.common import soledad_assert
 
 
 from ._version import get_versions
+
 __version__ = get_versions()['version']
 del get_versions
diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py
new file mode 100644
index 00000000..4bbdd044
--- /dev/null
+++ b/client/src/leap/soledad/client/_crypto.py
@@ -0,0 +1,386 @@
+# -*- coding: utf-8 -*-
+# _crypto.py
+# Copyright (C) 2016 LEAP Encryption Access Project
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Cryptographic operations for the soledad client
+"""
+
+import binascii
+import base64
+import hashlib
+import hmac
+import os
+import re
+import struct
+import time
+
+from io import BytesIO
+from itertools import imap
+from collections import namedtuple
+
+from twisted.internet import defer
+from twisted.internet import interfaces
+from twisted.web.client import FileBodyProducer
+
+from cryptography.exceptions import InvalidTag
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.backends.multibackend import MultiBackend
+from cryptography.hazmat.backends.openssl.backend \
+    import Backend as OpenSSLBackend
+
+from zope.interface import implements
+
+
+SECRET_LENGTH = 64
+
+CRYPTO_BACKEND = MultiBackend([OpenSSLBackend()])
+
+PACMAN = struct.Struct('2sbbQ16s255p255p')
+BLOB_SIGNATURE_MAGIC = '\x13\x37'
+
+
+ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1)
+ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2)
+DocInfo = namedtuple('DocInfo', 'doc_id rev')
+
+
+class EncryptionDecryptionError(Exception):
+    pass
+
+
+class InvalidBlob(Exception):
+    pass
+
+
+class SoledadCrypto(object):
+    """
+    This class provides convenient methods for document encryption and
+    decryption using BlobEncryptor and BlobDecryptor classes.
+    """
+    def __init__(self, secret):
+        """
+        Initialize the crypto object.
+
+        :param secret: The Soledad remote storage secret.
+        :type secret: str
+        """
+        self.secret = secret
+
+    def encrypt_doc(self, doc):
+        """
+        Creates and configures a BlobEncryptor, asking it to start encryption
+        and wrapping the result as a simple JSON string with a "raw" key.
+
+        :param doc: the document to be encrypted.
+        :type doc: SoledadDocument
+        :return: A deferred whose callback will be invoked with a JSON string
+            containing the ciphertext as the value of "raw" key.
+        :rtype: twisted.internet.defer.Deferred
+        """
+
+        def put_raw(blob):
+            raw = blob.getvalue()
+            return '{"raw": "' + raw + '"}'
+
+        content = BytesIO(str(doc.get_json()))
+        info = DocInfo(doc.doc_id, doc.rev)
+        del doc
+        encryptor = BlobEncryptor(info, content, secret=self.secret)
+        d = encryptor.encrypt()
+        d.addCallback(put_raw)
+        return d
+
+    def decrypt_doc(self, doc):
+        """
+        Creates and configures a BlobDecryptor, asking it to decrypt and
+        return the decrypted cleartext content from the encrypted document.
+
+        :param doc: the document to be decrypted.
+        :type doc: SoledadDocument
+        :return: The decrypted cleartext content of the document.
+        :rtype: str
+        """
+        info = DocInfo(doc.doc_id, doc.rev)
+        ciphertext = BytesIO()
+        payload = doc.content['raw']
+        del doc
+        ciphertext.write(str(payload))
+        decryptor = BlobDecryptor(info, ciphertext, secret=self.secret)
+        return decryptor.decrypt()
+
+
+def encrypt_sym(data, key, method=ENC_METHOD.aes_256_gcm):
+    """
+    Encrypt data using AES-256 cipher in selected mode.
+
+    :param data: The data to be encrypted.
+    :type data: str
+    :param key: The key used to encrypt data (must be 256 bits long).
+    :type key: str
+
+    :return: A tuple with the initialization vector and the ciphertext, both
+        encoded as base64.
+    :rtype: (str, str)
+    """
+    mode = _mode_by_method(method)
+    encryptor = AESWriter(key, mode=mode)
+    encryptor.write(data)
+    _, ciphertext = encryptor.end()
+    iv = base64.b64encode(encryptor.iv)
+    tag = encryptor.tag or ''
+    return iv, ciphertext + tag
+
+
+def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm):
+    """
+    Decrypt data using AES-256 cipher in selected mode.
+
+    :param data: The data to be decrypted.
+    :type data: str
+    :param key: The symmetric key used to decrypt data (must be 256 bits
+                long).
+    :type key: str
+    :param iv: The base64 encoded initialization vector.
+    :type iv: str
+
+    :return: The decrypted data.
+    :rtype: str
+    """
+    _iv = base64.b64decode(str(iv))
+    mode = _mode_by_method(method)
+    tag = None
+    if mode == modes.GCM:
+        data, tag = data[:-16], data[-16:]
+    decryptor = AESWriter(key, _iv, tag=tag, mode=mode)
+    decryptor.write(data)
+    _, plaintext = decryptor.end()
+    return plaintext
+
+
+class BlobEncryptor(object):
+    """
+    Produces encrypted data from the cleartext data associated with a given
+    SoledadDocument using AES-256 cipher in GCM mode.
+    The production happens using a Twisted FileBodyProducer, which uses a
+    Cooperator to schedule calls and can be paused/resumed. Each call takes at
+    most 65536 bytes from the input.
+    Both the production input and output are file descriptors, so they can be
+    applied to a stream of data.
+    """
+    def __init__(self, doc_info, content_fd, secret=None):
+        if not secret:
+            raise EncryptionDecryptionError('no secret given')
+
+        self.doc_id = doc_info.doc_id
+        self.rev = doc_info.rev
+        self._content_fd = content_fd
+        self._producer = FileBodyProducer(content_fd, readSize=2**16)
+
+        sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret)
+        self._aes = AESWriter(sym_key)
+        self._aes.authenticate(self._make_preamble())
+
+    @property
+    def iv(self):
+        return self._aes.iv
+
+    @property
+    def tag(self):
+        return self._aes.tag
+
+    def encrypt(self):
+        """
+        Starts producing encrypted data from the cleartext data.
+
+        :return: A deferred which will be fired when encryption ends and whose
+            callback will be invoked with the resulting ciphertext.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        d = self._producer.startProducing(self._aes)
+        d.addCallback(lambda _: self._end_crypto_stream())
+        return d
+
+    def _make_preamble(self):
+        current_time = int(time.time())
+
+        return PACMAN.pack(
+            BLOB_SIGNATURE_MAGIC,
+            ENC_SCHEME.symkey,
+            ENC_METHOD.aes_256_gcm,
+            current_time,
+            self.iv,
+            str(self.doc_id),
+            str(self.rev))
+
+    def _end_crypto_stream(self):
+        preamble, encrypted = self._aes.end()
+        result = BytesIO()
+        result.write(
+            base64.urlsafe_b64encode(preamble))
+        result.write(' ')
+        result.write(
+            base64.urlsafe_b64encode(encrypted + self.tag))
+        return defer.succeed(result)
+
+
+class BlobDecryptor(object):
+    """
+    Decrypts an encrypted blob associated with a given Document.
+
+    Will raise an exception if the blob doesn't have the expected structure,
+    or if the GCM tag doesn't verify.
+    """
+
+    def __init__(self, doc_info, ciphertext_fd, result=None,
+                 secret=None):
+        if not secret:
+            raise EncryptionDecryptionError('no secret given')
+
+        self.doc_id = doc_info.doc_id
+        self.rev = doc_info.rev
+
+        ciphertext_fd, preamble, iv = self._consume_preamble(ciphertext_fd)
+
+        self.result = result or BytesIO()
+        sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret)
+        self._aes = AESWriter(sym_key, iv, self.result, tag=self.tag)
+        self._aes.authenticate(preamble)
+
+        self._producer = FileBodyProducer(ciphertext_fd, readSize=2**16)
+
+    def _consume_preamble(self, ciphertext_fd):
+        ciphertext_fd.seek(0)
+        try:
+            preamble, ciphertext = _split(ciphertext_fd.getvalue())
+            self.tag, ciphertext = ciphertext[-16:], ciphertext[:-16]
+        except (TypeError, binascii.Error):
+            raise InvalidBlob
+        ciphertext_fd.close()
+
+        if len(preamble) != PACMAN.size:
+            raise InvalidBlob
+
+        try:
+            unpacked_data = PACMAN.unpack(preamble)
+            magic, sch, meth, ts, iv, doc_id, rev = unpacked_data
+        except struct.error:
+            raise InvalidBlob
+
+        if magic != BLOB_SIGNATURE_MAGIC:
+            raise InvalidBlob
+        # TODO check timestamp
+        if sch != ENC_SCHEME.symkey:
+            raise InvalidBlob('invalid scheme')
+        if meth != ENC_METHOD.aes_256_gcm:
+            raise InvalidBlob('invalid encryption scheme')
+        if rev != self.rev:
+            raise InvalidBlob('invalid revision')
+        if doc_id != self.doc_id:
+            raise InvalidBlob('invalid doc id')
+        return BytesIO(ciphertext), preamble, iv
+
+    def _end_stream(self):
+        try:
+            return self._aes.end()[1]
+        except InvalidTag:
+            raise InvalidBlob('Invalid Tag. Blob authentication failed.')
+
+    def decrypt(self):
+        """
+        Starts producing decrypted data from the ciphertext data.
+
+        :return: A deferred which will be fired when decryption ends and whose
+            callback will be invoked with the resulting cleartext.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        d = self._producer.startProducing(self._aes)
+        d.addCallback(lambda _: self._end_stream())
+        return d
+
+
+class AESWriter(object):
+    """
+    A Twisted Consumer implementation that takes an input file descriptor and
+    applies AES-256 cipher in GCM mode.
+    """
+    implements(interfaces.IConsumer)
+
+    def __init__(self, key, iv=None, _buffer=None, tag=None, mode=modes.GCM):
+        if len(key) != 32:
+            raise EncryptionDecryptionError('key is not 256 bits')
+        self.iv = iv or os.urandom(16)
+        self.buffer = _buffer or BytesIO()
+        cipher = _get_aes_cipher(key, self.iv, tag, mode)
+        cipher = cipher.decryptor() if tag else cipher.encryptor()
+        self.cipher, self.aead = cipher, ''
+
+    def authenticate(self, data):
+        self.aead += data
+        self.cipher.authenticate_additional_data(data)
+
+    @property
+    def tag(self):
+        return getattr(self.cipher, 'tag', None)
+
+    def write(self, data):
+        self.buffer.write(self.cipher.update(data))
+
+    def end(self):
+        self.buffer.write(self.cipher.finalize())
+        return self.aead, self.buffer.getvalue()
+
+
+def is_symmetrically_encrypted(content):
+    """
+    Returns True if the document was symmetrically encrypted.
+    'EzcB' is the base64 encoding of \x13\x37 magic number and 1 (symmetrically
+    encrypted value for enc_scheme flag).
+
+    :param content: The document content as string
+    :type content: str
+
+    :rtype: bool
+    """
+    return content and content[:13] == '{"raw": "EzcB'
+
+
+# utils
+
+
+def _hmac_sha256(key, data):
+    return hmac.new(key, data, hashlib.sha256).digest()
+
+
+def _get_sym_key_for_doc(doc_id, secret):
+    key = secret[SECRET_LENGTH:]
+    return _hmac_sha256(key, doc_id)
+
+
+def _get_aes_cipher(key, iv, tag, mode=modes.GCM):
+    mode = mode(iv, tag) if mode == modes.GCM else mode(iv)
+    return Cipher(algorithms.AES(key), mode, backend=CRYPTO_BACKEND)
+
+
+def _split(base64_raw_payload):
+    return imap(base64.urlsafe_b64decode, re.split(' ', base64_raw_payload))
+
+
+def _mode_by_method(method):
+    if method == ENC_METHOD.aes_256_gcm:
+        return modes.GCM
+    else:
+        return modes.CTR
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index ce9bec05..a5328d2b 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -50,8 +50,7 @@ How many times a SQLCipher query should be retried in case of timeout.
 SQLCIPHER_MAX_RETRIES = 10
 
 
-def getConnectionPool(opts, openfun=None, driver="pysqlcipher",
-                      sync_enc_pool=None):
+def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
     """
     Return a connection pool.
 
@@ -72,7 +71,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher",
     if openfun is None and driver == "pysqlcipher":
         openfun = partial(set_init_pragmas, opts=opts)
     return U1DBConnectionPool(
-        opts, sync_enc_pool,
+        opts,
         # the following params are relayed "as is" to twisted's
         # ConnectionPool.
         "%s.dbapi2" % driver, opts.path, timeout=SQLCIPHER_CONNECTION_TIMEOUT,
@@ -89,7 +88,7 @@ class U1DBConnection(adbapi.Connection):
     The U1DB wrapper to use.
     """
 
-    def __init__(self, pool, sync_enc_pool, init_u1db=False):
+    def __init__(self, pool, init_u1db=False):
         """
         :param pool: The pool of connections to that owns this connection.
:type pool: adbapi.ConnectionPool @@ -97,7 +96,6 @@ class U1DBConnection(adbapi.Connection):          :type init_u1db: bool          """          self.init_u1db = init_u1db -        self._sync_enc_pool = sync_enc_pool          try:              adbapi.Connection.__init__(self, pool)          except dbapi2.DatabaseError as e: @@ -116,8 +114,7 @@ class U1DBConnection(adbapi.Connection):          if self.init_u1db:              self._u1db = self.u1db_wrapper(                  self._connection, -                self._pool.opts, -                self._sync_enc_pool) +                self._pool.opts)      def __getattr__(self, name):          """ @@ -162,12 +159,11 @@ class U1DBConnectionPool(adbapi.ConnectionPool):      connectionFactory = U1DBConnection      transactionFactory = U1DBTransaction -    def __init__(self, opts, sync_enc_pool, *args, **kwargs): +    def __init__(self, opts, *args, **kwargs):          """          Initialize the connection pool.          """          self.opts = opts -        self._sync_enc_pool = sync_enc_pool          try:              adbapi.ConnectionPool.__init__(self, *args, **kwargs)          except dbapi2.DatabaseError as e: @@ -182,7 +178,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool):          try:              conn = self.connectionFactory( -                self, self._sync_enc_pool, init_u1db=True) +                self, init_u1db=True)              replica_uid = conn._u1db._real_replica_uid              setProxiedObject(self.replica_uid, replica_uid)          except DatabaseAccessError as e: @@ -257,7 +253,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool):          tid = self.threadID()          u1db = self._u1dbconnections.get(tid)          conn = self.connectionFactory( -            self, self._sync_enc_pool, init_u1db=not bool(u1db)) +            self, init_u1db=not bool(u1db))          if self.replica_uid is None:              replica_uid = conn._u1db._real_replica_uid diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 6870d5ba..da6eec66 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -56,11 +56,10 @@ from leap.soledad.common.errors import DatabaseAccessError  from leap.soledad.client import adbapi  from leap.soledad.client import events as soledad_events  from leap.soledad.client import interfaces as soledad_interfaces -from leap.soledad.client.crypto import SoledadCrypto +from leap.soledad.client import sqlcipher  from leap.soledad.client.secrets import SoledadSecrets  from leap.soledad.client.shared_db import SoledadSharedDatabase -from leap.soledad.client import sqlcipher -from leap.soledad.client import encdecpool +from leap.soledad.client._crypto import SoledadCrypto  logger = getLogger(__name__) @@ -131,7 +130,7 @@ class Soledad(object):      def __init__(self, uuid, passphrase, secrets_path, local_db_path,                   server_url, cert_file, shared_db=None, -                 auth_token=None, defer_encryption=False, syncable=True): +                 auth_token=None, syncable=True):          """          Initialize configuration, cryptographic keys and dbs. @@ -168,11 +167,6 @@ class Soledad(object):              Authorization token for accessing remote databases.          :type auth_token: str -        :param defer_encryption: -            Whether to defer encryption/decryption of documents, or do it -            inline while syncing. 
-        :type defer_encryption: bool -          :param syncable:              If set to ``False``, this database will not attempt to synchronize              with remote replicas (default is ``True``) @@ -188,9 +182,7 @@ class Soledad(object):          self._passphrase = passphrase          self._local_db_path = local_db_path          self._server_url = server_url -        self._defer_encryption = defer_encryption          self._secrets_path = None -        self._sync_enc_pool = None          self._dbsyncer = None          self.shared_db = shared_db @@ -226,7 +218,6 @@ class Soledad(object):              # have to close any thread-related stuff we have already opened              # here, otherwise there might be zombie threads that may clog the              # reactor. -            self._sync_db.close()              if hasattr(self, '_dbpool'):                  self._dbpool.close()              raise @@ -289,22 +280,12 @@ class Soledad(object):          tohex = binascii.b2a_hex          # sqlcipher only accepts the hex version          key = tohex(self._secrets.get_local_storage_key()) -        sync_db_key = tohex(self._secrets.get_sync_db_key())          opts = sqlcipher.SQLCipherOptions(              self._local_db_path, key, -            is_raw_key=True, create=True, -            defer_encryption=self._defer_encryption, -            sync_db_key=sync_db_key, -        ) +            is_raw_key=True, create=True)          self._sqlcipher_opts = opts - -        # the sync_db is used both for deferred encryption and decryption, so -        # we want to initialize it anyway to allow for all combinations of -        # deferred encryption and decryption configurations. -        self._initialize_sync_db(opts) -        self._dbpool = adbapi.getConnectionPool( -            opts, sync_enc_pool=self._sync_enc_pool) +        self._dbpool = adbapi.getConnectionPool(opts)      def _init_u1db_syncer(self):          """ @@ -313,10 +294,7 @@ class Soledad(object):          replica_uid = self._dbpool.replica_uid          self._dbsyncer = sqlcipher.SQLCipherU1DBSync(              self._sqlcipher_opts, self._crypto, replica_uid, -            SOLEDAD_CERT, -            defer_encryption=self._defer_encryption, -            sync_db=self._sync_db, -            sync_enc_pool=self._sync_enc_pool) +            SOLEDAD_CERT)      def sync_stats(self):          sync_phase = 0 @@ -341,12 +319,6 @@ class Soledad(object):          self._dbpool.close()          if getattr(self, '_dbsyncer', None):              self._dbsyncer.close() -        # close the sync database -        if self._sync_db: -            self._sync_db.close() -        self._sync_db = None -        if self._defer_encryption: -            self._sync_enc_pool.stop()      #      # ILocalStorage @@ -385,7 +357,8 @@ class Soledad(object):              also be updated.          :rtype: twisted.internet.defer.Deferred          """ -        return self._defer("put_doc", doc) +        d = self._defer("put_doc", doc) +        return d      def delete_doc(self, doc):          """ @@ -479,7 +452,8 @@ class Soledad(object):          # create_doc (and probably to put_doc too). There are cases (mail          # payloads for example) in which we already have the encoding in the          # headers, so we don't need to guess it. 
-        return self._defer("create_doc", content, doc_id=doc_id) +        d = self._defer("create_doc", content, doc_id=doc_id) +        return d      def create_doc_from_json(self, json, doc_id=None):          """ @@ -700,37 +674,26 @@ class Soledad(object):          if syncable and not self._dbsyncer:              self._init_u1db_syncer() -    def sync(self, defer_decryption=True): +    def sync(self):          """          Synchronize documents with the server replica.          This method uses a lock to prevent multiple concurrent sync processes          over the same local db file. -        :param defer_decryption: -            Whether to defer decryption of documents, or do it inline while -            syncing. -        :type defer_decryption: bool -          :return: A deferred lock that will run the actual sync process when                   the lock is acquired, and which will fire with with the local                   generation before the synchronization was performed.          :rtype: twisted.internet.defer.Deferred          """          d = self.sync_lock.run( -            self._sync, -            defer_decryption) +            self._sync)          return d -    def _sync(self, defer_decryption): +    def _sync(self):          """          Synchronize documents with the server replica. -        :param defer_decryption: -            Whether to defer decryption of documents, or do it inline while -            syncing. -        :type defer_decryption: bool -          :return: A deferred whose callback will be invoked with the local              generation before the synchronization was performed.          :rtype: twisted.internet.defer.Deferred @@ -740,8 +703,7 @@ class Soledad(object):              return          d = self._dbsyncer.sync(              sync_url, -            creds=self._creds, -            defer_decryption=defer_decryption) +            creds=self._creds)          def _sync_callback(local_gen):              self._last_received_docs = docs = self._dbsyncer.received_docs @@ -837,50 +799,6 @@ class Soledad(object):      token = property(_get_token, _set_token, doc='The authentication Token.') -    def _initialize_sync_db(self, opts): -        """ -        Initialize the Symmetrically-Encrypted document to be synced database, -        and the queue to communicate with subprocess workers. - -        :param opts: -        :type opts: SQLCipherOptions -        """ -        soledad_assert(opts.sync_db_key is not None) -        sync_db_path = None -        if opts.path != ":memory:": -            sync_db_path = "%s-sync" % opts.path -        else: -            sync_db_path = ":memory:" - -        # we copy incoming options because the opts object might be used -        # somewhere else -        sync_opts = sqlcipher.SQLCipherOptions.copy( -            opts, path=sync_db_path, create=True) -        self._sync_db = sqlcipher.getConnectionPool( -            sync_opts, extra_queries=self._sync_db_extra_init) -        if self._defer_encryption: -            # initialize syncing queue encryption pool -            self._sync_enc_pool = encdecpool.SyncEncrypterPool( -                self._crypto, self._sync_db) -            self._sync_enc_pool.start() - -    @property -    def _sync_db_extra_init(self): -        """ -        Queries for creating tables for the local sync documents db if needed. 
-        They are passed as extra initialization to initialize_sqlciphjer_db - -        :rtype: tuple of strings -        """ -        maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" -        encr = encdecpool.SyncEncrypterPool -        decr = encdecpool.SyncDecrypterPool -        sql_encr_table_query = (maybe_create % ( -            encr.TABLE_NAME, encr.FIELD_NAMES)) -        sql_decr_table_query = (maybe_create % ( -            decr.TABLE_NAME, decr.FIELD_NAMES)) -        return (sql_encr_table_query, sql_decr_table_query) -      #      # ISecretsStorage      # @@ -1017,6 +935,7 @@ def create_path_if_not_exists(path):  # Monkey patching u1db to be able to provide a custom SSL cert  # ---------------------------------------------------------------------------- +  # We need a more reasonable timeout (in seconds)  SOLEDAD_TIMEOUT = 120 diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index d81c883b..09e90171 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -32,9 +32,12 @@ from leap.soledad.common import soledad_assert  from leap.soledad.common import soledad_assert_type  from leap.soledad.common import crypto  from leap.soledad.common.log import getLogger +import warnings  logger = getLogger(__name__) +warnings.warn("'soledad.client.crypto' MODULE DEPRECATED", +              DeprecationWarning, stacklevel=2)  MAC_KEY_LENGTH = 64 diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py deleted file mode 100644 index 056b012f..00000000 --- a/client/src/leap/soledad/client/encdecpool.py +++ /dev/null @@ -1,657 +0,0 @@ -# -*- coding: utf-8 -*- -# encdecpool.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -A pool of encryption/decryption concurrent and parallel workers for using -during synchronization. -""" - - -import json -from uuid import uuid4 - -from twisted.internet.task import LoopingCall -from twisted.internet import threads -from twisted.internet import defer - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.common import soledad_assert -from leap.soledad.common.log import getLogger - -from leap.soledad.client.crypto import encrypt_docstr -from leap.soledad.client.crypto import decrypt_doc_dict - - -logger = getLogger(__name__) - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): -    """ -    Base class for encrypter/decrypter pools. -    """ - -    def __init__(self, crypto, sync_db): -        """ -        Initialize the pool of encryption-workers. - -        :param crypto: A SoledadCryto instance to perform the encryption. 
-        :type crypto: leap.soledad.crypto.SoledadCrypto - -        :param sync_db: A database connection handle -        :type sync_db: pysqlcipher.dbapi2.Connection -        """ -        self._crypto = crypto -        self._sync_db = sync_db -        self._delayed_call = None -        self._started = False - -    def start(self): -        self._started = True - -    def stop(self): -        self._started = False -        # maybe cancel the next delayed call -        if self._delayed_call \ -                and not self._delayed_call.called: -            self._delayed_call.cancel() - -    @property -    def running(self): -        return self._started - -    def _runOperation(self, query, *args): -        """ -        Run an operation on the sync db. - -        :param query: The query to be executed. -        :type query: str -        :param args: A list of query arguments. -        :type args: list - -        :return: A deferred that will fire when the operation in the database -                 has finished. -        :rtype: twisted.internet.defer.Deferred -        """ -        return self._sync_db.runOperation(query, *args) - -    def _runQuery(self, query, *args): -        """ -        Run a query on the sync db. - -        :param query: The query to be executed. -        :type query: str -        :param args: A list of query arguments. -        :type args: list - -        :return: A deferred that will fire with the results of the database -                 query. -        :rtype: twisted.internet.defer.Deferred -        """ -        return self._sync_db.runQuery(query, *args) - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): -    """ -    Encrypt the content of the given document. - -    :param doc_id: The document id. -    :type doc_id: str -    :param doc_rev: The document revision. -    :type doc_rev: str -    :param content: The serialized content of the document. -    :type content: str -    :param key: The encryption key. -    :type key: str -    :param secret: The Soledad storage secret (used for MAC auth). -    :type secret: str - -    :return: A tuple containing the doc id, revision and encrypted content. -    :rtype: tuple(str, str, str) -    """ -    encrypted_content = encrypt_docstr( -        content, doc_id, doc_rev, key, secret) -    return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): -    """ -    Pool of workers that spawn subprocesses to execute the symmetric encryption -    of documents to be synced. -    """ -    TABLE_NAME = "docs_tosync" -    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - -    ENCRYPT_LOOP_PERIOD = 2 - -    def __init__(self, *args, **kwargs): -        """ -        Initialize the sync encrypter pool. -        """ -        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) -        # TODO delete already synced files from database - -    def start(self): -        """ -        Start the encrypter pool. -        """ -        SyncEncryptDecryptPool.start(self) -        logger.debug("starting the encryption loop...") - -    def stop(self): -        """ -        Stop the encrypter pool. -        """ - -        SyncEncryptDecryptPool.stop(self) - -    def encrypt_doc(self, doc): -        """ -        Encrypt document asynchronously then insert it on -        local staging database. - -        :param doc: The document to be encrypted. 
-        :type doc: SoledadDocument -        """ -        soledad_assert(self._crypto is not None, "need a crypto object") -        docstr = doc.get_json() -        key = self._crypto.doc_passphrase(doc.doc_id) -        secret = self._crypto.secret -        args = doc.doc_id, doc.rev, docstr, key, secret -        # encrypt asynchronously -        # TODO use dedicated threadpool / move to ampoule -        d = threads.deferToThread( -            encrypt_doc_task, *args) -        d.addCallback(self._encrypt_doc_cb) -        return d - -    def _encrypt_doc_cb(self, result): -        """ -        Insert results of encryption routine into the local sync database. - -        :param result: A tuple containing the doc id, revision and encrypted -                       content. -        :type result: tuple(str, str, str) -        """ -        doc_id, doc_rev, content = result -        return self._insert_encrypted_local_doc(doc_id, doc_rev, content) - -    def _insert_encrypted_local_doc(self, doc_id, doc_rev, content): -        """ -        Insert the contents of the encrypted doc into the local sync -        database. - -        :param doc_id: The document id. -        :type doc_id: str -        :param doc_rev: The document revision. -        :type doc_rev: str -        :param content: The serialized content of the document. -        :type content: str -        """ -        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ -                % (self.TABLE_NAME,) -        return self._runOperation(query, (doc_id, doc_rev, content)) - -    @defer.inlineCallbacks -    def get_encrypted_doc(self, doc_id, doc_rev): -        """ -        Get an encrypted document from the sync db. - -        :param doc_id: The id of the document. -        :type doc_id: str -        :param doc_rev: The revision of the document. -        :type doc_rev: str - -        :return: A deferred that will fire with the encrypted content of the -                 document or None if the document was not found in the sync -                 db. -        :rtype: twisted.internet.defer.Deferred -        """ -        query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \ -                % self.TABLE_NAME -        result = yield self._runQuery(query, (doc_id, doc_rev)) -        if result: -            logger.debug("found doc on sync db: %s" % doc_id) -            val = result.pop() -            defer.returnValue(val[0]) -        logger.debug("did not find doc on sync db: %s" % doc_id) -        defer.returnValue(None) - -    def delete_encrypted_doc(self, doc_id, doc_rev): -        """ -        Delete an encrypted document from the sync db. - -        :param doc_id: The id of the document. -        :type doc_id: str -        :param doc_rev: The revision of the document. -        :type doc_rev: str - -        :return: A deferred that will fire when the operation in the database -                 has finished. -        :rtype: twisted.internet.defer.Deferred -        """ -        query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ -                % self.TABLE_NAME -        self._runOperation(query, (doc_id, doc_rev)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, -                     idx): -    """ -    Decrypt the content of the given document. - -    :param doc_id: The document id. -    :type doc_id: str -    :param doc_rev: The document revision. -    :type doc_rev: str -    :param content: The encrypted content of the document as JSON dict. 
-    :type content: dict -    :param gen: The generation corresponding to the modification of that -                document. -    :type gen: int -    :param trans_id: The transaction id corresponding to the modification of -                     that document. -    :type trans_id: str -    :param key: The encryption key. -    :type key: str -    :param secret: The Soledad storage secret (used for MAC auth). -    :type secret: str -    :param idx: The index of this document in the current sync process. -    :type idx: int - -    :return: A tuple containing the doc id, revision and encrypted content. -    :rtype: tuple(str, str, str) -    """ -    decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) -    return doc_id, doc_rev, decrypted_content, gen, trans_id, idx - - -class SyncDecrypterPool(SyncEncryptDecryptPool): -    """ -    Pool of workers that spawn subprocesses to execute the symmetric decryption -    of documents that were received. - -    The decryption of the received documents is done in two steps: - -        1. Encrypted documents are stored in the sync db by the actual soledad -           sync loop. -        2. The soledad sync loop tells us how many documents we should expect -           to process. -        3. We start a decrypt-and-process loop: - -            a. Encrypted documents are fetched. -            b. Encrypted documents are decrypted. -            c. The longest possible list of decrypted documents are inserted -               in the soledad db (this depends on which documents have already -               arrived and which documents have already been decrypte, because -               the order of insertion in the local soledad db matters). -            d. Processed documents are deleted from the database. - -        4. When we have processed as many documents as we should, the loop -           finishes. -    """ -    TABLE_NAME = "docs_received" -    FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ -                  "trans_id, encrypted, idx, sync_id" - -    """ -    Period of recurrence of the periodic decrypting task, in seconds. -    """ -    DECRYPT_LOOP_PERIOD = 0.5 - -    def __init__(self, *args, **kwargs): -        """ -        Initialize the decrypter pool, and setup a dict for putting the -        results of the decrypted docs until they are picked by the insert -        routine that gets them in order. - -        :param insert_doc_cb: A callback for inserting received documents from -                              target. If not overriden, this will call u1db -                              insert_doc_from_target in synchronizer, which -                              implements the TAKE OTHER semantics. -        :type insert_doc_cb: function -        :param source_replica_uid: The source replica uid, used to find the -                                   correct callback for inserting documents. -        :type source_replica_uid: str -        """ -        self._insert_doc_cb = kwargs.pop("insert_doc_cb") -        self.source_replica_uid = kwargs.pop("source_replica_uid") - -        SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - -        self._docs_to_process = None -        self._processed_docs = 0 -        self._last_inserted_idx = 0 - -        self._loop = LoopingCall(self._decrypt_and_recurse) - -    def _start_pool(self, period): -        self._loop.start(period) - -    def start(self, docs_to_process): -        """ -        Set the number of documents we expect to process. 
- -        This should be called by the during the sync exchange process as soon -        as we know how many documents are arriving from the server. - -        :param docs_to_process: The number of documents to process. -        :type docs_to_process: int -        """ -        SyncEncryptDecryptPool.start(self) -        self._decrypted_docs_indexes = set() -        self._sync_id = uuid4().hex -        self._docs_to_process = docs_to_process -        self._deferred = defer.Deferred() -        d = self._init_db() -        d.addCallback(lambda _: self._start_pool(self.DECRYPT_LOOP_PERIOD)) -        return d - -    def stop(self): -        if self._loop.running: -            self._loop.stop() -        self._finish() -        SyncEncryptDecryptPool.stop(self) - -    def _init_db(self): -        """ -        Ensure sync_id column is present then -        Empty the received docs table of the sync database. - -        :return: A deferred that will fire when the operation in the database -                 has finished. -        :rtype: twisted.internet.defer.Deferred -        """ -        ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" % -                                 self.TABLE_NAME) -        d = self._runQuery(ensure_sync_id_column) - -        def empty_received_docs(_): -            query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) -            return self._runOperation(query, (self._sync_id,)) - -        d.addCallbacks(empty_received_docs, empty_received_docs) -        return d - -    def _errback(self, failure): -        logger.error(failure) -        self._deferred.errback(failure) -        self._processed_docs = 0 -        self._last_inserted_idx = 0 - -    @property -    def deferred(self): -        """ -        Deferred that will be fired when the decryption loop has finished -        processing all the documents. -        """ -        return self._deferred - -    def insert_encrypted_received_doc( -            self, doc_id, doc_rev, content, gen, trans_id, idx): -        """ -        Decrypt and insert a received document into local staging area to be -        processed later on. - -        :param doc_id: The document ID. -        :type doc_id: str -        :param doc_rev: The document Revision -        :param doc_rev: str -        :param content: The content of the document -        :type content: dict -        :param gen: The document Generation -        :type gen: int -        :param trans_id: Transaction ID -        :type trans_id: str -        :param idx: The index of this document in the current sync process. -        :type idx: int - -        :return: A deferred that will fire after the decrypted document has -                 been inserted in the sync db. -        :rtype: twisted.internet.defer.Deferred -        """ -        soledad_assert(self._crypto is not None, "need a crypto object") - -        key = self._crypto.doc_passphrase(doc_id) -        secret = self._crypto.secret -        args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx -        # decrypt asynchronously -        # TODO use dedicated threadpool / move to ampoule -        d = threads.deferToThread( -            decrypt_doc_task, *args) -        # callback will insert it for later processing -        d.addCallback(self._decrypt_doc_cb) -        return d - -    def insert_received_doc( -            self, doc_id, doc_rev, content, gen, trans_id, idx): -        """ -        Insert a document that is not symmetrically encrypted. 
-        We store it in the staging area (the decrypted_docs dictionary) to be -        picked up in order as the preceding documents are decrypted. - -        :param doc_id: The document id -        :type doc_id: str -        :param doc_rev: The document revision -        :param doc_rev: str or dict -        :param content: The content of the document -        :type content: dict -        :param gen: The document generation -        :type gen: int -        :param trans_id: The transaction id -        :type trans_id: str -        :param idx: The index of this document in the current sync process. -        :type idx: int - -        :return: A deferred that will fire when the operation in the database -                 has finished. -        :rtype: twisted.internet.defer.Deferred -        """ -        if not isinstance(content, str): -            content = json.dumps(content) -        query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \ -                % self.TABLE_NAME -        d = self._runOperation( -            query, (doc_id, doc_rev, content, gen, trans_id, 0, -                    idx, self._sync_id)) -        d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) -        return d - -    def _delete_received_docs(self, doc_ids): -        """ -        Delete a list of received docs after get them inserted into the db. - -        :param doc_id: Document ID list. -        :type doc_id: list - -        :return: A deferred that will fire when the operation in the database -                 has finished. -        :rtype: twisted.internet.defer.Deferred -        """ -        placeholders = ', '.join('?' for _ in doc_ids) -        query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ -                % (self.TABLE_NAME, placeholders) -        return self._runOperation(query, (doc_ids)) - -    def _decrypt_doc_cb(self, result): -        """ -        Store the decryption result in the sync db from where it will later be -        picked by _process_decrypted_docs. - -        :param result: A tuple containing the document's id, revision, -                       content, generation, transaction id and sync index. -        :type result: tuple(str, str, str, int, str, int) - -        :return: A deferred that will fire after the document has been -                 inserted in the sync db. -        :rtype: twisted.internet.defer.Deferred -        """ -        doc_id, rev, content, gen, trans_id, idx = result -        logger.debug("sync decrypter pool: decrypted doc %s: %s %s %s" -                     % (doc_id, rev, gen, trans_id)) -        return self.insert_received_doc( -            doc_id, rev, content, gen, trans_id, idx) - -    def _get_docs(self, encrypted=None, sequence=None): -        """ -        Get documents from the received docs table in the sync db. - -        :param encrypted: If not None, only return documents with encrypted -                          field equal to given parameter. -        :type encrypted: bool or None -        :param order_by: The name of the field to order results. - -        :return: A deferred that will fire with the results of the database -                 query. -        :rtype: twisted.internet.defer.Deferred -        """ -        query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ -                "idx FROM %s" % self.TABLE_NAME -        parameters = [] -        if encrypted or sequence: -            query += " WHERE sync_id = ? 
and" -            parameters += [self._sync_id] -        if encrypted: -            query += " encrypted = ?" -            parameters += [int(encrypted)] -        if sequence: -            query += " idx in (" + ', '.join('?' * len(sequence)) + ")" -            parameters += [int(i) for i in sequence] -        query += " ORDER BY idx ASC" -        return self._runQuery(query, parameters) - -    @defer.inlineCallbacks -    def _get_insertable_docs(self): -        """ -        Return a list of non-encrypted documents ready to be inserted. - -        :return: A deferred that will fire with the list of insertable -                 documents. -        :rtype: twisted.internet.defer.Deferred -        """ -        # Here, check in memory what are the insertable indexes that can -        # form a sequence starting from the last inserted index -        sequence = [] -        insertable_docs = [] -        next_index = self._last_inserted_idx + 1 -        while next_index in self._decrypted_docs_indexes: -            sequence.append(str(next_index)) -            next_index += 1 -            if len(sequence) > 900: -                # 999 is the default value of SQLITE_MAX_VARIABLE_NUMBER -                # if we try to query more, SQLite will refuse -                # we need to find a way to improve this -                # being researched in #7669 -                break -        # Then fetch all the ones ready for insertion. -        if sequence: -            insertable_docs = yield self._get_docs(encrypted=False, -                                                   sequence=sequence) -        defer.returnValue(insertable_docs) - -    @defer.inlineCallbacks -    def _process_decrypted_docs(self): -        """ -        Fetch as many decrypted documents as can be taken from the expected -        order and insert them in the local replica. - -        :return: A deferred that will fire with the list of inserted -                 documents. -        :rtype: twisted.internet.defer.Deferred -        """ -        insertable = yield self._get_insertable_docs() -        processed_docs_ids = [] -        for doc_fields in insertable: -            method = self._insert_decrypted_local_doc -            # FIXME: This is used only because SQLCipherU1DBSync is synchronous -            # When adbapi is used there is no need for an external thread -            # Without this the reactor can freeze and fail docs download -            yield threads.deferToThread(method, *doc_fields) -            processed_docs_ids.append(doc_fields[0]) -        yield self._delete_received_docs(processed_docs_ids) - -    def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, -                                    gen, trans_id, encrypted, idx): -        """ -        Insert the decrypted document into the local replica. - -        Make use of the passed callback `insert_doc_cb` passed to the caller -        by u1db sync. - -        :param doc_id: The document id. -        :type doc_id: str -        :param doc_rev: The document revision. -        :type doc_rev: str -        :param content: The serialized content of the document. -        :type content: str -        :param gen: The generation corresponding to the modification of that -                    document. -        :type gen: int -        :param trans_id: The transaction id corresponding to the modification -                         of that document. 
-        :type trans_id: str -        """ -        # could pass source_replica in params for callback chain -        logger.debug("sync decrypter pool: inserting doc in local db: " -                     "%s:%s %s" % (doc_id, doc_rev, gen)) - -        # convert deleted documents to avoid error on document creation -        if content == 'null': -            content = None -        doc = SoledadDocument(doc_id, doc_rev, content) -        gen = int(gen) -        self._insert_doc_cb(doc, gen, trans_id) - -        # store info about processed docs -        self._last_inserted_idx = idx -        self._processed_docs += 1 - -    @defer.inlineCallbacks -    def _decrypt_and_recurse(self): -        """ -        Decrypt the documents received from remote replica and insert them -        into the local one. - -        This method implicitelly returns a defferred (see the decorator -        above). It should only be called by _launch_decrypt_and_process(). -        because this way any exceptions raised here will be stored by the -        errback attached to the deferred returned. - -        :return: A deferred which will fire after all decrypt, process and -                 delete operations have been executed. -        :rtype: twisted.internet.defer.Deferred -        """ -        if not self.running: -            defer.returnValue(None) -        processed = self._processed_docs -        pending = self._docs_to_process - -        if processed < pending: -            yield self._process_decrypted_docs() -        else: -            self._finish() - -    def _finish(self): -        self._processed_docs = 0 -        self._last_inserted_idx = 0 -        self._decrypted_docs_indexes = set() -        if not self._deferred.called: -            self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py index 4fc91d9d..92bc85d6 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py @@ -58,6 +58,7 @@ def debug(*args):      if not silent:          print(*args) +  debug("[+] db path:", tmpdb)  debug("[+] num docs", numdocs) @@ -74,6 +75,7 @@ dbpool = adbapi.getConnectionPool(opts)  def createDoc(doc):      return dbpool.runU1DBQuery("create_doc", doc) +  db_indexes = {      'by-chash': ['chash'],      'by-number': ['number']} @@ -168,6 +170,7 @@ def insert_docs(_):          deferreds.append(d)      return defer.gatherResults(deferreds, consumeErrors=True) +  d = create_indexes(None)  d.addCallback(insert_docs)  d.addCallback(get_from_index) diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py index 38ea18a3..429566c7 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py @@ -58,6 +58,7 @@ def debug(*args):      if not silent:          print(*args) +  debug("[+] db path:", tmpdb)  debug("[+] num docs", numdocs) @@ -74,6 +75,7 @@ dbpool = adbapi.getConnectionPool(opts)  def createDoc(doc, doc_id):      return dbpool.runU1DBQuery("create_doc", doc, doc_id=doc_id) +  db_indexes = {      'by-chash': ['chash'],      'by-number': ['number']} @@ -168,6 +170,7 @@ def insert_docs(_):          deferreds.append(d)  
    return defer.gatherResults(deferreds, consumeErrors=True) +  d = create_indexes(None)  d.addCallback(insert_docs)  d.addCallback(get_from_index) diff --git a/client/src/leap/soledad/client/examples/run_benchmark.py b/client/src/leap/soledad/client/examples/run_benchmark.py index 61621e89..ddedf433 100644 --- a/client/src/leap/soledad/client/examples/run_benchmark.py +++ b/client/src/leap/soledad/client/examples/run_benchmark.py @@ -14,6 +14,7 @@ cmd = "SILENT=1 TIMES={times} TMPDIR={tmpdir} python ./use_{version}api.py"  def parse_time(r):      return r.split('\n')[-1] +  with open(CSVFILE, 'w') as log:      for times in range(0, 10000, 500): diff --git a/client/src/leap/soledad/client/examples/soledad_sync.py b/client/src/leap/soledad/client/examples/soledad_sync.py index 63077ee3..3aed10eb 100644 --- a/client/src/leap/soledad/client/examples/soledad_sync.py +++ b/client/src/leap/soledad/client/examples/soledad_sync.py @@ -40,7 +40,7 @@ def init_soledad(_):      global soledad      soledad = Soledad(uuid, _pass, secrets_path, local_db_path,                        server_url, cert_file, -                      auth_token=token, defer_encryption=False) +                      auth_token=token)      def getall(_):          d = soledad.get_all_docs() diff --git a/client/src/leap/soledad/client/examples/use_adbapi.py b/client/src/leap/soledad/client/examples/use_adbapi.py index a2683836..39301b41 100644 --- a/client/src/leap/soledad/client/examples/use_adbapi.py +++ b/client/src/leap/soledad/client/examples/use_adbapi.py @@ -39,6 +39,7 @@ def debug(*args):      if not silent:          print(*args) +  debug("[+] db path:", tmpdb)  debug("[+] times", times) @@ -87,6 +88,7 @@ def allDone(_):          print((end_time - start_time).total_seconds())      reactor.stop() +  deferreds = []  payload = open('manifest.phk').read() diff --git a/client/src/leap/soledad/client/examples/use_api.py b/client/src/leap/soledad/client/examples/use_api.py index e2501c98..db77c4b3 100644 --- a/client/src/leap/soledad/client/examples/use_api.py +++ b/client/src/leap/soledad/client/examples/use_api.py @@ -36,6 +36,7 @@ def debug(*args):      if not silent:          print(*args) +  debug("[+] db path:", tmpdb)  debug("[+] times", times) @@ -52,6 +53,7 @@ db = sqlcipher.SQLCipherDatabase(opts)  def allDone():      debug("ALL DONE!") +  payload = open('manifest.phk').read()  for i in range(times): diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index 62e8bcf0..0e250bf1 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -25,10 +25,13 @@ after receiving.  import os  from leap.soledad.common.log import getLogger -from leap.common.http import HTTPClient +from leap.common.certs import get_compatible_ssl_context_factory +from twisted.web.client import Agent +from twisted.internet import reactor  from leap.soledad.client.http_target.send import HTTPDocSender  from leap.soledad.client.http_target.api import SyncTargetAPI  from leap.soledad.client.http_target.fetch import HTTPDocFetcher +from leap.soledad.client import crypto as old_crypto  logger = getLogger(__name__) @@ -51,8 +54,7 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):      the parsed documents that the remote send us, before being decrypted and      written to the main database.      
""" -    def __init__(self, url, source_replica_uid, creds, crypto, cert_file, -                 sync_db=None, sync_enc_pool=None): +    def __init__(self, url, source_replica_uid, creds, crypto, cert_file):          """          Initialize the sync target. @@ -65,21 +67,11 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):          :type creds: creds          :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt                          document contents when syncing. -        :type crypto: soledad.crypto.SoledadCrypto +        :type crypto: soledad._crypto.SoledadCrypto          :param cert_file: Path to the certificate of the ca used to validate                            the SSL certificate used by the remote soledad                            server.          :type cert_file: str -        :param sync_db: Optional. handler for the db with the symmetric -                        encryption of the syncing documents. If -                        None, encryption will be done in-place, -                        instead of retreiving it from the dedicated -                        database. -        :type sync_db: Sqlite handler -        :param sync_enc_pool: The encryption pool to use to defer encryption. -                              If None is passed the encryption will not be -                              deferred. -        :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool          """          if url.endswith("/"):              url = url[:-1] @@ -89,17 +81,13 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):          self._uuid = None          self.set_creds(creds)          self._crypto = crypto -        self._sync_db = sync_db -        self._sync_enc_pool = sync_enc_pool +        # TODO: DEPRECATED CRYPTO +        self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret)          self._insert_doc_cb = None -        # asynchronous encryption/decryption attributes -        self._decryption_callback = None -        self._sync_decr_pool = None - -        # XXX Increasing timeout of simple requests to avoid chances of hitting -        # the duplicated syncing bug. This could be reduced to the 30s default -        # after implementing Cancellable Sync. See #7382 -        self._http = HTTPClient(cert_file, timeout=90) + +        # Twisted default Agent with our own ssl context factory +        self._http = Agent(reactor, +                           get_compatible_ssl_context_factory(cert_file))          if DO_STATS:              self.sync_exchange_phase = [0] diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 3c8e3764..1b086a00 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -18,10 +18,13 @@ import os  import json  import base64 +from StringIO import StringIO  from uuid import uuid4  from twisted.web.error import Error  from twisted.internet import defer +from twisted.web.http_headers import Headers +from twisted.web.client import FileBodyProducer  from leap.soledad.client.http_target.support import readBody  from leap.soledad.common.errors import InvalidAuthTokenError @@ -39,14 +42,6 @@ class SyncTargetAPI(SyncTarget):      Declares public methods and implements u1db.SyncTarget.      
""" -    @defer.inlineCallbacks -    def close(self): -        if self._sync_enc_pool: -            self._sync_enc_pool.stop() -        if self._sync_decr_pool: -            self._sync_decr_pool.stop() -        yield self._http.close() -      @property      def uuid(self):          return self._uuid @@ -69,16 +64,20 @@ class SyncTargetAPI(SyncTarget):      def _base_header(self):          return self._auth_header.copy() if self._auth_header else {} -    @property -    def _defer_encryption(self): -        return self._sync_enc_pool is not None -      def _http_request(self, url, method='GET', body=None, headers=None, -                      content_type=None): +                      content_type=None, body_reader=readBody, +                      body_producer=None):          headers = headers or self._base_header          if content_type:              headers.update({'content-type': [content_type]}) -        d = self._http.request(url, method, body, headers, readBody) +        if not body_producer and body: +            body = FileBodyProducer(StringIO(body)) +        elif body_producer: +            # Upload case, check send.py +            body = body_producer(body) +        d = self._http.request( +            method, url, headers=Headers(headers), bodyProducer=body) +        d.addCallback(body_reader)          d.addErrback(_unauth_to_invalid_token_error)          return d @@ -153,7 +152,7 @@ class SyncTargetAPI(SyncTarget):      def sync_exchange(self, docs_by_generation, source_replica_uid,                        last_known_generation, last_known_trans_id,                        insert_doc_cb, ensure_callback=None, -                      defer_decryption=True, sync_id=None): +                      sync_id=None):          """          Find out which documents the remote database does not know about,          encrypt and send them. After that, receive documents from the remote @@ -185,11 +184,6 @@ class SyncTargetAPI(SyncTarget):                                  created.          :type ensure_callback: function -        :param defer_decryption: Whether to defer the decryption process using -                                 the intermediate database. If False, -                                 decryption will be done inline. -        :type defer_decryption: bool -          :return: A deferred which fires with the new generation and                   transaction id of the target replica.          :rtype: twisted.internet.defer.Deferred @@ -221,8 +215,7 @@ class SyncTargetAPI(SyncTarget):          cur_target_gen, cur_target_trans_id = yield self._receive_docs(              last_known_generation, last_known_trans_id, -            ensure_callback, sync_id, -            defer_decryption=defer_decryption) +            ensure_callback, sync_id)          # update gen and trans id info in case we just sent and did not          # receive docs. diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 184c5883..8676ceed 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -15,18 +15,19 @@  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>.  
 import json
-
 from twisted.internet import defer
+from twisted.internet import threads
 
 from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
 from leap.soledad.client.events import emit_async
-from leap.soledad.client.crypto import is_symmetrically_encrypted
-from leap.soledad.client.encdecpool import SyncDecrypterPool
 from leap.soledad.client.http_target.support import RequestBody
 from leap.soledad.common.log import getLogger
+from leap.soledad.client._crypto import is_symmetrically_encrypted
 from leap.soledad.common.document import SoledadDocument
 from leap.soledad.common.l2db import errors
-from leap.soledad.common.l2db.remote import utils
+from leap.soledad.client import crypto as old_crypto
+
+from . import fetch_protocol
 
 logger = getLogger(__name__)
 
@@ -50,208 +51,105 @@ class HTTPDocFetcher(object):
 
     @defer.inlineCallbacks
     def _receive_docs(self, last_known_generation, last_known_trans_id,
-                      ensure_callback, sync_id, defer_decryption):
-
-        self._queue_for_decrypt = defer_decryption \
-            and self._sync_db is not None
-
+                      ensure_callback, sync_id):
         new_generation = last_known_generation
         new_transaction_id = last_known_trans_id
-
-        if self._queue_for_decrypt:
-            logger.debug(
-                "Soledad sync: will queue received docs for decrypting.")
-
-        if defer_decryption:
-            self._setup_sync_decr_pool()
-
-        # ---------------------------------------------------------------------
-        # maybe receive the first document
-        # ---------------------------------------------------------------------
-
-        # we fetch the first document before fetching the rest because we need
-        # to know the total number of documents to be received, and this
-        # information comes as metadata to each request.
-
-        doc = yield self._receive_one_doc(
+        # Acts as a queue, ensuring line order on async processing,
+        # as `self._insert_doc_cb` can't be run concurrently or out of order.
+        # DeferredSemaphore solves the concurrency, and its implementation
+        # uses a queue, which solves the ordering.
+        # FIXME: Find a proper solution to avoid surprises on Twisted changes
+        self.semaphore = defer.DeferredSemaphore(1)
+
+        metadata = yield self._fetch_all(
             last_known_generation, last_known_trans_id,
-            sync_id, 0)
-        self._received_docs = 0
-        number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1)
+            sync_id)
+        number_of_changes, ngen, ntrans = self._parse_metadata(metadata)
+
+        # wait for pending inserts
+        yield self.semaphore.acquire()
 
         if ngen:
             new_generation = ngen
             new_transaction_id = ntrans
 
-        # ---------------------------------------------------------------------
-        # maybe receive the rest of the documents
-        # ---------------------------------------------------------------------
-
-        # launch many asynchronous fetches and inserts of received documents
-        # in the temporary sync db. Will wait for all results before
-        # continuing.
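Editor's note: the `DeferredSemaphore(1)` comment in the hunk above is the heart of the new streaming fetch, so a minimal sketch may help. Everything below is illustrative and not part of the Soledad codebase; it only demonstrates how a one-slot Twisted semaphore serializes asynchronous work while preserving scheduling order, which is what lets `_insert_doc_cb` run strictly one document at a time.

```python
# Illustrative only: a one-slot DeferredSemaphore runs queued calls
# strictly one at a time, in the order they were scheduled.
from twisted.internet import defer, reactor

semaphore = defer.DeferredSemaphore(1)
inserted = []


def insert_doc(doc_id):
    # stand-in for _insert_doc_cb: must never run concurrently
    inserted.append(doc_id)


@defer.inlineCallbacks
def main():
    # schedule inserts as documents "arrive" from the stream
    ds = [semaphore.run(insert_doc, i) for i in range(5)]
    yield defer.gatherResults(ds)
    # "wait for pending inserts", as _receive_docs does
    yield semaphore.acquire()
    print(inserted)  # [0, 1, 2, 3, 4] -- arrival order preserved
    reactor.stop()


reactor.callWhenRunning(main)
reactor.run()
```

Twisted does not document the FIFO behavior as a hard contract, which is exactly what the FIXME in the hunk warns about.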
-
-        received = 1
-        deferreds = []
-        while received < number_of_changes:
-            d = self._receive_one_doc(
-                last_known_generation,
-                last_known_trans_id, sync_id, received)
-            d.addCallback(
-                self._insert_received_doc,
-                received + 1,  # the index of the current received doc
-                number_of_changes)
-            deferreds.append(d)
-            received += 1
-        results = yield defer.gatherResults(deferreds)
-
-        # get generation and transaction id of target after insertions
-        if deferreds:
-            _, new_generation, new_transaction_id = results.pop()
-
-        # ---------------------------------------------------------------------
-        # wait for async decryption to finish
-        # ---------------------------------------------------------------------
-
-        if defer_decryption:
-            yield self._sync_decr_pool.deferred
-            self._sync_decr_pool.stop()
-
         defer.returnValue([new_generation, new_transaction_id])
 
-    def _receive_one_doc(self, last_known_generation,
-                         last_known_trans_id, sync_id, received):
+    def _fetch_all(self, last_known_generation,
+                   last_known_trans_id, sync_id):
         # add remote replica metadata to the request
         body = RequestBody(
             last_known_generation=last_known_generation,
             last_known_trans_id=last_known_trans_id,
             sync_id=sync_id,
             ensure=self._ensure_callback is not None)
-        # inform server of how many documents have already been received
-        body.insert_info(received=received)
-        # send headers
+        self._received_docs = 0
+        # build a stream reader with _doc_parser as a callback
+        body_reader = fetch_protocol.build_body_reader(self._doc_parser)
+        # start download stream
         return self._http_request(
             self._url,
             method='POST',
             body=str(body),
-            content_type='application/x-soledad-sync-get')
+            content_type='application/x-soledad-sync-get',
+            body_reader=body_reader)
 
-    def _insert_received_doc(self, response, idx, total):
+    @defer.inlineCallbacks
+    def _doc_parser(self, doc_info, content, total):
         """
-        Insert a received document into the local replica.
-
-        :param response: The body and headers of the response.
-        :type response: tuple(str, dict)
-        :param idx: The index count of the current operation.
-        :type idx: int
+        Insert a received document into the local replica, decrypting it
+        if necessary. Content stays encrypted when a document was inserted
+        on the server side with GPG-encrypted content.
+
+        :param doc_info: Dictionary representing Document information.
+        :type doc_info: dict
+        :param content: The Document's content.
+        :type content: str
         :param total: The total number of operations.
:type total: int
         """
-        new_generation, new_transaction_id, number_of_changes, doc_id, \
-            rev, content, gen, trans_id = \
-            self._parse_received_doc_response(response)
+        yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
+                                 total)
 
-        if self._sync_decr_pool and not self._sync_decr_pool.running:
-            self._sync_decr_pool.start(number_of_changes)
-
-        if doc_id is not None:
-            # decrypt incoming document and insert into local database
-            # -------------------------------------------------------------
-            # symmetric decryption of document's contents
-            # -------------------------------------------------------------
-            # If arriving content was symmetrically encrypted, we decrypt it.
-            # We do it inline if defer_decryption flag is False or no sync_db
-            # was defined, otherwise we defer it writing it to the received
-            # docs table.
-            doc = SoledadDocument(doc_id, rev, content)
-            if is_symmetrically_encrypted(doc):
-                if self._queue_for_decrypt:
-                    self._sync_decr_pool.insert_encrypted_received_doc(
-                        doc.doc_id, doc.rev, doc.content, gen, trans_id,
-                        idx)
-                else:
-                    # defer_decryption is False or no-sync-db fallback
-                    doc.set_json(self._crypto.decrypt_doc(doc))
-                    self._insert_doc_cb(doc, gen, trans_id)
-            else:
-                # not symmetrically encrypted doc, insert it directly
-                # or save it in the decrypted stage.
-                if self._queue_for_decrypt:
-                    self._sync_decr_pool.insert_received_doc(
-                        doc.doc_id, doc.rev, doc.content, gen, trans_id,
-                        idx)
-                else:
-                    self._insert_doc_cb(doc, gen, trans_id)
-            # -------------------------------------------------------------
-            # end of symmetric decryption
-            # -------------------------------------------------------------
+    @defer.inlineCallbacks
+    def __atomic_doc_parse(self, doc_info, content, total):
+        doc = SoledadDocument(doc_info['id'], doc_info['rev'], content)
+        if is_symmetrically_encrypted(content):
+            content = yield self._crypto.decrypt_doc(doc)
+        elif old_crypto.is_symmetrically_encrypted(doc):
+            content = self._deprecated_crypto.decrypt_doc(doc)
+        doc.set_json(content)
+
+        # TODO insert blobs here on the blob backend
+        # FIXME: This is wrong. Using the very same SQLite connection object
+        # from multiple threads is dangerous. We should bring the dbpool here
+        # or find an alternative. Deferring to a thread only helps release
+        # the reactor for other tasks, as this is an IO-intensive call.
+        yield threads.deferToThread(self._insert_doc_cb,
+                                    doc, doc_info['gen'], doc_info['trans_id'])
 
         self._received_docs += 1
         user_data = {'uuid': self.uuid, 'userid': self.userid}
-        _emit_receive_status(user_data, self._received_docs, total)
-        return number_of_changes, new_generation, new_transaction_id
+        _emit_receive_status(user_data, self._received_docs, total=total)
 
-    def _parse_received_doc_response(self, response):
+    def _parse_metadata(self, metadata):
         """
-        Parse the response from the server containing the received document.
+        Parse the response from the server containing the sync metadata.
 
-        :param response: The body and headers of the response.
-        :type response: tuple(str, dict)
+        :param metadata: The metadata as a string
+        :type metadata: str
 
-        :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
-                 content, gen, trans_id)
+        :return: (number_of_changes, new_gen, new_trans_id)
         :rtype: tuple
         """
-        # decode incoming stream
-        parts = response.splitlines()
-        if not parts or parts[0] != '[' or parts[-1] != ']':
-            raise errors.BrokenSyncStream
-        data = parts[1:-1]
-        # decode metadata
-        try:
-            line, comma = utils.check_and_strip_comma(data[0])
-            metadata = None
-        except (IndexError):
-            raise errors.BrokenSyncStream
         try:
-            metadata = json.loads(line)
-            new_generation = metadata['new_generation']
-            new_transaction_id = metadata['new_transaction_id']
-            number_of_changes = metadata['number_of_changes']
+            metadata = json.loads(metadata)
+            # make sure we have replica_uid from fresh new dbs
+            if self._ensure_callback and 'replica_uid' in metadata:
+                self._ensure_callback(metadata['replica_uid'])
+            return (metadata['number_of_changes'], metadata['new_generation'],
+                    metadata['new_transaction_id'])
         except (ValueError, KeyError):
-            raise errors.BrokenSyncStream
-        # make sure we have replica_uid from fresh new dbs
-        if self._ensure_callback and 'replica_uid' in metadata:
-            self._ensure_callback(metadata['replica_uid'])
-        # parse incoming document info
-        doc_id = None
-        rev = None
-        content = None
-        gen = None
-        trans_id = None
-        if number_of_changes > 0:
-            try:
-                entry = json.loads(data[1])
-                doc_id = entry['id']
-                rev = entry['rev']
-                content = entry['content']
-                gen = entry['gen']
-                trans_id = entry['trans_id']
-            except (IndexError, KeyError):
-                raise errors.BrokenSyncStream
-        return new_generation, new_transaction_id, number_of_changes, \
-            doc_id, rev, content, gen, trans_id
-
-    def _setup_sync_decr_pool(self):
-        """
-        Set up the SyncDecrypterPool for deferred decryption.
-        """
-        if self._sync_decr_pool is None and self._sync_db is not None:
-            # initialize syncing queue decryption pool
-            self._sync_decr_pool = SyncDecrypterPool(
-                self._crypto,
-                self._sync_db,
-                insert_doc_cb=self._insert_doc_cb,
-                source_replica_uid=self.source_replica_uid)
+            raise errors.BrokenSyncStream('Metadata parsing failed')
 
 
 def _emit_receive_status(user_data, received_docs, total):
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
new file mode 100644
index 00000000..fa6b1969
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -0,0 +1,159 @@
+# -*- coding: utf-8 -*-
+# fetch_protocol.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+from functools import partial
+from cStringIO import StringIO
+from twisted.web._newclient import ResponseDone
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import utils
+from leap.soledad.common.log import getLogger
+from .support import ReadBodyProtocol
+from .support import readBody
+
+logger = getLogger(__name__)
+
+
+class DocStreamReceiver(ReadBodyProtocol):
+    """
+    A protocol implementation that can parse incoming data from the server
+    based on the line format specified by the u1db implementation, except
+    that we split doc attributes from content to ease parsing and increase
+    throughput for larger documents.
+    [\r\n
+    {metadata},\r\n
+    {doc_info},\r\n
+    {content},\r\n
+    ...
+    {doc_info},\r\n
+    {content},\r\n
+    ]
+    """
+
+    def __init__(self, response, deferred, doc_reader):
+        self.deferred = deferred
+        self.status = response.code if response else None
+        self.message = response.phrase if response else None
+        self.headers = response.headers if response else {}
+        self.delimiter = '\r\n'
+        self.metadata = ''
+        self._doc_reader = doc_reader
+        self.reset()
+
+    def reset(self):
+        self._line = 0
+        self._buffer = StringIO()
+        self._properly_finished = False
+
+    def connectionLost(self, reason):
+        """
+        Deliver the accumulated response bytes to the waiting L{Deferred}, if
+        the response body has been completely received without error.
+        """
+        try:
+            if reason.check(ResponseDone):
+                self.dataBuffer = self.metadata
+            else:
+                self.dataBuffer = self.finish()
+        except errors.BrokenSyncStream as e:
+            return self.deferred.errback(e)
+        return ReadBodyProtocol.connectionLost(self, reason)
+
+    def consumeBufferLines(self):
+        """
+        Consume lines from the buffer and rewind it, writing back any
+        remaining data that didn't form a complete line.
+        """
+        content = self._buffer.getvalue()[0:self._buffer.tell()]
+        self._buffer.seek(0)
+        lines = content.split(self.delimiter)
+        self._buffer.write(lines.pop(-1))
+        return lines
+
+    def dataReceived(self, data):
+        """
+        Buffer incoming data until a line break comes in. We check only
+        the incoming data for efficiency.
+        """
+        self._buffer.write(data)
+        if '\n' not in data:
+            return
+        lines = self.consumeBufferLines()
+        while lines:
+            line, _ = utils.check_and_strip_comma(lines.pop(0))
+            self.lineReceived(line)
+            self._line += 1
+
+    def lineReceived(self, line):
+        """
+        Protocol implementation.
+        0:      [\r\n
+        1:      {metadata},\r\n
+        (even): {doc_info},\r\n
+        (odd):  {data},\r\n
+        (last): ]
+        """
+        if self._properly_finished:
+            raise errors.BrokenSyncStream("Reading a finished stream")
+        if ']' == line:
+            self._properly_finished = True
+        elif self._line == 0:
+            if line != '[':
+                raise errors.BrokenSyncStream("Invalid start")
+        elif self._line == 1:
+            self.metadata = line
+            if 'error' in self.metadata:
+                raise errors.BrokenSyncStream("Error from server: %s" % line)
+            self.total = json.loads(line).get('number_of_changes', -1)
+        elif (self._line % 2) == 0:
+            self.current_doc = json.loads(line)
+            if 'error' in self.current_doc:
+                raise errors.BrokenSyncStream("Error from server: %s" % line)
+        else:
+            d = self._doc_reader(
+                self.current_doc, line.strip() or None, self.total)
+            d.addErrback(self._error)
+
+    def _error(self, reason):
+        logger.error(reason)
+        self.transport.loseConnection()
+
+    def finish(self):
+        """
+        Check that ']' arrived and that the stream was properly closed.
+        """
+        if not self._properly_finished:
+            raise errors.BrokenSyncStream('Stream not properly closed')
+        content = self._buffer.getvalue()[0:self._buffer.tell()]
+        self._buffer.close()
+        return content
+
+
+def build_body_reader(doc_reader):
+    """
+    Get the documents from a sync stream and call doc_reader on each
+    doc received.
+
+    @param doc_reader: Function to be called for processing an incoming doc.
+        Will be called with doc metadata (dict parsed from 1st line) and doc
+        content (string)
+    @type doc_reader: function
+
+    @return: A function that can be called by the http Agent to create and
+    configure the proper protocol.
+    """ +    protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader) +    return partial(readBody, protocolClass=protocolClass) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index c7bd057e..2b286ec5 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger  from leap.soledad.client.events import emit_async  from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS  from leap.soledad.client.http_target.support import RequestBody +from .send_protocol import DocStreamProducer  logger = getLogger(__name__) @@ -32,8 +33,6 @@ class HTTPDocSender(object):      They need to be encrypted and metadata prepared before sending.      """ -    MAX_BATCH_SIZE = 0  # disabled by now, this is being tested yet -      # The uuid of the local replica.      # Any class inheriting from this one should provide a meaningful attribute      # if the sync status event is meant to be used somewhere else. @@ -54,73 +53,50 @@ class HTTPDocSender(object):              last_known_trans_id=last_known_trans_id,              sync_id=sync_id,              ensure=self._ensure_callback is not None) -        total = len(docs_by_generation) -        while body.consumed < total: -            result = yield self._send_batch(total, body, docs_by_generation) +        result = yield self._send_batch(body, docs_by_generation)          response_dict = json.loads(result)[0]          gen_after_send = response_dict['new_generation']          trans_id_after_send = response_dict['new_transaction_id']          defer.returnValue([gen_after_send, trans_id_after_send]) -    def _delete_sent(self, docs): -        for doc, gen, trans_id in docs: -            self._sync_enc_pool.delete_encrypted_doc( -                doc.doc_id, doc.rev) -      @defer.inlineCallbacks -    def _send_batch(self, total, body, docs): -        sent = [] -        missing = total - body.consumed -        for i in xrange(1, missing + 1): -            if body.pending_size > self.MAX_BATCH_SIZE: -                break -            idx = body.consumed + i -            entry = docs[idx - 1] -            sent.append(entry) -            yield self._prepare_one_doc(entry, body, idx, total) -        result = yield self._send_request(body.pop()) -        if self._defer_encryption: -            self._delete_sent(sent) - +    def _send_batch(self, body, docs): +        total, calls = len(docs), [] +        for i, entry in enumerate(docs): +            calls.append((self._prepare_one_doc, +                         entry, body, i + 1, total)) +        result = yield self._send_request(body, calls)          _emit_send_status(self.uuid, body.consumed, total) +          defer.returnValue(result) -    def _send_request(self, body): +    def _send_request(self, body, calls):          return self._http_request(              self._url,              method='POST', -            body=body, -            content_type='application/x-soledad-sync-put') +            body=(body, calls), +            content_type='application/x-soledad-sync-put', +            body_producer=DocStreamProducer)      @defer.inlineCallbacks      def _prepare_one_doc(self, entry, body, idx, total): -        doc, gen, trans_id = entry -        content = yield self._encrypt_doc(doc) +        get_doc_call, gen, trans_id = entry +        doc, content = yield self._encrypt_doc(get_doc_call)          body.insert_info(  
             id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
             trans_id=trans_id, number_of_docs=total,
             doc_idx=idx)
+        _emit_send_status(self.uuid, body.consumed, total)
 
-    def _encrypt_doc(self, doc):
-        d = None
+    @defer.inlineCallbacks
+    def _encrypt_doc(self, get_doc_call):
+        f, args, kwargs = get_doc_call
+        doc = yield f(*args, **kwargs)
         if doc.is_tombstone():
-            d = defer.succeed(None)
-        elif not self._defer_encryption:
-            # fallback case, for tests
-            d = defer.succeed(self._crypto.encrypt_doc(doc))
+            defer.returnValue((doc, None))
         else:
-
-            def _maybe_encrypt_doc_inline(doc_json):
-                if doc_json is None:
-                    # the document is not marked as tombstone, but we got
-                    # nothing from the sync db. As it is not encrypted
-                    # yet, we force inline encryption.
-                    return self._crypto.encrypt_doc(doc)
-                return doc_json
-
-            d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
-            d.addCallback(_maybe_encrypt_doc_inline)
-        return d
+            content = yield self._crypto.encrypt_doc(doc)
+            defer.returnValue((doc, content))
 
 
 def _emit_send_status(user_data, idx, total):
diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py
new file mode 100644
index 00000000..0cb6d039
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/send_protocol.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+# send_protocol.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from zope.interface import implements
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.web.iweb import IBodyProducer
+from twisted.web.iweb import UNKNOWN_LENGTH
+
+
+class DocStreamProducer(object):
+    """
+    A producer that writes the body of a request to a consumer.
+    """
+
+    implements(IBodyProducer)
+
+    def __init__(self, producer):
+        """
+        Initialize the document stream producer.
+
+        :param producer: A RequestBody instance and a list of producer calls
+        :type producer: (.support.RequestBody, [(function, *args)])
+        """
+        self.body, self.producer = producer
+        self.length = UNKNOWN_LENGTH
+        self.pause = False
+        self.stop = False
+
+    @defer.inlineCallbacks
+    def startProducing(self, consumer):
+        """
+        Write the body to the consumer.
+
+        :param consumer: Any IConsumer provider.
+        :type consumer: twisted.internet.interfaces.IConsumer
+
+        :return: A Deferred that fires when production ends.
+        :rtype: twisted.internet.defer.Deferred
+        """
+        while self.producer and not self.stop:
+            if self.pause:
+                yield self.sleep(0.001)
+                continue
+            call = self.producer.pop(0)
+            fun, args = call[0], call[1:]
+            yield fun(*args)
+            consumer.write(self.body.pop(1, leave_open=True))
+        consumer.write(self.body.pop(0))  # close stream
+
+    def sleep(self, secs):
+        d = defer.Deferred()
+        reactor.callLater(secs, d.callback, None)
+        return d
+
+    def pauseProducing(self):
+        self.pause = True
+
+    def stopProducing(self):
+        self.stop = True
+
+    def resumeProducing(self):
+        self.pause = False
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index 6ec98ed4..d8d8e420 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -53,6 +53,9 @@
         if exc_cls is not None:
             message = respdic.get("message")
             self.deferred.errback(exc_cls(message))
+        else:
+            self.deferred.errback(
+                errors.HTTPError(self.status, respdic, self.headers))
     # ---8<--- end of snippet from u1db.remote.http_client
 
     def connectionLost(self, reason):
@@ -91,7 +94,7 @@ class ReadBodyProtocol(_ReadBodyProtocol):
             self.deferred.errback(reason)
 
 
-def readBody(response):
+def readBody(response, protocolClass=ReadBodyProtocol):
     """
     Get the body of an L{IResponse} and return it as a byte string.
 
@@ -116,7 +119,7 @@ def readBody(response):
             abort()
 
     d = defer.Deferred(cancel)
-    protocol = ReadBodyProtocol(response, d)
+    protocol = protocolClass(response, d)
 
     def getAbort():
         return getattr(protocol.transport, 'abortConnection', None)
@@ -155,46 +158,49 @@ class RequestBody(object):
         self.headers = header_dict
         self.entries = []
         self.consumed = 0
-        self.pending_size = 0
 
     def insert_info(self, **entry_dict):
         """
         Dumps an entry into JSON format and add it to entries list.
+        Adds the 'content' key on a new line if it's present.
 
         :param entry_dict: Entry as a dictionary
        :type entry_dict: dict
-
-        :return: length of the entry after JSON dumps
-        :rtype: int
         """
-        entry = json.dumps(entry_dict)
+        content = ''
+        if 'content' in entry_dict:
+            content = ',\r\n' + (entry_dict['content'] or '')
+        entry = json.dumps(entry_dict) + content
         self.entries.append(entry)
-        self.pending_size += len(entry)
 
-    def pop(self):
+    def pop(self, amount=10, leave_open=False):
         """
-        Removes all entries and returns it formatted and ready
+        Removes entries and returns them formatted and ready
         to be sent.
-        :param number: number of entries to pop and format
-        :type number: int
+        :param amount: number of entries to pop and format
+        :type amount: int
+
+        :param leave_open: flag to skip stream closing
+        :type leave_open: bool
 
         :return: formatted body ready to be sent
         :rtype: str
         """
-        entries = self.entries[:]
-        self.entries = []
-        self.pending_size = 0
-        self.consumed += len(entries)
-        return self.entries_to_str(entries)
+        start = self.consumed == 0
+        amount = min([len(self.entries), amount])
+        entries = [self.entries.pop(0) for i in xrange(amount)]
+        self.consumed += amount
+        end = len(self.entries) == 0 if not leave_open else False
+        return self.entries_to_str(entries, start, end)
 
     def __str__(self):
-        return self.entries_to_str(self.entries)
+        return self.pop(len(self.entries))
 
     def __len__(self):
         return len(self.entries)
 
-    def entries_to_str(self, entries=None):
+    def entries_to_str(self, entries=None, start=True, end=True):
         """
         Format a list of entries into the body format expected
         by the server.
@@ -205,6 +211,10 @@ class RequestBody(object):
         :return: formatted body ready to be sent
         :rtype: str
         """
-        data = '[\r\n' + json.dumps(self.headers)
+        data = ''
+        if start:
+            data = '[\r\n' + json.dumps(self.headers)
         data += ''.join(',\r\n' + entry for entry in entries)
-        return data + '\r\n]'
+        if end:
+            data += '\r\n]'
+        return data
diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py
index 14b34d24..82927ff4 100644
--- a/client/src/leap/soledad/client/interfaces.py
+++ b/client/src/leap/soledad/client/interfaces.py
@@ -321,7 +321,7 @@ class ISyncableStorage(Interface):
         "Property, True if the syncer is syncing.")
 
     token = Attribute("The authentication Token.")
 
-    def sync(self, defer_decryption=True):
+    def sync(self):
         """
         Synchronize the local encrypted replica with a remote replica.
 
@@ -331,11 +331,6 @@ class ISyncableStorage(Interface):
         :param url: the url of the target replica to sync with
         :type url: str
 
-        :param defer_decryption:
-            Whether to defer the decryption process using the intermediate
-            database. If False, decryption will be done inline.
-        :type defer_decryption: bool
-
         :return:
             A deferred that will fire with the local generation before the
             synchronisation was performed.
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index 1eb6f31d..3fe98c64 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -34,7 +34,7 @@
 from leap.soledad.common import soledad_assert_type
 from leap.soledad.common import document
 from leap.soledad.common.log import getLogger
 from leap.soledad.client import events
-from leap.soledad.client.crypto import encrypt_sym, decrypt_sym
+from leap.soledad.client import _crypto
 
 logger = getLogger(__name__)
 
@@ -126,7 +126,7 @@ class SoledadSecrets(object):
     instantiates Soledad.
     """
 
-    IV_SEPARATOR = ":"
+    SEPARATOR = ":"
     """
     A separator used for storing the encryption initial value prepended to the
     ciphertext.
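Editor's note: the secrets.py hunks that follow switch the stored-secret cipher to AES 256 GCM (`_crypto.ENC_METHOD.aes_256_gcm`) while keeping the `SEPARATOR`-joined layout of IV and base64 ciphertext. The `_crypto.encrypt_sym`/`decrypt_sym` helpers themselves are not shown in this part of the diff, so here is a rough, self-contained approximation using the `cryptography` package; the nonce size, tag placement, and the base64-encoded IV are assumptions of this sketch (the real code, for instance, stores `str(iv)` directly).

```python
# A rough sketch of the new secret-storage layout, NOT the actual
# _crypto implementation: AES-256-GCM, with the iv and the base64'd
# ciphertext joined by SEPARATOR. Nonce size, tag placement and the
# base64-encoded iv are assumptions made for this example.
import os
import binascii

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

SEPARATOR = b':'


def encrypt_secret(key, secret):
    iv = os.urandom(12)  # 96-bit nonce, the recommended size for GCM
    encryptor = Cipher(algorithms.AES(key), modes.GCM(iv),
                       backend=default_backend()).encryptor()
    ciphertext = encryptor.update(secret) + encryptor.finalize()
    # append the 16-byte authentication tag to the ciphertext
    blob = binascii.b2a_base64(ciphertext + encryptor.tag).strip()
    return SEPARATOR.join([binascii.b2a_base64(iv).strip(), blob])


def decrypt_secret(key, stored):
    iv_b64, blob_b64 = stored.split(SEPARATOR, 1)
    iv = binascii.a2b_base64(iv_b64)
    blob = binascii.a2b_base64(blob_b64)
    ciphertext, tag = blob[:-16], blob[-16:]
    decryptor = Cipher(algorithms.AES(key), modes.GCM(iv, tag),
                       backend=default_backend()).decryptor()
    return decryptor.update(ciphertext) + decryptor.finalize()


key = os.urandom(32)  # 256-bit key, like scrypt.hash(..., buflen=32)
stored = encrypt_secret(key, b'the storage secret')
assert decrypt_secret(key, stored) == b'the storage secret'
```

GCM authenticates the ciphertext via the tag, which is what lets the client drop the separate HMAC used by the old CTR+HMAC scheme.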
@@ -142,7 +142,8 @@
     KDF_SALT_KEY = 'kdf_salt'
     KDF_LENGTH_KEY = 'kdf_length'
     KDF_SCRYPT = 'scrypt'
-    CIPHER_AES256 = 'aes256'
+    CIPHER_AES256 = 'aes256'  # deprecated, AES-CTR
+    CIPHER_AES256_GCM = _crypto.ENC_METHOD.aes_256_gcm
     RECOVERY_DOC_VERSION_KEY = 'version'
     RECOVERY_DOC_VERSION = 1
     """
@@ -343,7 +344,7 @@ class SoledadSecrets(object):
             '%s%s' %
             (self._passphrase_as_string(), self._uuid)).hexdigest()
 
-    def _export_recovery_document(self):
+    def _export_recovery_document(self, cipher=None):
         """
         Export the storage secrets.
 
@@ -364,6 +365,9 @@ class SoledadSecrets(object):
         Note that multiple storage secrets might be stored in one recovery
         document.
 
+        :param cipher: (Optional) The cipher to use. Defaults to AES 256 GCM
+        :type cipher: str
+
         :return: The recovery document.
         :rtype: dict
         """
@@ -371,7 +375,7 @@
         encrypted_secrets = {}
         for secret_id in self._secrets:
             encrypted_secrets[secret_id] = self._encrypt_storage_secret(
-                self._secrets[secret_id])
+                self._secrets[secret_id], doc_cipher=cipher)
         # create the recovery document
         data = {
             self.STORAGE_SECRETS_KEY: encrypted_secrets,
@@ -447,6 +451,7 @@
                 except SecretsException as e:
                     logger.error("failed to decrypt storage secret: %s"
                                  % str(e))
+                    raise e
         return secret_count, active_secret
 
     def _get_secrets_from_shared_db(self):
@@ -537,18 +542,25 @@
         )
         if encrypted_secret_dict[self.KDF_LENGTH_KEY] != len(key):
             raise SecretsException("Wrong length of decryption key.")
-        if encrypted_secret_dict[self.CIPHER_KEY] != self.CIPHER_AES256:
+        supported_ciphers = [self.CIPHER_AES256, self.CIPHER_AES256_GCM]
+        doc_cipher = encrypted_secret_dict[self.CIPHER_KEY]
+        if doc_cipher not in supported_ciphers:
             raise SecretsException("Unknown cipher in stored secret.")
         # recover the initial value and ciphertext
         iv, ciphertext = encrypted_secret_dict[self.SECRET_KEY].split(
-            self.IV_SEPARATOR, 1)
+            self.SEPARATOR, 1)
         ciphertext = binascii.a2b_base64(ciphertext)
-        decrypted_secret = decrypt_sym(ciphertext, key, iv)
+        try:
+            decrypted_secret = _crypto.decrypt_sym(
+                ciphertext, key, iv, doc_cipher)
+        except Exception as e:
+            logger.error(e)
+            raise SecretsException("Unable to decrypt secret.")
         if encrypted_secret_dict[self.LENGTH_KEY] != len(decrypted_secret):
             raise SecretsException("Wrong length of decrypted secret.")
         return decrypted_secret
 
-    def _encrypt_storage_secret(self, decrypted_secret):
+    def _encrypt_storage_secret(self, decrypted_secret, doc_cipher=None):
         """
         Encrypt the storage secret.
 
@@ -567,6 +579,8 @@
         :param decrypted_secret: The decrypted storage secret.
         :type decrypted_secret: str
 
+        :param doc_cipher: (Optional) The cipher to use. Defaults to AES 256 GCM
+        :type doc_cipher: str
 
         :return: The encrypted storage secret.
:rtype: dict @@ -575,17 +589,18 @@ class SoledadSecrets(object):          salt = os.urandom(self.SALT_LENGTH)          # get a 256-bit key          key = scrypt.hash(self._passphrase_as_string(), salt, buflen=32) -        iv, ciphertext = encrypt_sym(decrypted_secret, key) +        doc_cipher = doc_cipher or self.CIPHER_AES256_GCM +        iv, ciphertext = _crypto.encrypt_sym(decrypted_secret, key, doc_cipher) +        ciphertext = binascii.b2a_base64(ciphertext)          encrypted_secret_dict = {              # leap.soledad.crypto submodule uses AES256 for symmetric              # encryption.              self.KDF_KEY: self.KDF_SCRYPT,              self.KDF_SALT_KEY: binascii.b2a_base64(salt),              self.KDF_LENGTH_KEY: len(key), -            self.CIPHER_KEY: self.CIPHER_AES256, +            self.CIPHER_KEY: doc_cipher,              self.LENGTH_KEY: len(decrypted_secret), -            self.SECRET_KEY: '%s%s%s' % ( -                str(iv), self.IV_SEPARATOR, binascii.b2a_base64(ciphertext)), +            self.SECRET_KEY: self.SEPARATOR.join([str(iv), ciphertext])          }          return encrypted_secret_dict diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 3921c323..c9a9444e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -42,9 +42,7 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases  handled by Soledad should be created by SQLCipher >= 2.0.  """  import os -import json -from hashlib import sha256  from functools import partial  from pysqlcipher import dbapi2 as sqlcipher_dbapi2 @@ -117,7 +115,7 @@ class SQLCipherOptions(object):      @classmethod      def copy(cls, source, path=None, key=None, create=None,               is_raw_key=None, cipher=None, kdf_iter=None, -             cipher_page_size=None, defer_encryption=None, sync_db_key=None): +             cipher_page_size=None, sync_db_key=None):          """          Return a copy of C{source} with parameters different than None          replaced by new values. @@ -134,7 +132,7 @@ class SQLCipherOptions(object):                  args.append(getattr(source, name))          for name in ["create", "is_raw_key", "cipher", "kdf_iter", -                     "cipher_page_size", "defer_encryption", "sync_db_key"]: +                     "cipher_page_size", "sync_db_key"]:              val = local_vars[name]              if val is not None:                  kwargs[name] = val @@ -145,7 +143,7 @@ class SQLCipherOptions(object):      def __init__(self, path, key, create=True, is_raw_key=False,                   cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, -                 defer_encryption=False, sync_db_key=None): +                 sync_db_key=None):          """          :param path: The filesystem path for the database to open.          :type path: str @@ -163,10 +161,6 @@ class SQLCipherOptions(object):          :type kdf_iter: int          :param cipher_page_size: The page size.          :type cipher_page_size: int -        :param defer_encryption: -            Whether to defer encryption/decryption of documents, or do it -            inline while syncing. 
-        :type defer_encryption: bool          """          self.path = path          self.key = key @@ -175,7 +169,6 @@ class SQLCipherOptions(object):          self.cipher = cipher          self.kdf_iter = kdf_iter          self.cipher_page_size = cipher_page_size -        self.defer_encryption = defer_encryption          self.sync_db_key = sync_db_key      def __str__(self): @@ -201,7 +194,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):      """      A U1DB implementation that uses SQLCipher as its persistence layer.      """ -    defer_encryption = False      # The attribute _index_storage_value will be used as the lookup key for the      # implementation of the SQLCipher storage backend. @@ -225,10 +217,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          """          # ensure the db is encrypted if the file already exists          if os.path.isfile(opts.path): -            _assert_db_is_encrypted(opts) - -        # connect to the sqlcipher database -        self._db_handle = initialize_sqlcipher_db(opts) +            self._db_handle = _assert_db_is_encrypted(opts) +        else: +            # connect to the sqlcipher database +            self._db_handle = initialize_sqlcipher_db(opts)          # TODO ---------------------------------------------------          # Everything else in this initialization has to be factored @@ -267,27 +259,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')      # -    # Document operations -    # - -    def put_doc(self, doc): -        """ -        Overwrite the put_doc method, to enqueue the modified document for -        encryption before sync. - -        :param doc: The document to be put. -        :type doc: u1db.Document - -        :return: The new document revision. -        :rtype: str -        """ -        doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) -        if self.defer_encryption: -            # TODO move to api? -            self._sync_enc_pool.encrypt_doc(doc) -        return doc_rev - -    #      # SQLCipher API methods      # @@ -425,25 +396,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase):      """      ENCRYPT_LOOP_PERIOD = 1 -    def __init__(self, opts, soledad_crypto, replica_uid, cert_file, -                 defer_encryption=False, sync_db=None, sync_enc_pool=None): +    def __init__(self, opts, soledad_crypto, replica_uid, cert_file):          self._opts = opts          self._path = opts.path          self._crypto = soledad_crypto          self.__replica_uid = replica_uid          self._cert_file = cert_file -        self._sync_enc_pool = sync_enc_pool - -        self._sync_db = sync_db -        # we store syncers in a dictionary indexed by the target URL. We also -        # store a hash of the auth info in case auth info expires and we need -        # to rebuild the syncer for that target. 
The final self._syncers -        # format is the following: -        # -        #  self._syncers = {'<url>': ('<auth_hash>', syncer), ...} -        self._syncers = {}          # storage for the documents received during a sync          self.received_docs = [] @@ -458,6 +418,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          if DO_STATS:              self.sync_phase = None +    def commit(self): +        self._db_handle.commit() +      @property      def _replica_uid(self):          return str(self.__replica_uid) @@ -477,7 +440,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):              raise DatabaseAccessError(str(e))      @defer.inlineCallbacks -    def sync(self, url, creds=None, defer_decryption=True): +    def sync(self, url, creds=None):          """          Synchronize documents with remote replica exposed at url. @@ -492,10 +455,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          :param creds: optional dictionary giving credentials to authorize the                        operation with the server.          :type creds: dict -        :param defer_decryption: -            Whether to defer the decryption process using the intermediate -            database. If False, decryption will be done inline. -        :type defer_decryption: bool          :return:              A Deferred, that will fire with the local generation (type `int`) @@ -507,8 +466,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):              self.sync_phase = syncer.sync_phase              self.syncer = syncer              self.sync_exchange_phase = syncer.sync_exchange_phase -        local_gen_before_sync = yield syncer.sync( -            defer_decryption=defer_decryption) +        local_gen_before_sync = yield syncer.sync()          self.received_docs = syncer.received_docs          defer.returnValue(local_gen_before_sync) @@ -525,29 +483,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          :return: A synchronizer.          :rtype: Synchronizer          """ -        # we want to store at most one syncer for each url, so we also store a -        # hash of the connection credentials and replace the stored syncer for -        # a certain url if credentials have changed. -        h = sha256(json.dumps([url, creds])).hexdigest() -        cur_h, syncer = self._syncers.get(url, (None, None)) -        if syncer is None or h != cur_h: -            syncer = SoledadSynchronizer( -                self, -                SoledadHTTPSyncTarget( -                    url, -                    # XXX is the replica_uid ready? -                    self._replica_uid, -                    creds=creds, -                    crypto=self._crypto, -                    cert_file=self._cert_file, -                    sync_db=self._sync_db, -                    sync_enc_pool=self._sync_enc_pool)) -            self._syncers[url] = (h, syncer) -        # in order to reuse the same synchronizer multiple times we have to -        # reset its state (i.e. the number of documents received from target -        # and inserted in the local replica). -        syncer.num_inserted = 0 -        return syncer +        return SoledadSynchronizer( +            self, +            SoledadHTTPSyncTarget( +                url, +                # XXX is the replica_uid ready? 
+                self._replica_uid, +                creds=creds, +                crypto=self._crypto, +                cert_file=self._cert_file))      #      # Symmetric encryption of syncing docs @@ -558,18 +502,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):          # XXX this SHOULD BE a callback          return self._get_generation() -    def close(self): -        """ -        Close the syncer and syncdb orderly -        """ -        super(SQLCipherU1DBSync, self).close() -        # close all open syncers -        for url in self._syncers.keys(): -            _, syncer = self._syncers[url] -            syncer.close() -            del self._syncers[url] -        self.running = False -  class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):      """ @@ -599,14 +531,12 @@ class SoledadSQLCipherWrapper(SQLCipherDatabase):      It can be used from adbapi to initialize a soledad database after      getting a regular connection to a sqlcipher database.      """ -    def __init__(self, conn, opts, sync_enc_pool): +    def __init__(self, conn, opts):          self._db_handle = conn          self._real_replica_uid = None          self._ensure_schema()          self.set_document_factory(soledad_doc_factory)          self._prime_replica_uid() -        self.defer_encryption = opts.defer_encryption -        self._sync_enc_pool = sync_enc_pool  def _assert_db_is_encrypted(opts): @@ -635,7 +565,7 @@ def _assert_db_is_encrypted(opts):          # assert that we can access it using SQLCipher with the given          # key          dummy_query = ('SELECT count(*) FROM sqlite_master',) -        initialize_sqlcipher_db(opts, on_init=dummy_query) +        return initialize_sqlcipher_db(opts, on_init=dummy_query)      else:          raise DatabaseIsNotEncrypted() @@ -660,6 +590,7 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,      return SoledadDocument(doc_id=doc_id, rev=rev, json=json,                             has_conflicts=has_conflicts, syncable=syncable) +  sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 7ed5f693..70c841d6 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -56,23 +56,10 @@ class SoledadSynchronizer(Synchronizer):              self.sync_exchange_phase = None      @defer.inlineCallbacks -    def sync(self, defer_decryption=True): +    def sync(self):          """          Synchronize documents between source and target. -        Differently from u1db `Synchronizer.sync` method, this one allows to -        pass a `defer_decryption` flag that will postpone the last -        step in the synchronization dance, namely, the setting of the last -        known generation and transaction id for a given remote replica. - -        This is done to allow the ongoing parallel decryption of the incoming -        docs to proceed without `InvalidGeneration` conflicts. - -        :param defer_decryption: Whether to defer the decryption process using -                                 the intermediate database. If False, -                                 decryption will be done inline. -        :type defer_decryption: bool -          :return: A deferred which will fire after the sync has finished with                   the local generation before the synchronization was performed.          
:rtype: twisted.internet.defer.Deferred @@ -154,28 +141,19 @@ class SoledadSynchronizer(Synchronizer):              self.sync_phase[0] += 1          # -------------------------------------------------------------------- -        # prepare to send all the changed docs -        changed_doc_ids = [doc_id for doc_id, _, _ in changes] -        docs_to_send = self.source.get_docs( -            changed_doc_ids, check_for_conflicts=False, include_deleted=True) -        ids_sent = [] -        docs_by_generation = [] -        idx = 0 -        for doc in docs_to_send: -            _, gen, trans = changes[idx] -            docs_by_generation.append((doc, gen, trans)) -            idx += 1 -            ids_sent.append(doc.doc_id) +        docs_by_generation = self._docs_by_gen_from_changes(changes)          # exchange documents and try to insert the returned ones with          # the target, return target synced-up-to gen.          new_gen, new_trans_id = yield sync_target.sync_exchange(              docs_by_generation, self.source._replica_uid,              target_last_known_gen, target_last_known_trans_id, -            self._insert_doc_from_target, ensure_callback=ensure_callback, -            defer_decryption=defer_decryption) +            self._insert_doc_from_target, ensure_callback=ensure_callback) +        ids_sent = [doc_id for doc_id, _, _ in changes]          logger.debug("target gen after sync: %d" % new_gen)          logger.debug("target trans_id after sync: %s" % new_trans_id) +        if hasattr(self.source, 'commit'):  # sqlcipher backend speed up +            self.source.commit()  # insert it all in a single transaction          info = {              "target_replica_uid": self.target_replica_uid,              "new_gen": new_gen, @@ -204,6 +182,14 @@ class SoledadSynchronizer(Synchronizer):          defer.returnValue(my_gen) +    def _docs_by_gen_from_changes(self, changes): +        docs_by_generation = [] +        kwargs = {'include_deleted': True} +        for doc_id, gen, trans in changes: +            get_doc = (self.source.get_doc, (doc_id,), kwargs) +            docs_by_generation.append((get_doc, gen, trans)) +        return docs_by_generation +      def complete_sync(self):          """          Last stage of the synchronization: @@ -225,12 +211,6 @@ class SoledadSynchronizer(Synchronizer):          # if gapless record current reached generation with target          return self._record_sync_info_with_the_target(info["my_gen"]) -    def close(self): -        """ -        Close the synchronizer. -        """ -        self.sync_target.close() -      def _record_sync_info_with_the_target(self, start_generation):          """          Store local replica metadata in server. 
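Editor's note: `_docs_by_gen_from_changes` above replaces the old eager `get_docs` call with lazy `(function, args, kwargs)` triples, which `_encrypt_doc` in send.py unpacks one document at a time while the upload streams. Below is a toy sketch of the pattern; `get_doc` and `changes` are stand-in names for illustration, not the Soledad API.

```python
# Toy illustration of the lazy-fetch triples used above; all names
# are stand-ins. Each entry defers loading the document until the
# sender actually needs it, keeping memory usage flat during sync.

def get_doc(doc_id, include_deleted=True):
    # stand-in for source.get_doc: pretend to read from the database
    return {'id': doc_id, 'content': 'payload of %s' % doc_id}


changes = [('doc-1', 1, 'trans-1'), ('doc-2', 2, 'trans-2')]

# build the call triples, as _docs_by_gen_from_changes does
docs_by_generation = []
for doc_id, gen, trans in changes:
    get_doc_call = (get_doc, (doc_id,), {'include_deleted': True})
    docs_by_generation.append((get_doc_call, gen, trans))

# consume them one at a time, as _encrypt_doc does while streaming
for (f, args, kwargs), gen, trans in docs_by_generation:
    doc = f(*args, **kwargs)  # the document is materialized only now
    print('%s @ gen %d (%s)' % (doc['id'], gen, trans))
```

Together with the `commit()` call added to the sqlcipher backend, this lets a whole sync exchange run as a stream with a single local transaction at the end.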
diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py index f4f48f86..4a29ca87 100644 --- a/common/src/leap/soledad/common/backend.py +++ b/common/src/leap/soledad/common/backend.py @@ -73,8 +73,8 @@ class SoledadBackend(CommonBackend):      def batch_end(self):          if not self.BATCH_SUPPORT:              return -        self.batching = False          self._database.batch_end() +        self.batching = False          for name in self.after_batch_callbacks:              self.after_batch_callbacks[name]()          self.after_batch_callbacks = None @@ -570,7 +570,7 @@ class SoledadBackend(CommonBackend):          self._put_doc(cur_doc, doc)      def get_docs(self, doc_ids, check_for_conflicts=True, -                 include_deleted=False): +                 include_deleted=False, read_content=True):          """          Get the JSON content for many documents. @@ -588,7 +588,7 @@ class SoledadBackend(CommonBackend):          :rtype: iterable          """          return self._database.get_docs(doc_ids, check_for_conflicts, -                                       include_deleted) +                                       include_deleted, read_content)      def _prune_conflicts(self, doc, doc_vcr):          """ diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index 0f4102db..2e6f734e 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -20,6 +20,7 @@  import json +import copy  import re  import uuid  import binascii @@ -295,31 +296,14 @@ class CouchDatabase(object):          generation, _ = self.get_generation_info()          results = list( -            self._get_docs(None, True, include_deleted)) +            self.get_docs(None, True, include_deleted))          return (generation, results)      def get_docs(self, doc_ids, check_for_conflicts=True, -                 include_deleted=False): +                 include_deleted=False, read_content=True):          """          Get the JSON content for many documents. -        :param doc_ids: A list of document identifiers or None for all. -        :type doc_ids: list -        :param check_for_conflicts: If set to False, then the conflict check -                                    will be skipped, and 'None' will be -                                    returned instead of True/False. -        :type check_for_conflicts: bool -        :param include_deleted: If set to True, deleted documents will be -                                returned with empty content. Otherwise deleted -                                documents will not be included in the results. -        :return: iterable giving the Document object for each document id -                 in matching doc_ids order. -        :rtype: iterable -        """ -        return self._get_docs(doc_ids, check_for_conflicts, include_deleted) - -    def _get_docs(self, doc_ids, check_for_conflicts, include_deleted): -        """          Use couch's `_all_docs` view to get the documents indicated in          `doc_ids`, @@ -337,14 +321,21 @@ class CouchDatabase(object):                   in matching doc_ids order.          
:rtype: iterable          """ -        params = {'include_docs': 'true', 'attachments': 'true'} +        params = {'include_docs': 'true', 'attachments': 'false'}          if doc_ids is not None:              params['keys'] = doc_ids          view = self._database.view("_all_docs", **params)          for row in view.rows: -            result = row['doc'] +            result = copy.deepcopy(row['doc']) +            for file_name in result.get('_attachments', {}).keys(): +                data = self._database.get_attachment(result, file_name) +                if data: +                    if read_content: +                        data = data.read() +                    result['_attachments'][file_name] = {'data': data}              doc = self.__parse_doc_from_couch( -                result, result['_id'], check_for_conflicts=check_for_conflicts) +                result, result['_id'], +                check_for_conflicts=check_for_conflicts, decode=False)              # filter out non-u1db or deleted documents              if not doc or (not include_deleted and doc.is_tombstone()):                  continue @@ -409,7 +400,7 @@ class CouchDatabase(object):          return rev      def __parse_doc_from_couch(self, result, doc_id, -                               check_for_conflicts=False): +                               check_for_conflicts=False, decode=True):          # restrict to u1db documents          if 'u1db_rev' not in result:              return None @@ -418,19 +409,23 @@ class CouchDatabase(object):          if '_attachments' not in result \                  or 'u1db_content' not in result['_attachments']:              doc.make_tombstone() -        else: +        elif decode:              doc.content = json.loads(                  binascii.a2b_base64(                      result['_attachments']['u1db_content']['data'])) +        else: +            doc._json = result['_attachments']['u1db_content']['data']          # determine if there are conflicts          if check_for_conflicts \                  and '_attachments' in result \                  and 'u1db_conflicts' in result['_attachments']: -            doc.set_conflicts( -                self._build_conflicts( -                    doc.doc_id, -                    json.loads(binascii.a2b_base64( -                        result['_attachments']['u1db_conflicts']['data'])))) +            if decode: +                conflicts = binascii.a2b_base64( +                    result['_attachments']['u1db_conflicts']['data']) +            else: +                conflicts = result['_attachments']['u1db_conflicts']['data'] +            conflicts = json.loads(conflicts) +            doc.set_conflicts(self._build_conflicts(doc.doc_id, conflicts))          # store couch revision          doc.couch_rev = result['_rev']          return doc @@ -663,7 +658,7 @@ class CouchDatabase(object):          _, _, data = resource.get_json(**kwargs)          return data -    def _allocate_new_generation(self, doc_id, transaction_id): +    def _allocate_new_generation(self, doc_id, transaction_id, save=True):          """          Allocate a new generation number for a document modification. 
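In the `get_docs` change above, attachments are no longer inlined by the `_all_docs` view (`'attachments': 'false'`); each one is fetched with `get_attachment`, and with `read_content=False` the `data` field keeps the file-like object returned by python-couchdb instead of its bytes, deferring the actual read until the response is written out. A rough sketch of the two modes (the `fetch_attachment` helper is illustrative; the diff does this inline inside `get_docs`):

    def fetch_attachment(database, doc, name, read_content=True):
        data = database.get_attachment(doc, name)  # file-like object or None
        if data is None:
            return None
        if read_content:
            return data.read()  # eager: the payload is held in memory
        return data             # lazy: the caller streams it out later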
@@ -703,10 +698,12 @@ class CouchDatabase(object):                      DOC_ID_KEY: doc_id,                      TRANSACTION_ID_KEY: transaction_id,                  } -                self._database.save(gen_doc) +                if save: +                    self._database.save(gen_doc)                  break  # succeeded allocating a new generation, proceed              except ResourceConflict:                  pass  # try again! +        return gen_doc      def save_document(self, old_doc, doc, transaction_id):          """ @@ -785,6 +782,7 @@ class CouchDatabase(object):                      headers=envelope.headers)              except ResourceConflict:                  raise RevisionConflict() +            self._allocate_new_generation(doc.doc_id, transaction_id)          else:              for name, attachment in attachments.items():                  del attachment['follows'] @@ -793,12 +791,13 @@ class CouchDatabase(object):                  attachment['data'] = binascii.b2a_base64(                      parts[index]).strip()              couch_doc['_attachments'] = attachments +            gen_doc = self._allocate_new_generation( +                doc.doc_id, transaction_id, save=False)              self.batch_docs[doc.doc_id] = couch_doc +            self.batch_docs[gen_doc['_id']] = gen_doc              last_gen, last_trans_id = self.batch_generation              self.batch_generation = (last_gen + 1, transaction_id) -        self._allocate_new_generation(doc.doc_id, transaction_id) -      def _new_resource(self, *path):          """          Return a new resource for accessing a couch database. diff --git a/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py b/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py index d73c0d16..27db65af 100644 --- a/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py +++ b/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py @@ -505,12 +505,11 @@ class SQLiteDatabase(CommonBackend):      def _put_doc_if_newer(self, doc, save_conflict, replica_uid=None,                            replica_gen=None, replica_trans_id=None): -        with self._db_handle: -            return super(SQLiteDatabase, self)._put_doc_if_newer( -                doc, -                save_conflict=save_conflict, -                replica_uid=replica_uid, replica_gen=replica_gen, -                replica_trans_id=replica_trans_id) +        return super(SQLiteDatabase, self)._put_doc_if_newer( +            doc, +            save_conflict=save_conflict, +            replica_uid=replica_uid, replica_gen=replica_gen, +            replica_trans_id=replica_trans_id)      def _add_conflict(self, c, doc_id, my_doc_rev, my_content):          c.execute("INSERT INTO conflicts VALUES (?, ?, ?)", @@ -924,4 +923,5 @@ class SQLitePartialExpandDatabase(SQLiteDatabase):              raw_doc = json.loads(doc)              self._update_indexes(doc_id, raw_doc, getters, c) +  SQLiteDatabase.register_implementation(SQLitePartialExpandDatabase) diff --git a/common/src/leap/soledad/common/l2db/remote/http_app.py b/common/src/leap/soledad/common/l2db/remote/http_app.py index 5cf6645e..496274b2 100644 --- a/common/src/leap/soledad/common/l2db/remote/http_app.py +++ b/common/src/leap/soledad/common/l2db/remote/http_app.py @@ -194,6 +194,7 @@ class URLToResource(object):          resource_cls = params.pop('resource_cls')          return resource_cls, params +  url_to_resource = URLToResource() @@ -501,7 +502,9 @@ class HTTPResponder(object):              
self._write('\r\n')          else:              self._write(',\r\n') -        self._write(json.dumps(entry)) +        if type(entry) == dict: +            entry = json.dumps(entry) +        self._write(entry)      def end_stream(self):          "end stream (array)." diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py index 11d72791..48eec0f7 100644 --- a/scripts/db_access/client_side_db.py +++ b/scripts/db_access/client_side_db.py @@ -1,13 +1,13 @@  #!/usr/bin/python  import os +import sys  import argparse  import tempfile  import getpass  import requests  import srp._pysrp as srp  import binascii -import logging  import json  import time @@ -15,6 +15,7 @@ from twisted.internet import reactor  from twisted.internet.defer import inlineCallbacks  from leap.soledad.client import Soledad +from leap.soledad.common.log import getLogger  from leap.keymanager import KeyManager  from leap.keymanager.openpgp import OpenPGPKey @@ -39,9 +40,9 @@ Use the --help option to see available options.  # create a logger -logger = logging.getLogger(__name__) -LOG_FORMAT = '%(asctime)s %(message)s' -logging.basicConfig(format=LOG_FORMAT, level=logging.DEBUG) +logger = getLogger(__name__) +from twisted.python import log +log.startLogging(sys.stdout)  safe_unhexlify = lambda x: binascii.unhexlify(x) if ( @@ -133,8 +134,7 @@ def _get_soledad_instance(uuid, passphrase, basedir, server_url, cert_file,          local_db_path=local_db_path,          server_url=server_url,          cert_file=cert_file, -        auth_token=token, -        defer_encryption=True) +        auth_token=token)  def _get_keymanager_instance(username, provider, soledad, token, diff --git a/scripts/docker/files/bin/client_side_db.py b/scripts/docker/files/bin/client_side_db.py index 4be33d13..80da7392 100644 --- a/scripts/docker/files/bin/client_side_db.py +++ b/scripts/docker/files/bin/client_side_db.py @@ -136,8 +136,7 @@ def _get_soledad_instance(uuid, passphrase, basedir, server_url, cert_file,          local_db_path=local_db_path,          server_url=server_url,          cert_file=cert_file, -        auth_token=token, -        defer_encryption=True) +        auth_token=token)  def _get_keymanager_instance(username, provider, soledad, token, diff --git a/scripts/profiling/mail/soledad_client.py b/scripts/profiling/mail/soledad_client.py index 5ac8ce39..dcd605aa 100644 --- a/scripts/profiling/mail/soledad_client.py +++ b/scripts/profiling/mail/soledad_client.py @@ -30,8 +30,7 @@ class SoledadClient(object):              server_url=self._server_url,              cert_file=None,              auth_token=self._auth_token, -            secret_id=None, -            defer_encryption=True) +            secret_id=None)      def close(self):          if self._soledad is not None: diff --git a/scripts/profiling/sync/profile-sync.py b/scripts/profiling/sync/profile-sync.py index 34e66f03..1d59217a 100755 --- a/scripts/profiling/sync/profile-sync.py +++ b/scripts/profiling/sync/profile-sync.py @@ -91,7 +91,6 @@ def _get_soledad_instance_from_uuid(uuid, passphrase, basedir, server_url,          server_url=server_url,          cert_file=cert_file,          auth_token=token, -        defer_encryption=True,          syncable=True) diff --git a/server/pkg/requirements.pip b/server/pkg/requirements.pip index e92dfde6..e4a87e74 100644 --- a/server/pkg/requirements.pip +++ b/server/pkg/requirements.pip @@ -3,3 +3,4 @@ PyOpenSSL  twisted>=12.3.0  Beaker  couchdb +python-cjson diff --git a/server/pkg/soledad-server 
b/server/pkg/soledad-server index d9dab040..753a260b 100644 --- a/server/pkg/soledad-server +++ b/server/pkg/soledad-server @@ -11,7 +11,7 @@  PATH=/sbin:/bin:/usr/sbin:/usr/bin  PIDFILE=/var/run/soledad.pid -OBJ=leap.soledad.server.application.wsgi_application +RESOURCE_CLASS=leap.soledad.server.resource.SoledadResource  HTTPS_PORT=2424  CONFDIR=/etc/soledad  CERT_PATH="${CONFDIR}/soledad-server.pem" @@ -39,7 +39,7 @@ case "${1}" in  	    --syslog \  	    --prefix=soledad-server \              web \ -            --wsgi=${OBJ} \ +            --class=${RESOURCE_CLASS} \              --port=ssl:${HTTPS_PORT}:privateKey=${PRIVKEY_PATH}:certKey=${CERT_PATH}:sslmethod=${SSL_METHOD}          echo "."      ;; diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index d8243c19..039bef75 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -193,7 +193,8 @@ class HTTPInvocationByMethodWithBody(              try:                  content_length = int(self.environ['CONTENT_LENGTH'])              except (ValueError, KeyError): -                raise http_app.BadRequest +                # raise http_app.BadRequest +                content_length = self.max_request_size              if content_length <= 0:                  raise http_app.BadRequest              if content_length > self.max_request_size: @@ -219,27 +220,23 @@ class HTTPInvocationByMethodWithBody(                  if content_type == 'application/x-soledad-sync-put':                      meth_put = self._lookup('%s_put' % method)                      meth_end = self._lookup('%s_end' % method) -                    entries = []                      while True: -                        line = body_getline() -                        entry = line.strip() +                        entry = body_getline().strip()                          if entry == ']':  # end of incoming document stream                              break                          if not entry or not comma:  # empty or no prec comma                              raise http_app.BadRequest                          entry, comma = utils.check_and_strip_comma(entry) -                        entries.append(entry) +                        content = body_getline().strip() +                        content, comma = utils.check_and_strip_comma(content) +                        meth_put({'content': content or None}, entry)                      if comma or body_getline():  # extra comma or data                          raise http_app.BadRequest -                    for entry in entries: -                        meth_put({}, entry)                      return meth_end()                  # handle outgoing documents                  elif content_type == 'application/x-soledad-sync-get': -                    line = body_getline() -                    entry = line.strip()                      meth_get = self._lookup('%s_get' % method) -                    return meth_get({}, line) +                    return meth_get()                  else:                      raise http_app.BadRequest()              else: diff --git a/server/src/leap/soledad/server/config.py b/server/src/leap/soledad/server/config.py index 4a791cbe..3c17ec19 100644 --- a/server/src/leap/soledad/server/config.py +++ b/server/src/leap/soledad/server/config.py @@ -24,7 +24,7 @@ CONFIG_DEFAULTS = {          'couch_url': 'http://localhost:5984',          'create_cmd': None,          'admin_netrc': '/etc/couchdb/couchdb-admin.netrc', -  
      'batching': False +        'batching': True      },      'database-security': {          'members': ['soledad'], diff --git a/server/src/leap/soledad/server/resource.py b/server/src/leap/soledad/server/resource.py new file mode 100644 index 00000000..dbb91b0a --- /dev/null +++ b/server/src/leap/soledad/server/resource.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# resource.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A twisted resource that serves the Soledad Server. +""" + +from twisted.web.resource import Resource +from twisted.web.wsgi import WSGIResource +from twisted.internet import reactor +from twisted.python import threadpool + +from leap.soledad.server.application import wsgi_application + + +__all__ = ['SoledadResource'] + + +# setup a wsgi resource with its own threadpool +pool = threadpool.ThreadPool() +reactor.callWhenRunning(pool.start) +reactor.addSystemEventTrigger('after', 'shutdown', pool.stop) +wsgi_resource = WSGIResource(reactor, pool, wsgi_application) + + +class SoledadResource(Resource): +    """ +    This is a dummy twisted resource, used only to allow different entry points +    for the Soledad Server. +    """ + +    def __init__(self): +        self.children = {'': wsgi_resource} + +    def getChild(self, path, request): +        # for now, just "rewind" the path and serve the wsgi resource for all +        # requests. In the future, we might look into the request path to +        # decide which child resources should serve each request. +        request.postpath.insert(0, request.prepath.pop()) +        return self.children[''] diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3f5c4aba..b553a056 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -17,14 +17,19 @@  """  Server side synchronization infrastructure.  """ -from leap.soledad.common.l2db import sync, Document +import time +from itertools import izip + +from leap.soledad.common.l2db import sync  from leap.soledad.common.l2db.remote import http_app  from leap.soledad.server.caching import get_cache_for  from leap.soledad.server.state import ServerSyncState +from leap.soledad.common.document import ServerDocument -MAX_REQUEST_SIZE = 200  # in Mb +MAX_REQUEST_SIZE = float('inf')  # It's a stream.  MAX_ENTRY_SIZE = 200  # in Mb +ENTRY_CACHE_SIZE = 8192 * 1024  class SyncExchange(sync.SyncExchange): @@ -51,7 +56,7 @@ class SyncExchange(sync.SyncExchange):          # recover sync state          self._sync_state = ServerSyncState(self.source_replica_uid, sync_id) -    def find_changes_to_return(self, received): +    def find_changes_to_return(self):          """          Find changes to return. @@ -59,10 +64,6 @@ class SyncExchange(sync.SyncExchange):          order using whats_changed. 
It excludes document ids that have
-        already been considered (superseded by the sender, etc).

-        :param received: How many documents the source replica has already
-                         received during the current sync process.
-        :type received: int
-
         :return: the generation of this database, which the caller can
                  consider themselves to be synchronized after processing
                  all returned documents, and the number of documents to be sent
@@ -78,41 +79,45 @@
             self._trace('after whats_changed')
             seen_ids = self._sync_state.seen_ids()
             # changed docs that weren't superseded by or converged with
-            changes_to_return = [
+            self.changes_to_return = [
                 (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes
                 # there was a subsequent update
                 if doc_id not in seen_ids or seen_ids.get(doc_id) < gen]
             self._sync_state.put_changes_to_return(
-                new_gen, new_trans_id, changes_to_return)
-            number_of_changes = len(changes_to_return)
-        # query server for stored changes
-        _, _, next_change_to_return = \
-            self._sync_state.next_change_to_return(received)
+                new_gen, new_trans_id, self.changes_to_return)
+            number_of_changes = len(self.changes_to_return)
         self.new_gen = new_gen
         self.new_trans_id = new_trans_id
-        # and append one change
-        self.change_to_return = next_change_to_return
         return self.new_gen, number_of_changes

-    def return_one_doc(self, return_doc_cb):
-        """
-        Return one changed document and its last change generation to the
-        source syncing replica by invoking the callback return_doc_cb.
+    def return_docs(self, return_doc_cb):
+        """Return the changed documents and their last change generation
+        by repeatedly invoking the callback return_doc_cb.

-        This is called once for each document to be transferred from target to
-        source.
+        The final step of a sync exchange.

-        :param return_doc_cb: is a callback used to return the documents with
-                              their last change generation to the target
-                              replica.
-        :type return_doc_cb: callable(doc, gen, trans_id)
+        :param return_doc_cb: a callback used to return the documents with
+                their last change generation to the target replica.
+        :type return_doc_cb: callable(doc, gen, trans_id)
+        :return: None
         """
-        if self.change_to_return is not None:
-            changed_doc_id, gen, trans_id = self.change_to_return
-            doc = self._db.get_doc(changed_doc_id, include_deleted=True)
+        changes_to_return = self.changes_to_return
+        # return docs, including conflicts.
+        # content as a file-object (will be read when writing) +        changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] +        docs = self._db.get_docs( +            changed_doc_ids, check_for_conflicts=False, +            include_deleted=True, read_content=False) + +        docs_by_gen = izip( +            docs, (gen for _, gen, _ in changes_to_return), +            (trans_id for _, _, trans_id in changes_to_return)) +        for doc, gen, trans_id in docs_by_gen:              return_doc_cb(doc, gen, trans_id)      def batched_insert_from_source(self, entries, sync_id): +        if not entries: +            return          self._db.batch_start()          for entry in entries:              doc, gen, trans_id, number_of_docs, doc_idx = entry @@ -207,6 +212,7 @@ class SyncResource(http_app.SyncResource):              db, self.source_replica_uid, last_known_generation, sync_id)          self._sync_id = sync_id          self._staging = [] +        self._staging_size = 0      @http_app.http_method(content_as_args=True)      def post_put( @@ -233,26 +239,37 @@ class SyncResource(http_app.SyncResource):          :param doc_idx: The index of the current document.          :type doc_idx: int          """ -        doc = Document(id, rev, content) +        doc = ServerDocument(id, rev, json=content) +        self._staging_size += len(content or '')          self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) +        if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs: +            self.sync_exch.batched_insert_from_source(self._staging, +                                                      self._sync_id) +            self._staging = [] +            self._staging_size = 0 -    @http_app.http_method(received=int, content_as_args=True) -    def post_get(self, received): +    def post_get(self):          """ -        Return one syncing document to the client. - -        :param received: How many documents have already been received by the -                         client on the current sync session. -        :type received: int +        Return syncing documents to the client.          
""" -          def send_doc(doc, gen, trans_id): -            entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), +            entry = dict(id=doc.doc_id, rev=doc.rev,                           gen=gen, trans_id=trans_id)              self.responder.stream_entry(entry) +            content_reader = doc.get_json() +            if content_reader: +                content = content_reader.read() +                self.responder.stream_entry(content) +                content_reader.close() +                # throttle at 5mb/s +                # FIXME: twistd cant control througput +                # we need to either use gunicorn or go async +                time.sleep(len(content) / (5.0 * 1024 * 1024)) +            else: +                self.responder.stream_entry('')          new_gen, number_of_changes = \ -            self.sync_exch.find_changes_to_return(received) +            self.sync_exch.find_changes_to_return()          self.responder.content_type = 'application/x-u1db-sync-response'          self.responder.start_response(200)          self.responder.start_stream(), @@ -264,7 +281,7 @@ class SyncResource(http_app.SyncResource):          if self.replica_uid is not None:              header['replica_uid'] = self.replica_uid          self.responder.stream_entry(header) -        self.sync_exch.return_one_doc(send_doc) +        self.sync_exch.return_docs(send_doc)          self.responder.end_stream()          self.responder.finish_response() @@ -273,7 +290,6 @@ class SyncResource(http_app.SyncResource):          Return the current generation and transaction_id after inserting one          incoming document.          """ -        self.sync_exch.batched_insert_from_source(self._staging, self._sync_id)          self.responder.content_type = 'application/x-soledad-sync-response'          self.responder.start_response(200)          self.responder.start_stream(), @@ -1,7 +1,7 @@  [pep8] -exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py -ignore = E731 +exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py,.tox +ignore = F812,E731  [flake8] -exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py -ignore = E731 +exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py,.tox +ignore = F812,E731 diff --git a/testing/pytest.ini b/testing/pytest.ini index 2d34c607..eb70b67c 100644 --- a/testing/pytest.ini +++ b/testing/pytest.ini @@ -1,3 +1,3 @@  [pytest]  testpaths = tests -norecursedirs = tests/perf +twisted = yes diff --git a/testing/test_soledad/util.py b/testing/test_soledad/util.py index d53f6cda..57f8199b 100644 --- a/testing/test_soledad/util.py +++ b/testing/test_soledad/util.py @@ -15,12 +15,10 @@  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. -  """  Utilities used by multiple test suites.  
""" -  import os  import random  import string @@ -45,21 +43,20 @@ from leap.soledad.common.document import SoledadDocument  from leap.soledad.common.couch import CouchDatabase  from leap.soledad.common.couch.state import CouchServerState -from leap.soledad.common.crypto import ENC_SCHEME_KEY  from leap.soledad.client import Soledad  from leap.soledad.client import http_target  from leap.soledad.client import auth -from leap.soledad.client.crypto import decrypt_doc_dict  from leap.soledad.client.sqlcipher import SQLCipherDatabase  from leap.soledad.client.sqlcipher import SQLCipherOptions +from leap.soledad.client._crypto import is_symmetrically_encrypted  from leap.soledad.server import SoledadApp  from leap.soledad.server.auth import SoledadTokenAuthMiddleware  PASSWORD = '123456' -ADDRESS = 'leap@leap.se' +ADDRESS = 'user-1234'  def make_local_db_and_target(test): @@ -193,8 +190,7 @@ class MockedSharedDBTest(object):  def soledad_sync_target( -        test, path, source_replica_uid=uuid4().hex, -        sync_db=None, sync_enc_pool=None): +        test, path, source_replica_uid=uuid4().hex):      creds = {'token': {          'uuid': 'user-uuid',          'token': 'auth-token', @@ -204,14 +200,13 @@ def soledad_sync_target(          source_replica_uid,          creds,          test._soledad._crypto, -        None,  # cert_file -        sync_db=sync_db, -        sync_enc_pool=sync_enc_pool) +        None)  # cert_file  # redefine the base leap test class so it inherits from twisted trial's  # TestCase. This is needed so trial knows that it has to manage a reactor and  # wait for deferreds returned by tests to be fired. +  BaseLeapTest = type(      'BaseLeapTest', (unittest.TestCase,), dict(BaseLeapTest.__dict__)) @@ -221,7 +216,6 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):      """      Instantiates Soledad for usage in tests.      """ -    defer_sync_encryption = False      @pytest.mark.usefixtures("method_tmpdir")      def setUp(self): @@ -229,14 +223,7 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):          # repeat it here because twisted.trial does not work with          # setUpClass/tearDownClass. 
-        self.old_path = os.environ['PATH'] -        self.old_home = os.environ['HOME']          self.home = self.tempdir -        bin_tdir = os.path.join( -            self.tempdir, -            'bin') -        os.environ["PATH"] = bin_tdir -        os.environ["HOME"] = self.tempdir          # config info          self.db1_file = os.path.join(self.tempdir, "db1.u1db") @@ -263,10 +250,6 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):          self._db2.close()          self._soledad.close() -        # restore paths -        os.environ["PATH"] = self.old_path -        os.environ["HOME"] = self.old_home -          def _delete_temporary_dirs():              # XXX should not access "private" attrs              for f in [self._soledad.local_db_path, @@ -305,12 +288,12 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):                  self.tempdir, prefix, local_db_path),              server_url=server_url,  # Soledad will fail if not given an url              cert_file=cert_file, -            defer_encryption=self.defer_sync_encryption,              shared_db=MockSharedDB(),              auth_token=auth_token)          self.addCleanup(soledad.close)          return soledad +    @pytest.inlineCallbacks      def assertGetEncryptedDoc(              self, db, doc_id, doc_rev, content, has_conflicts):          """ @@ -320,13 +303,9 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest):                                       has_conflicts=has_conflicts)          doc = db.get_doc(doc_id) -        if ENC_SCHEME_KEY in doc.content: -            # XXX check for SYM_KEY too -            key = self._soledad._crypto.doc_passphrase(doc.doc_id) -            secret = self._soledad._crypto.secret -            decrypted = decrypt_doc_dict( -                doc.content, doc.doc_id, doc.rev, -                key, secret) +        if is_symmetrically_encrypted(doc.content['raw']): +            crypt = self._soledad._crypto +            decrypted = yield crypt.decrypt_doc(doc)              doc.set_json(decrypted)          self.assertEqual(exp_doc.doc_id, doc.doc_id)          self.assertEqual(exp_doc.rev, doc.rev) diff --git a/testing/tests/perf/assets/cert_default.conf b/testing/tests/benchmarks/assets/cert_default.conf index 8043cea3..8043cea3 100644 --- a/testing/tests/perf/assets/cert_default.conf +++ b/testing/tests/benchmarks/assets/cert_default.conf diff --git a/testing/tests/benchmarks/conftest.py b/testing/tests/benchmarks/conftest.py new file mode 100644 index 00000000..a9cc3464 --- /dev/null +++ b/testing/tests/benchmarks/conftest.py @@ -0,0 +1,57 @@ +import pytest +import random +import base64 + +from twisted.internet import threads, reactor + + +# we have to manually setup the events server in order to be able to signal +# events. This is usually done by the enclosing application using soledad +# client (i.e. bitmask client). 
+from leap.common.events import server
+server.ensure_server()
+
+
+def pytest_addoption(parser):
+    parser.addoption(
+        "--num-docs", type="int", default=100,
+        help="the number of documents to use in performance tests")
+
+
+@pytest.fixture()
+def payload():
+    def generate(size):
+        random.seed(1337)  # same seed to avoid different bench results
+        payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size))
+        # encode as base64 to avoid ascii encode/decode errors
+        return base64.b64encode(payload_bytes)[:size]  # remove b64 overhead
+    return generate
+
+
+@pytest.fixture()
+def txbenchmark(benchmark):
+    def blockOnThread(*args, **kwargs):
+        return threads.deferToThread(
+            benchmark, threads.blockingCallFromThread,
+            reactor, *args, **kwargs)
+    return blockOnThread
+
+
+@pytest.fixture()
+def txbenchmark_with_setup(benchmark):
+    def blockOnThreadWithSetup(setup, f):
+        def blocking_runner(*args, **kwargs):
+            return threads.blockingCallFromThread(reactor, f, *args, **kwargs)
+
+        def blocking_setup():
+            args = threads.blockingCallFromThread(reactor, setup)
+            try:
+                return tuple(arg for arg in args), {}
+            except TypeError:
+                return ((args,), {}) if args else None
+
+        def bench():
+            return benchmark.pedantic(blocking_runner, setup=blocking_setup,
+                                      rounds=4, warmup_rounds=1)
+        return threads.deferToThread(bench)
+    return blockOnThreadWithSetup
diff --git a/testing/tests/perf/pytest.ini b/testing/tests/benchmarks/pytest.ini
index 7a0508ce..7a0508ce 100644
--- a/testing/tests/perf/pytest.ini
+++ b/testing/tests/benchmarks/pytest.ini
diff --git a/testing/tests/benchmarks/test_crypto.py b/testing/tests/benchmarks/test_crypto.py
new file mode 100644
index 00000000..8ee9b899
--- /dev/null
+++ b/testing/tests/benchmarks/test_crypto.py
@@ -0,0 +1,97 @@
+"""
+Benchmarks for crypto operations.
+If you don't want to stress your local machine too much, you can pass the
+SIZE_LIMIT environment variable.
+
+For instance, to keep the maximum payload at 1MB:
+
+SIZE_LIMIT=1E6 py.test -s tests/benchmarks/test_crypto.py
+"""
+import pytest
+import os
+import json
+from uuid import uuid4
+
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.client import _crypto
+
+LIMIT = int(float(os.environ.get('SIZE_LIMIT', 50 * 1000 * 1000)))
+
+
+def create_doc_encryption(size):
+    @pytest.mark.benchmark(group="test_crypto_encrypt_doc")
+    @pytest.inlineCallbacks
+    def test_doc_encryption(soledad_client, txbenchmark, payload):
+        crypto = soledad_client()._crypto
+
+        DOC_CONTENT = {'payload': payload(size)}
+        doc = SoledadDocument(
+            doc_id=uuid4().hex, rev='rev',
+            json=json.dumps(DOC_CONTENT))
+
+        yield txbenchmark(crypto.encrypt_doc, doc)
+    return test_doc_encryption
+
+
+# TODO: this test is misleading, because it still includes
+# the json serialization.
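The `txbenchmark` fixture from `conftest.py` above bridges pytest-benchmark, which expects a plain blocking callable, with code that returns Deferreds and must run on the reactor: the timing loop runs in a worker thread via `deferToThread`, and each timed call is pushed back onto the reactor with `blockingCallFromThread`. Stripped of the fixture plumbing, the pattern is roughly this (a sketch with illustrative names, where `f` is any function returning a Deferred):

    from twisted.internet import reactor, threads

    def benchmark_deferred(benchmark, f, *args, **kwargs):
        def measured():
            # block this worker thread until f's Deferred fires in the reactor
            return threads.blockingCallFromThread(reactor, f, *args, **kwargs)
        # run the timing loop off the reactor thread so it stays responsive
        return threads.deferToThread(benchmark, measured)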
+ +def create_doc_decryption(size): +    @pytest.inlineCallbacks +    @pytest.mark.benchmark(group="test_crypto_decrypt_doc") +    def test_doc_decryption(soledad_client, txbenchmark, payload): +        crypto = soledad_client()._crypto + +        DOC_CONTENT = {'payload': payload(size)} +        doc = SoledadDocument( +            doc_id=uuid4().hex, rev='rev', +            json=json.dumps(DOC_CONTENT)) + +        encrypted_doc = yield crypto.encrypt_doc(doc) +        doc.set_json(encrypted_doc) + +        yield txbenchmark(crypto.decrypt_doc, doc) +    return test_doc_decryption + + +def create_raw_encryption(size): +    @pytest.mark.benchmark(group="test_crypto_raw_encrypt") +    def test_raw_encrypt(benchmark, payload): +        key = payload(32) +        benchmark(_crypto.encrypt_sym, payload(size), key) +    return test_raw_encrypt + + +def create_raw_decryption(size): +    @pytest.mark.benchmark(group="test_crypto_raw_decrypt") +    def test_raw_decrypt(benchmark, payload): +        key = payload(32) +        iv, ciphertext = _crypto.encrypt_sym(payload(size), key) +        benchmark(_crypto.decrypt_sym, ciphertext, key, iv) +    return test_raw_decrypt + + +# Create the TESTS in the global namespace, they'll be picked by the benchmark +# plugin. + +encryption_tests = [ +    ('10k', 1E4), +    ('100k', 1E5), +    ('500k', 5E5), +    ('1M', 1E6), +    ('10M', 1E7), +    ('50M', 5E7), +] + +for name, size in encryption_tests: +    if size < LIMIT: +        sz = int(size) +        globals()['test_encrypt_doc_' + name] = create_doc_encryption(sz) +        globals()['test_decrypt_doc_' + name] = create_doc_decryption(sz) + + +for name, size in encryption_tests: +    if size < LIMIT: +        sz = int(size) +        globals()['test_encrypt_raw_' + name] = create_raw_encryption(sz) +        globals()['test_decrypt_raw_' + name] = create_raw_decryption(sz) diff --git a/testing/tests/perf/test_misc.py b/testing/tests/benchmarks/test_misc.py index ead48adf..ead48adf 100644 --- a/testing/tests/perf/test_misc.py +++ b/testing/tests/benchmarks/test_misc.py diff --git a/testing/tests/perf/test_sqlcipher.py b/testing/tests/benchmarks/test_sqlcipher.py index e7a54228..39c9e3ad 100644 --- a/testing/tests/perf/test_sqlcipher.py +++ b/testing/tests/benchmarks/test_sqlcipher.py @@ -29,10 +29,10 @@ def build_test_sqlcipher_create(amount, size):      return test -test_async_create_20_500k = build_test_sqlcipher_async_create(20, 500*1000) -test_async_create_100_100k = build_test_sqlcipher_async_create(100, 100*1000) -test_async_create_1000_10k = build_test_sqlcipher_async_create(1000, 10*1000) +test_async_create_20_500k = build_test_sqlcipher_async_create(20, 500 * 1000) +test_async_create_100_100k = build_test_sqlcipher_async_create(100, 100 * 1000) +test_async_create_1000_10k = build_test_sqlcipher_async_create(1000, 10 * 1000)  # synchronous -test_create_20_500k = build_test_sqlcipher_create(20, 500*1000) -test_create_100_100k = build_test_sqlcipher_create(100, 100*1000) -test_create_1000_10k = build_test_sqlcipher_create(1000, 10*1000) +test_create_20_500k = build_test_sqlcipher_create(20, 500 * 1000) +test_create_100_100k = build_test_sqlcipher_create(100, 100 * 1000) +test_create_1000_10k = build_test_sqlcipher_create(1000, 10 * 1000) diff --git a/testing/tests/perf/test_sync.py b/testing/tests/benchmarks/test_sync.py index 0b48a0b9..1501d74b 100644 --- a/testing/tests/perf/test_sync.py +++ b/testing/tests/benchmarks/test_sync.py @@ -1,17 +1,14 @@  import pytest -  from twisted.internet.defer import 
gatherResults +@pytest.inlineCallbacks  def load_up(client, amount, payload): -    deferreds = []      # create a bunch of local documents +    deferreds = []      for i in xrange(amount): -        d = client.create_doc({'content': payload}) -        deferreds.append(d) -    d = gatherResults(deferreds) -    d.addCallback(lambda _: None) -    return d +        deferreds.append(client.create_doc({'content': payload})) +    yield gatherResults(deferreds)  def create_upload(uploads, size): @@ -27,9 +24,9 @@ def create_upload(uploads, size):      return test -test_upload_20_500k = create_upload(20, 500*1000) -test_upload_100_100k = create_upload(100, 100*1000) -test_upload_1000_10k = create_upload(1000, 10*1000) +test_upload_20_500k = create_upload(20, 500 * 1000) +test_upload_100_100k = create_upload(100, 100 * 1000) +test_upload_1000_10k = create_upload(1000, 10 * 1000)  def create_download(downloads, size): @@ -52,9 +49,9 @@ def create_download(downloads, size):      return test -test_download_20_500k = create_download(20, 500*1000) -test_download_100_100k = create_download(100, 100*1000) -test_download_1000_10k = create_download(1000, 10*1000) +test_download_20_500k = create_download(20, 500 * 1000) +test_download_100_100k = create_download(100, 100 * 1000) +test_download_1000_10k = create_download(1000, 10 * 1000)  @pytest.inlineCallbacks diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py index c25ff8ca..9b4a175f 100644 --- a/testing/tests/client/test_aux_methods.py +++ b/testing/tests/client/test_aux_methods.py @@ -21,10 +21,10 @@ import os  from twisted.internet import defer -from leap.soledad.common.errors import DatabaseAccessError  from leap.soledad.client import Soledad  from leap.soledad.client.adbapi import U1DBConnectionPool  from leap.soledad.client.secrets import PassphraseTooShort +from leap.soledad.client.secrets import SecretsException  from test_soledad.util import BaseSoledadTest @@ -108,7 +108,7 @@ class AuxMethodsTestCase(BaseSoledadTest):          sol.change_passphrase(u'654321')          sol.close() -        with self.assertRaises(DatabaseAccessError): +        with self.assertRaises(SecretsException):              self._soledad_instance(                  'leap@leap.se',                  passphrase=u'123', diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 77252b46..49a61438 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -17,47 +17,173 @@  """  Tests for cryptographic related stuff.  
""" -import os -import hashlib  import binascii +import base64 +import hashlib +import json +import os + +from io import BytesIO + +import pytest + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend +from cryptography.exceptions import InvalidTag -from leap.soledad.client import crypto  from leap.soledad.common.document import SoledadDocument  from test_soledad.util import BaseSoledadTest -from leap.soledad.common.crypto import WrongMacError -from leap.soledad.common.crypto import UnknownMacMethodError -from leap.soledad.common.crypto import ENC_JSON_KEY -from leap.soledad.common.crypto import ENC_SCHEME_KEY -from leap.soledad.common.crypto import MAC_KEY -from leap.soledad.common.crypto import MAC_METHOD_KEY +from leap.soledad.client import _crypto + +from twisted.trial import unittest +from twisted.internet import defer + + +snowden1 = ( +    "You can't come up against " +    "the world's most powerful intelligence " +    "agencies and not accept the risk. " +    "If they want to get you, over time " +    "they will.") + + +class AESTest(unittest.TestCase): + +    def test_chunked_encryption(self): +        key = 'A' * 32 + +        fd = BytesIO() +        aes = _crypto.AESWriter(key, _buffer=fd) +        iv = aes.iv + +        data = snowden1 +        block = 16 + +        for i in range(len(data) / block): +            chunk = data[i * block:(i + 1) * block] +            aes.write(chunk) +        aes.end() + +        ciphertext_chunked = fd.getvalue() +        ciphertext, tag = _aes_encrypt(key, iv, data) + +        assert ciphertext_chunked == ciphertext + +    def test_decrypt(self): +        key = 'A' * 32 +        iv = 'A' * 16 + +        data = snowden1 +        block = 16 + +        ciphertext, tag = _aes_encrypt(key, iv, data) + +        fd = BytesIO() +        aes = _crypto.AESWriter(key, iv, fd, tag=tag) + +        for i in range(len(ciphertext) / block): +            chunk = ciphertext[i * block:(i + 1) * block] +            aes.write(chunk) +        aes.end() + +        cleartext_chunked = fd.getvalue() +        assert cleartext_chunked == data + + +class BlobTestCase(unittest.TestCase): + +    class doc_info: +        doc_id = 'D-deadbeef' +        rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + +    @defer.inlineCallbacks +    def test_blob_encryptor(self): + +        inf = BytesIO(snowden1) + +        blob = _crypto.BlobEncryptor( +            self.doc_info, inf, +            secret='A' * 96) +        encrypted = yield blob.encrypt() +        preamble, ciphertext = _crypto._split(encrypted.getvalue()) +        ciphertext = ciphertext[:-16] -class EncryptedSyncTestCase(BaseSoledadTest): +        assert len(preamble) == _crypto.PACMAN.size +        unpacked_data = _crypto.PACMAN.unpack(preamble) +        magic, sch, meth, ts, iv, doc_id, rev = unpacked_data +        assert magic == _crypto.BLOB_SIGNATURE_MAGIC +        assert sch == 1 +        assert meth == _crypto.ENC_METHOD.aes_256_gcm +        assert iv == blob.iv +        assert doc_id == 'D-deadbeef' +        assert rev == self.doc_info.rev -    """ -    Tests that guarantee that data will always be encrypted when syncing. 
-    """ +        aes_key = _crypto._get_sym_key_for_doc( +            self.doc_info.doc_id, 'A' * 96) +        assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1)[0] -    def test_encrypt_decrypt_json(self): +        decrypted = _aes_decrypt(aes_key, blob.iv, blob.tag, ciphertext, +                                 preamble) +        assert str(decrypted) == snowden1 + +    @defer.inlineCallbacks +    def test_blob_decryptor(self): + +        inf = BytesIO(snowden1) + +        blob = _crypto.BlobEncryptor( +            self.doc_info, inf, +            secret='A' * 96) +        ciphertext = yield blob.encrypt() + +        decryptor = _crypto.BlobDecryptor( +            self.doc_info, ciphertext, +            secret='A' * 96) +        decrypted = yield decryptor.decrypt() +        assert decrypted == snowden1 + +    @defer.inlineCallbacks +    def test_encrypt_and_decrypt(self):          """ -        Test encrypting and decrypting documents. +        Check that encrypting and decrypting gives same doc.          """ -        simpledoc = {'key': 'val'} -        doc1 = SoledadDocument(doc_id='id') -        doc1.content = simpledoc - -        # encrypt doc -        doc1.set_json(self._soledad._crypto.encrypt_doc(doc1)) -        # assert content is different and includes keys -        self.assertNotEqual( -            simpledoc, doc1.content, -            'incorrect document encryption') -        self.assertTrue(ENC_JSON_KEY in doc1.content) -        self.assertTrue(ENC_SCHEME_KEY in doc1.content) -        # decrypt doc -        doc1.set_json(self._soledad._crypto.decrypt_doc(doc1)) -        self.assertEqual( -            simpledoc, doc1.content, 'incorrect document encryption') +        crypto = _crypto.SoledadCrypto('A' * 96) +        payload = {'key': 'someval'} +        doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + +        encrypted = yield crypto.encrypt_doc(doc1) +        assert encrypted != payload +        assert 'raw' in encrypted +        doc2 = SoledadDocument('id1', '1') +        doc2.set_json(encrypted) +        assert _crypto.is_symmetrically_encrypted(encrypted) +        decrypted = yield crypto.decrypt_doc(doc2) +        assert len(decrypted) != 0 +        assert json.loads(decrypted) == payload + +    @defer.inlineCallbacks +    def test_decrypt_with_wrong_tag_raises(self): +        """ +        Trying to decrypt a document with wrong MAC should raise. 
+        """ +        crypto = _crypto.SoledadCrypto('A' * 96) +        payload = {'key': 'someval'} +        doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + +        encrypted = yield crypto.encrypt_doc(doc1) +        encdict = json.loads(encrypted) +        preamble, raw = _crypto._split(str(encdict['raw'])) +        # mess with tag +        messed = raw[:-16] + '0' * 16 + +        preamble = base64.urlsafe_b64encode(preamble) +        newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed)) +        doc2 = SoledadDocument('id1', '1') +        doc2.set_json(json.dumps({"raw": str(newraw)})) + +        with pytest.raises(_crypto.InvalidBlob): +            yield crypto.decrypt_doc(doc2)  class RecoveryDocumentTestCase(BaseSoledadTest): @@ -74,13 +200,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest):          encrypted_secret = rd[              self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id]          self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret) -        self.assertTrue( -            encrypted_secret[self._soledad.secrets.CIPHER_KEY] == 'aes256') +        self.assertEquals( +            _crypto.ENC_METHOD.aes_256_gcm, +            encrypted_secret[self._soledad.secrets.CIPHER_KEY])          self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret)          self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret) -    def test_import_recovery_document(self): -        rd = self._soledad.secrets._export_recovery_document() +    def test_import_recovery_document(self, cipher='aes256'): +        rd = self._soledad.secrets._export_recovery_document(cipher)          s = self._soledad_instance()          s.secrets._import_recovery_document(rd)          s.secrets.set_secret_id(self._soledad.secrets._secret_id) @@ -89,6 +216,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest):                           'Failed settinng secret for symmetric encryption.')          s.close() +    def test_import_GCM_recovery_document(self): +        cipher = self._soledad.secrets.CIPHER_AES256_GCM +        self.test_import_recovery_document(cipher) + +    def test_import_legacy_CTR_recovery_document(self): +        cipher = self._soledad.secrets.CIPHER_AES256 +        self.test_import_recovery_document(cipher) +  class SoledadSecretsTestCase(BaseSoledadTest): @@ -146,60 +281,21 @@ class SoledadSecretsTestCase(BaseSoledadTest):              "Should have a secret at this point") -class MacAuthTestCase(BaseSoledadTest): - -    def test_decrypt_with_wrong_mac_raises(self): -        """ -        Trying to decrypt a document with wrong MAC should raise. -        """ -        simpledoc = {'key': 'val'} -        doc = SoledadDocument(doc_id='id') -        doc.content = simpledoc -        # encrypt doc -        doc.set_json(self._soledad._crypto.encrypt_doc(doc)) -        self.assertTrue(MAC_KEY in doc.content) -        self.assertTrue(MAC_METHOD_KEY in doc.content) -        # mess with MAC -        doc.content[MAC_KEY] = '1234567890ABCDEF' -        # try to decrypt doc -        self.assertRaises( -            WrongMacError, -            self._soledad._crypto.decrypt_doc, doc) - -    def test_decrypt_with_unknown_mac_method_raises(self): -        """ -        Trying to decrypt a document with unknown MAC method should raise. 
-        """ -        simpledoc = {'key': 'val'} -        doc = SoledadDocument(doc_id='id') -        doc.content = simpledoc -        # encrypt doc -        doc.set_json(self._soledad._crypto.encrypt_doc(doc)) -        self.assertTrue(MAC_KEY in doc.content) -        self.assertTrue(MAC_METHOD_KEY in doc.content) -        # mess with MAC method -        doc.content[MAC_METHOD_KEY] = 'mymac' -        # try to decrypt doc -        self.assertRaises( -            UnknownMacMethodError, -            self._soledad._crypto.decrypt_doc, doc) - -  class SoledadCryptoAESTestCase(BaseSoledadTest):      def test_encrypt_decrypt_sym(self):          # generate 256-bit key          key = os.urandom(32) -        iv, cyphertext = crypto.encrypt_sym('data', key) +        iv, cyphertext = _crypto.encrypt_sym('data', key)          self.assertTrue(cyphertext is not None)          self.assertTrue(cyphertext != '')          self.assertTrue(cyphertext != 'data') -        plaintext = crypto.decrypt_sym(cyphertext, key, iv) +        plaintext = _crypto.decrypt_sym(cyphertext, key, iv)          self.assertEqual('data', plaintext) -    def test_decrypt_with_wrong_iv_fails(self): +    def test_decrypt_with_wrong_iv_raises(self):          key = os.urandom(32) -        iv, cyphertext = crypto.encrypt_sym('data', key) +        iv, cyphertext = _crypto.encrypt_sym('data', key)          self.assertTrue(cyphertext is not None)          self.assertTrue(cyphertext != '')          self.assertTrue(cyphertext != 'data') @@ -208,13 +304,13 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):          wrongiv = rawiv          while wrongiv == rawiv:              wrongiv = os.urandom(1) + rawiv[1:] -        plaintext = crypto.decrypt_sym( -            cyphertext, key, iv=binascii.b2a_base64(wrongiv)) -        self.assertNotEqual('data', plaintext) +        with pytest.raises(InvalidTag): +            _crypto.decrypt_sym( +                cyphertext, key, iv=binascii.b2a_base64(wrongiv)) -    def test_decrypt_with_wrong_key_fails(self): +    def test_decrypt_with_wrong_key_raises(self):          key = os.urandom(32) -        iv, cyphertext = crypto.encrypt_sym('data', key) +        iv, cyphertext = _crypto.encrypt_sym('data', key)          self.assertTrue(cyphertext is not None)          self.assertTrue(cyphertext != '')          self.assertTrue(cyphertext != 'data') @@ -222,5 +318,21 @@ class SoledadCryptoAESTestCase(BaseSoledadTest):          # ensure keys are different in case we are extremely lucky          while wrongkey == key:              wrongkey = os.urandom(32) -        plaintext = crypto.decrypt_sym(cyphertext, wrongkey, iv) -        self.assertNotEqual('data', plaintext) +        with pytest.raises(InvalidTag): +            _crypto.decrypt_sym(cyphertext, wrongkey, iv) + + +def _aes_encrypt(key, iv, data): +    backend = default_backend() +    cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend) +    encryptor = cipher.encryptor() +    return encryptor.update(data) + encryptor.finalize(), encryptor.tag + + +def _aes_decrypt(key, iv, tag, data, aead=''): +    backend = default_backend() +    cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend) +    decryptor = cipher.decryptor() +    if aead: +        decryptor.authenticate_additional_data(aead) +    return decryptor.update(data) + decryptor.finalize() diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py new file mode 100644 index 00000000..8ee3735c --- /dev/null +++ 
b/testing/tests/client/test_deprecated_crypto.py
@@ -0,0 +1,91 @@
+import json
+from twisted.internet import defer
+from uuid import uuid4
+from urlparse import urljoin
+
+from leap.soledad.client import crypto as old_crypto
+from leap.soledad.common.couch import CouchDatabase
+from leap.soledad.common import crypto as common_crypto
+
+from test_soledad.u1db_tests import simple_doc
+from test_soledad.util import SoledadWithCouchServerMixin
+from test_soledad.util import make_token_soledad_app
+from test_soledad.u1db_tests import TestCaseWithServer
+
+
+def deprecate_client_crypto(client):
+    secret = client._crypto.secret
+    _crypto = old_crypto.SoledadCrypto(secret)
+    setattr(client._dbsyncer, '_crypto', _crypto)
+    return client
+
+
+def couch_database(couch_url, uuid):
+    db = CouchDatabase(couch_url, "user-%s" % (uuid,))
+    return db
+
+
+class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer):
+
+    def setUp(self):
+        SoledadWithCouchServerMixin.setUp(self)
+        TestCaseWithServer.setUp(self)
+
+    def tearDown(self):
+        SoledadWithCouchServerMixin.tearDown(self)
+        TestCaseWithServer.tearDown(self)
+
+    @staticmethod
+    def make_app_with_state(state):
+        return make_token_soledad_app(state)
+
+    @defer.inlineCallbacks
+    def test_touch_updates_remote_representation(self):
+        self.startTwistedServer()
+        user = 'user-' + uuid4().hex
+        server_url = 'http://%s:%d' % (self.server_address)
+        client = self._soledad_instance(user=user, server_url=server_url)
+        deprecated_client = deprecate_client_crypto(
+            self._soledad_instance(user=user, server_url=server_url))
+
+        self.make_app()
+        remote = self.request_state._create_database(replica_uid=client._uuid)
+        remote = CouchDatabase.open_database(
+            urljoin(self.couch_url, 'user-' + user),
+            create=True)
+
+        # ensure remote db is empty
+        gen, docs = remote.get_all_docs()
+        assert gen == 0
+        assert len(docs) == 0
+
+        # create a doc with deprecated client and sync
+        yield deprecated_client.create_doc(json.loads(simple_doc))
+        yield deprecated_client.sync()
+
+        # check for doc in remote db
+        gen, docs = remote.get_all_docs()
+        assert gen == 1
+        assert len(docs) == 1
+        doc = docs.pop()
+        content = doc.content
+        assert common_crypto.ENC_JSON_KEY in content
+        assert common_crypto.ENC_SCHEME_KEY in content
+        assert common_crypto.ENC_METHOD_KEY in content
+        assert common_crypto.ENC_IV_KEY in content
+        assert common_crypto.MAC_KEY in content
+        assert common_crypto.MAC_METHOD_KEY in content
+
+        # "touch" the document with a newer client and sync
+        _, docs = yield client.get_all_docs()
+        yield client.put_doc(doc)
+        yield client.sync()
+
+        # check for newer representation of doc in remote db
+        gen, docs = remote.get_all_docs()
+        assert gen == 2
+        assert len(docs) == 1
+        doc = docs.pop()
+        content = doc.content
+        assert len(content) == 1
+        assert 'raw' in content
diff --git a/testing/tests/conftest.py b/testing/tests/conftest.py
index 9e4319ac..1ff1cbb7 100644
--- a/testing/tests/conftest.py
+++ b/testing/tests/conftest.py
@@ -1,4 +1,29 @@
+import json
+import os
 import pytest
+import requests
+import signal
+import time
+
+from hashlib import sha512
+from subprocess import call
+from 
urlparse import urljoin +from uuid import uuid4 + +from leap.soledad.common.couch import CouchDatabase +from leap.soledad.client import Soledad + + +# +# default options for all tests +# + +DEFAULT_PASSPHRASE = '123' + +DEFAULT_URL = 'http://127.0.0.1:2424' +DEFAULT_PRIVKEY = 'soledad_privkey.pem' +DEFAULT_CERTKEY = 'soledad_certkey.pem' +DEFAULT_TOKEN = 'an-auth-token'  def pytest_addoption(parser): @@ -16,3 +41,167 @@ def couch_url(request):  @pytest.fixture  def method_tmpdir(request, tmpdir):      request.instance.tempdir = tmpdir.strpath + + +# +# remote_db fixture: provides an empty database for a given user in a per +# function scope. +# + +class UserDatabase(object): + +    def __init__(self, url, uuid): +        self._remote_db_url = urljoin(url, 'user-%s' % uuid) + +    def setup(self): +        return CouchDatabase.open_database( +            url=self._remote_db_url, create=True, replica_uid=None) + +    def teardown(self): +        requests.delete(self._remote_db_url) + + +@pytest.fixture() +def remote_db(request): +    couch_url = request.config.option.couch_url + +    def create(uuid): +        db = UserDatabase(couch_url, uuid) +        request.addfinalizer(db.teardown) +        return db.setup() +    return create + + +def get_pid(pidfile): +    if not os.path.isfile(pidfile): +        return 0 +    try: +        with open(pidfile) as f: +            return int(f.read()) +    except IOError: +        return 0 + + +# +# soledad_server fixture: provides a running soledad server in a per module +# context (same soledad server for all tests in this module). +# + +class SoledadServer(object): + +    def __init__(self, tmpdir_factory, couch_url): +        tmpdir = tmpdir_factory.mktemp('soledad-server') +        self._pidfile = os.path.join(tmpdir.strpath, 'soledad-server.pid') +        self._logfile = os.path.join(tmpdir.strpath, 'soledad-server.log') +        self._couch_url = couch_url + +    def start(self): +        self._create_conf_file() +        # start the server +        call([ +            'twistd', +            '--logfile=%s' % self._logfile, +            '--pidfile=%s' % self._pidfile, +            'web', +            '--wsgi=leap.soledad.server.application.wsgi_application', +            '--port=2424' +        ]) + +    def _create_conf_file(self): +        if not os.access('/etc', os.W_OK): +            return +        if not os.path.isdir('/etc/soledad'): +            os.mkdir('/etc/soledad') +        with open('/etc/soledad/soledad-server.conf', 'w') as f: +            content = '[soledad-server]\ncouch_url = %s' % self._couch_url +            f.write(content) + +    def stop(self): +        pid = get_pid(self._pidfile) +        os.kill(pid, signal.SIGKILL) + + +@pytest.fixture(scope='module') +def soledad_server(tmpdir_factory, request): +    couch_url = request.config.option.couch_url +    server = SoledadServer(tmpdir_factory, couch_url) +    server.start() +    request.addfinalizer(server.stop) +    return server + + +# +# soledad_dbs fixture: provides all databases needed by soledad server in a per +# module scope (same databases for all tests in this module). 
+# + +def _token_dbname(): +    dbname = 'tokens_' + \ +        str(int(time.time() / (30 * 24 * 3600))) +    return dbname + + +class SoledadDatabases(object): + +    def __init__(self, url): +        self._token_db_url = urljoin(url, _token_dbname()) +        self._shared_db_url = urljoin(url, 'shared') + +    def setup(self, uuid): +        self._create_dbs() +        self._add_token(uuid) + +    def _create_dbs(self): +        requests.put(self._token_db_url) +        requests.put(self._shared_db_url) + +    def _add_token(self, uuid): +        token = sha512(DEFAULT_TOKEN).hexdigest() +        content = {'type': 'Token', 'user_id': uuid} +        requests.put( +            self._token_db_url + '/' + token, data=json.dumps(content)) + +    def teardown(self): +        requests.delete(self._token_db_url) +        requests.delete(self._shared_db_url) + + +@pytest.fixture() +def soledad_dbs(request): +    couch_url = request.config.option.couch_url + +    def create(uuid): +        db = SoledadDatabases(couch_url) +        request.addfinalizer(db.teardown) +        return db.setup(uuid) +    return create + + +# +# soledad_client fixture: provides a clean soledad client for a test function. +# + +@pytest.fixture() +def soledad_client(tmpdir, soledad_server, remote_db, soledad_dbs, request): +    passphrase = DEFAULT_PASSPHRASE +    server_url = DEFAULT_URL +    token = DEFAULT_TOKEN +    default_uuid = uuid4().hex +    remote_db(default_uuid) +    soledad_dbs(default_uuid) + +    # get a soledad instance +    def create(): +        secrets_path = os.path.join(tmpdir.strpath, '%s.secret' % default_uuid) +        local_db_path = os.path.join(tmpdir.strpath, '%s.db' % default_uuid) +        soledad_client = Soledad( +            default_uuid, +            unicode(passphrase), +            secrets_path=secrets_path, +            local_db_path=local_db_path, +            server_url=server_url, +            cert_file=None, +            auth_token=token) +        request.addfinalizer(soledad_client.close) +        return soledad_client +    return create diff --git a/testing/tests/couch/conftest.py b/testing/tests/couch/conftest.py deleted file mode 100644 index 1074f091..00000000 --- a/testing/tests/couch/conftest.py +++ /dev/null @@ -1,31 +0,0 @@ -import couchdb -import pytest -import random -import string - - -@pytest.fixture -def random_name(): -    return 'user-' + ''.join( -        random.choice( -            string.ascii_lowercase) for _ in range(10)) - - -class RandomDatabase(object): - -    def __init__(self, couch_url, name): -        self.couch_url = couch_url -        self.name = name -        self.server = couchdb.client.Server(couch_url) -        self.database = self.server.create(name) - -    def teardown(self): -        self.server.delete(self.name) - - -@pytest.fixture -def db(random_name, request): -    couch_url = request.config.getoption('--couch-url') -    db = RandomDatabase(couch_url, random_name) -    request.addfinalizer(db.teardown) -    return db diff --git a/testing/tests/couch/test_command.py b/testing/tests/couch/test_command.py index 68097fb1..9fb2c153 100644 --- a/testing/tests/couch/test_command.py +++ b/testing/tests/couch/test_command.py @@ -25,6 +25,7 @@ class CommandBasedDBCreationTest(unittest.TestCase):                            state.ensure_database, "user-1337")      def test_raises_unauthorized_by_default(self): -        state = couch_state.CouchServerState("url", check_schema_versions=False) +        state = couch_state.CouchServerState("url", +        
                                     check_schema_versions=False)          self.assertRaises(u1db_errors.Unauthorized,                            state.ensure_database, "user-1337") diff --git a/testing/tests/couch/test_state.py b/testing/tests/couch/test_state.py index e293b5b8..e5ac3704 100644 --- a/testing/tests/couch/test_state.py +++ b/testing/tests/couch/test_state.py @@ -1,25 +1,32 @@  import pytest -  from leap.soledad.common.couch import CONFIG_DOC_ID  from leap.soledad.common.couch import SCHEMA_VERSION  from leap.soledad.common.couch import SCHEMA_VERSION_KEY  from leap.soledad.common.couch.state import CouchServerState +from uuid import uuid4  from leap.soledad.common.errors import WrongCouchSchemaVersionError  from leap.soledad.common.errors import MissingCouchConfigDocumentError +from test_soledad.util import CouchDBTestCase + +class CouchDesignDocsTests(CouchDBTestCase): -def test_wrong_couch_version_raises(db): -    wrong_schema_version = SCHEMA_VERSION + 1 -    db.database.create( -        {'_id': CONFIG_DOC_ID, SCHEMA_VERSION_KEY: wrong_schema_version}) -    with pytest.raises(WrongCouchSchemaVersionError): -        CouchServerState(db.couch_url, create_cmd='/bin/echo', -                         check_schema_versions=True) +    def setUp(self): +        CouchDBTestCase.setUp(self) +        self.db = self.couch_server.create('user-' + uuid4().hex) +        self.addCleanup(self.delete_db, self.db.name) +    def test_wrong_couch_version_raises(self): +        wrong_schema_version = SCHEMA_VERSION + 1 +        self.db.create( +            {'_id': CONFIG_DOC_ID, SCHEMA_VERSION_KEY: wrong_schema_version}) +        with pytest.raises(WrongCouchSchemaVersionError): +            CouchServerState(self.couch_url, create_cmd='/bin/echo', +                             check_schema_versions=True) -def test_missing_config_doc_raises(db): -    db.database.create({}) -    with pytest.raises(MissingCouchConfigDocumentError): -        CouchServerState(db.couch_url, create_cmd='/bin/echo', -                         check_schema_versions=True) +    def test_missing_config_doc_raises(self): +        self.db.create({}) +        with pytest.raises(MissingCouchConfigDocumentError): +            CouchServerState(self.couch_url, create_cmd='/bin/echo', +                             check_schema_versions=True) diff --git a/testing/tests/perf/conftest.py b/testing/tests/perf/conftest.py deleted file mode 100644 index 6fa6b2c0..00000000 --- a/testing/tests/perf/conftest.py +++ /dev/null @@ -1,249 +0,0 @@ -import json -import os -import pytest -import requests -import random -import base64 -import signal -import time - -from hashlib import sha512 -from uuid import uuid4 -from subprocess import call -from urlparse import urljoin -from twisted.internet import threads, reactor - -from leap.soledad.client import Soledad -from leap.soledad.common.couch import CouchDatabase - - -# we have to manually setup the events server in order to be able to signal -# events. This is usually done by the enclosing application using soledad -# client (i.e. bitmask client). 
-from leap.common.events import server -server.ensure_server() - - -def pytest_addoption(parser): -    parser.addoption( -        "--couch-url", type="string", default="http://127.0.0.1:5984", -        help="the url for the couch server to be used during tests") -    parser.addoption( -        "--num-docs", type="int", default=100, -        help="the number of documents to use in performance tests") - - -# -# default options for all tests -# - -DEFAULT_PASSPHRASE = '123' - -DEFAULT_URL = 'http://127.0.0.1:2424' -DEFAULT_PRIVKEY = 'soledad_privkey.pem' -DEFAULT_CERTKEY = 'soledad_certkey.pem' -DEFAULT_TOKEN = 'an-auth-token' - - -@pytest.fixture() -def payload(): -    def generate(size): -        random.seed(1337)  # same seed to avoid different bench results -        payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size)) -        # encode as base64 to avoid ascii encode/decode errors -        return base64.b64encode(payload_bytes)[:size]  # remove b64 overhead -    return generate - - -# -# soledad_dbs fixture: provides all databases needed by soledad server in a per -# module scope (same databases for all tests in this module). -# - -def _token_dbname(): -    dbname = 'tokens_' + \ -        str(int(time.time() / (30 * 24 * 3600))) -    return dbname - - -class SoledadDatabases(object): - -    def __init__(self, url): -        self._token_db_url = urljoin(url, _token_dbname()) -        self._shared_db_url = urljoin(url, 'shared') - -    def setup(self, uuid): -        self._create_dbs() -        self._add_token(uuid) - -    def _create_dbs(self): -        requests.put(self._token_db_url) -        requests.put(self._shared_db_url) - -    def _add_token(self, uuid): -        token = sha512(DEFAULT_TOKEN).hexdigest() -        content = {'type': 'Token', 'user_id': uuid} -        requests.put( -            self._token_db_url + '/' + token, data=json.dumps(content)) - -    def teardown(self): -        requests.delete(self._token_db_url) -        requests.delete(self._shared_db_url) - - -@pytest.fixture() -def soledad_dbs(request): -    couch_url = request.config.option.couch_url - -    def create(uuid): -        db = SoledadDatabases(couch_url) -        request.addfinalizer(db.teardown) -        return db.setup(uuid) -    return create - - -# -# remote_db fixture: provides an empty database for a given user in a per -# function scope. -# - -class UserDatabase(object): - -    def __init__(self, url, uuid): -        self._remote_db_url = urljoin(url, 'user-%s' % uuid) - -    def setup(self): -        return CouchDatabase.open_database( -            url=self._remote_db_url, create=True, replica_uid=None) - -    def teardown(self): -        requests.delete(self._remote_db_url) - - -@pytest.fixture() -def remote_db(request): -    couch_url = request.config.option.couch_url - -    def create(uuid): -        db = UserDatabase(couch_url, uuid) -        request.addfinalizer(db.teardown) -        return db.setup() -    return create - - -def get_pid(pidfile): -    if not os.path.isfile(pidfile): -        return 0 -    try: -        with open(pidfile) as f: -            return int(f.read()) -    except IOError: -        return 0 - - -# -# soledad_server fixture: provides a running soledad server in a per module -# context (same soledad server for all tests in this module). 
-# - -class SoledadServer(object): - -    def __init__(self, tmpdir_factory, couch_url): -        tmpdir = tmpdir_factory.mktemp('soledad-server') -        self._pidfile = os.path.join(tmpdir.strpath, 'soledad-server.pid') -        self._logfile = os.path.join(tmpdir.strpath, 'soledad-server.log') -        self._couch_url = couch_url - -    def start(self): -        self._create_conf_file() -        # start the server -        call([ -            'twistd', -            '--logfile=%s' % self._logfile, -            '--pidfile=%s' % self._pidfile, -            'web', -            '--wsgi=leap.soledad.server.application.wsgi_application', -            '--port=2424' -        ]) - -    def _create_conf_file(self): -        if not os.access('/etc', os.W_OK): -            return -        if not os.path.isdir('/etc/soledad'): -            os.mkdir('/etc/soledad') -        with open('/etc/soledad/soledad-server.conf', 'w') as f: -            content = '[soledad-server]\ncouch_url = %s' % self._couch_url -            f.write(content) - -    def stop(self): -        pid = get_pid(self._pidfile) -        os.kill(pid, signal.SIGKILL) - - -@pytest.fixture(scope='module') -def soledad_server(tmpdir_factory, request): -    couch_url = request.config.option.couch_url -    server = SoledadServer(tmpdir_factory, couch_url) -    server.start() -    request.addfinalizer(server.stop) -    return server - - -@pytest.fixture() -def txbenchmark(benchmark): -    def blockOnThread(*args, **kwargs): -        return threads.deferToThread( -            benchmark, threads.blockingCallFromThread, -            reactor, *args, **kwargs) -    return blockOnThread - - -@pytest.fixture() -def txbenchmark_with_setup(benchmark): -    def blockOnThreadWithSetup(setup, f): -        def blocking_runner(*args, **kwargs): -            return threads.blockingCallFromThread(reactor, f, *args, **kwargs) - -        def blocking_setup(): -            args = threads.blockingCallFromThread(reactor, setup) -            try: -                return tuple(arg for arg in args), {} -            except TypeError: -                    return ((args,), {}) if args else None - -        def bench(): -            return benchmark.pedantic(blocking_runner, setup=blocking_setup, -                                      rounds=4, warmup_rounds=1) -        return threads.deferToThread(bench) -    return blockOnThreadWithSetup - - -# -# soledad_client fixture: provides a clean soledad client for a test function. 
-# - -@pytest.fixture() -def soledad_client(tmpdir, soledad_server, remote_db, soledad_dbs, request): -    passphrase = DEFAULT_PASSPHRASE -    server_url = DEFAULT_URL -    token = DEFAULT_TOKEN -    default_uuid = uuid4().hex -    remote_db(default_uuid) -    soledad_dbs(default_uuid) - -    # get a soledad instance -    def create(): -        secrets_path = os.path.join(tmpdir.strpath, '%s.secret' % uuid4().hex) -        local_db_path = os.path.join(tmpdir.strpath, '%s.db' % uuid4().hex) -        soledad_client = Soledad( -            default_uuid, -            unicode(passphrase), -            secrets_path=secrets_path, -            local_db_path=local_db_path, -            server_url=server_url, -            cert_file=None, -            auth_token=token, -            defer_encryption=True) -        request.addfinalizer(soledad_client.close) -        return soledad_client -    return create diff --git a/testing/tests/perf/test_crypto.py b/testing/tests/perf/test_crypto.py deleted file mode 100644 index be00560b..00000000 --- a/testing/tests/perf/test_crypto.py +++ /dev/null @@ -1,81 +0,0 @@ -import pytest -import json -from uuid import uuid4 -from leap.soledad.common.document import SoledadDocument -from leap.soledad.client.crypto import encrypt_sym -from leap.soledad.client.crypto import decrypt_sym - - -def create_doc_encryption(size): -    @pytest.mark.benchmark(group="test_crypto_encrypt_doc") -    def test_doc_encryption(soledad_client, benchmark, payload): -        crypto = soledad_client()._crypto - -        DOC_CONTENT = {'payload': payload(size)} -        doc = SoledadDocument( -            doc_id=uuid4().hex, rev='rev', -            json=json.dumps(DOC_CONTENT)) - -        benchmark(crypto.encrypt_doc, doc) -    return test_doc_encryption - - -def create_doc_decryption(size): -    @pytest.mark.benchmark(group="test_crypto_decrypt_doc") -    def test_doc_decryption(soledad_client, benchmark, payload): -        crypto = soledad_client()._crypto - -        DOC_CONTENT = {'payload': payload(size)} -        doc = SoledadDocument( -            doc_id=uuid4().hex, rev='rev', -            json=json.dumps(DOC_CONTENT)) -        encrypted_doc = crypto.encrypt_doc(doc) -        doc.set_json(encrypted_doc) - -        benchmark(crypto.decrypt_doc, doc) -    return test_doc_decryption - - -test_encrypt_doc_10k = create_doc_encryption(10*1000) -test_encrypt_doc_100k = create_doc_encryption(100*1000) -test_encrypt_doc_500k = create_doc_encryption(500*1000) -test_encrypt_doc_1M = create_doc_encryption(1000*1000) -test_encrypt_doc_10M = create_doc_encryption(10*1000*1000) -test_encrypt_doc_50M = create_doc_encryption(50*1000*1000) -test_decrypt_doc_10k = create_doc_decryption(10*1000) -test_decrypt_doc_100k = create_doc_decryption(100*1000) -test_decrypt_doc_500k = create_doc_decryption(500*1000) -test_decrypt_doc_1M = create_doc_decryption(1000*1000) -test_decrypt_doc_10M = create_doc_decryption(10*1000*1000) -test_decrypt_doc_50M = create_doc_decryption(50*1000*1000) - - -def create_raw_encryption(size): -    @pytest.mark.benchmark(group="test_crypto_raw_encrypt") -    def test_raw_encrypt(benchmark, payload): -        key = payload(32) -        benchmark(encrypt_sym, payload(size), key) -    return test_raw_encrypt - - -def create_raw_decryption(size): -    @pytest.mark.benchmark(group="test_crypto_raw_decrypt") -    def test_raw_decrypt(benchmark, payload): -        key = payload(32) -        iv, ciphertext = encrypt_sym(payload(size), key) -        benchmark(decrypt_sym, ciphertext, key, 
iv) -    return test_raw_decrypt - - -test_encrypt_raw_10k = create_raw_encryption(10*1000) -test_encrypt_raw_100k = create_raw_encryption(100*1000) -test_encrypt_raw_500k = create_raw_encryption(500*1000) -test_encrypt_raw_1M = create_raw_encryption(1000*1000) -test_encrypt_raw_10M = create_raw_encryption(10*1000*1000) -test_encrypt_raw_50M = create_raw_encryption(50*1000*1000) -test_decrypt_raw_10k = create_raw_decryption(10*1000) -test_decrypt_raw_100k = create_raw_decryption(100*1000) -test_decrypt_raw_500k = create_raw_decryption(500*1000) -test_decrypt_raw_1M = create_raw_decryption(1000*1000) -test_decrypt_raw_10M = create_raw_decryption(10*1000*1000) -test_decrypt_raw_50M = create_raw_decryption(50*1000*1000) diff --git a/testing/tests/perf/test_encdecpool.py b/testing/tests/perf/test_encdecpool.py deleted file mode 100644 index 77091a41..00000000 --- a/testing/tests/perf/test_encdecpool.py +++ /dev/null @@ -1,78 +0,0 @@ -import pytest -import json -from uuid import uuid4 -from twisted.internet.defer import gatherResults -from leap.soledad.client.encdecpool import SyncEncrypterPool -from leap.soledad.client.encdecpool import SyncDecrypterPool -from leap.soledad.common.document import SoledadDocument -# FIXME: test load is low due issue #7370, higher values will get out of memory - - -def create_encrypt(amount, size): -    @pytest.mark.benchmark(group="test_pool_encrypt") -    @pytest.inlineCallbacks -    def test(soledad_client, txbenchmark_with_setup, request, payload): -        DOC_CONTENT = {'payload': payload(size)} - -        def setup(): -            client = soledad_client() -            pool = SyncEncrypterPool(client._crypto, client._sync_db) -            pool.start() -            request.addfinalizer(pool.stop) -            docs = [ -                SoledadDocument(doc_id=uuid4().hex, rev='rev', -                                json=json.dumps(DOC_CONTENT)) -                for _ in xrange(amount) -            ] -            return pool, docs - -        @pytest.inlineCallbacks -        def put_and_wait(pool, docs): -            yield gatherResults([pool.encrypt_doc(doc) for doc in docs]) - -        yield txbenchmark_with_setup(setup, put_and_wait) -    return test - -test_encdecpool_encrypt_100_10k = create_encrypt(100, 10*1000) -test_encdecpool_encrypt_100_100k = create_encrypt(100, 100*1000) -test_encdecpool_encrypt_100_500k = create_encrypt(100, 500*1000) - - -def create_decrypt(amount, size): -    @pytest.mark.benchmark(group="test_pool_decrypt") -    @pytest.inlineCallbacks -    def test(soledad_client, txbenchmark_with_setup, request, payload): -        DOC_CONTENT = {'payload': payload(size)} -        client = soledad_client() - -        def setup(): -            pool = SyncDecrypterPool( -                client._crypto, -                client._sync_db, -                source_replica_uid=client._dbpool.replica_uid, -                insert_doc_cb=lambda x, y, z: False)  # ignored -            pool.start(amount) -            request.addfinalizer(pool.stop) -            crypto = client._crypto -            docs = [] -            for _ in xrange(amount): -                doc = SoledadDocument( -                    doc_id=uuid4().hex, rev='rev', -                    json=json.dumps(DOC_CONTENT)) -                encrypted_content = json.loads(crypto.encrypt_doc(doc)) -                docs.append((doc.doc_id, encrypted_content)) -            return pool, docs - -        def put_and_wait(pool, docs): -            deferreds = []  # fires on completion -            for 
idx, (doc_id, content) in enumerate(docs, 1): -                deferreds.append(pool.insert_encrypted_received_doc( -                    doc_id, 'rev', content, idx, "trans_id", idx)) -            return gatherResults(deferreds) - -        yield txbenchmark_with_setup(setup, put_and_wait) -    return test - -test_encdecpool_decrypt_100_10k = create_decrypt(100, 10*1000) -test_encdecpool_decrypt_100_100k = create_decrypt(100, 100*1000) -test_encdecpool_decrypt_100_500k = create_decrypt(100, 500*1000) diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py index 6bbcf002..6710caaf 100644 --- a/testing/tests/server/test_server.py +++ b/testing/tests/server/test_server.py @@ -41,7 +41,7 @@ from test_soledad.util import (      BaseSoledadTest,  ) -from leap.soledad.common import crypto +from leap.soledad.client import _crypto  from leap.soledad.client import Soledad  from leap.soledad.server.config import load_configuration  from leap.soledad.server.config import CONFIG_DEFAULTS @@ -412,13 +412,9 @@ class EncryptedSyncTestCase(                  self.assertEqual(soldoc.doc_id, couchdoc.doc_id)                  self.assertEqual(soldoc.rev, couchdoc.rev)                  couch_content = couchdoc.content.keys() -                self.assertEqual(6, len(couch_content)) -                self.assertTrue(crypto.ENC_JSON_KEY in couch_content) -                self.assertTrue(crypto.ENC_SCHEME_KEY in couch_content) -                self.assertTrue(crypto.ENC_METHOD_KEY in couch_content) -                self.assertTrue(crypto.ENC_IV_KEY in couch_content) -                self.assertTrue(crypto.MAC_KEY in couch_content) -                self.assertTrue(crypto.MAC_METHOD_KEY in couch_content) +                self.assertEqual(['raw'], couch_content) +                content = couchdoc.get_json() +                self.assertTrue(_crypto.is_symmetrically_encrypted(content))          d = sol1.get_all_docs()          d.addCallback(_db1AssertEmptyDocList) @@ -473,16 +469,6 @@ class EncryptedSyncTestCase(          """          return self._test_encrypted_sym_sync(passphrase=u'ãáàäéàëÃìïóòöõúùüñç') -    def test_sync_very_large_files(self): -        """ -        Test if Soledad can sync very large files. -        """ -        self.skipTest( -            "Work in progress. For reference, see: " -            "https://leap.se/code/issues/7370") -        length = 100 * (10 ** 6)  # 100 MB -        return self._test_encrypted_sym_sync(doc_size=length, number_of_docs=1) -      def test_sync_many_small_files(self):          """          Test if Soledad can sync many smallfiles. diff --git a/testing/tests/sync/test_encdecpool.py b/testing/tests/sync/test_encdecpool.py deleted file mode 100644 index 4a32885e..00000000 --- a/testing/tests/sync/test_encdecpool.py +++ /dev/null @@ -1,306 +0,0 @@ -# -*- coding: utf-8 -*- -# test_encdecpool.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. 
If not, see <http://www.gnu.org/licenses/>. -""" -Tests for encryption and decryption pool. -""" -import json -from random import shuffle - -from mock import MagicMock -from twisted.internet.defer import inlineCallbacks - -from leap.soledad.client.encdecpool import SyncEncrypterPool -from leap.soledad.client.encdecpool import SyncDecrypterPool - -from leap.soledad.common.document import SoledadDocument -from test_soledad.util import BaseSoledadTest -from twisted.internet import defer - -DOC_ID = "mydoc" -DOC_REV = "rev" -DOC_CONTENT = {'simple': 'document'} - - -class TestSyncEncrypterPool(BaseSoledadTest): - -    def setUp(self): -        BaseSoledadTest.setUp(self) -        crypto = self._soledad._crypto -        sync_db = self._soledad._sync_db -        self._pool = SyncEncrypterPool(crypto, sync_db) -        self._pool.start() - -    def tearDown(self): -        self._pool.stop() -        BaseSoledadTest.tearDown(self) - -    @inlineCallbacks -    def test_get_encrypted_doc_returns_none(self): -        """ -        Test that trying to get an encrypted doc from the pool returns None if -        the document was never added for encryption. -        """ -        doc = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV) -        self.assertIsNone(doc) - -    @inlineCallbacks -    def test_encrypt_doc_and_get_it_back(self): -        """ -        Test that the pool actually encrypts a document added to the queue. -        """ -        doc = SoledadDocument( -            doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT)) - -        yield self._pool.encrypt_doc(doc) -        encrypted = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV) - -        self.assertIsNotNone(encrypted) - - -class TestSyncDecrypterPool(BaseSoledadTest): - -    def _insert_doc_cb(self, doc, gen, trans_id): -        """ -        Method used to mock the sync's return_doc_cb callback. -        """ -        self._inserted_docs.append((doc, gen, trans_id)) - -    def _setup_pool(self, sync_db=None): -        sync_db = sync_db or self._soledad._sync_db -        return SyncDecrypterPool( -            self._soledad._crypto, -            sync_db, -            source_replica_uid=self._soledad._dbpool.replica_uid, -            insert_doc_cb=self._insert_doc_cb) - -    def setUp(self): -        BaseSoledadTest.setUp(self) -        # setup the pool -        self._pool = self._setup_pool() -        # reset the inserted docs mock -        self._inserted_docs = [] - -    def tearDown(self): -        if self._pool.running: -            self._pool.stop() -        BaseSoledadTest.tearDown(self) - -    def test_insert_received_doc(self): -        """ -        Test that one document added to the pool is inserted using the -        callback. -        """ -        self._pool.start(1) -        self._pool.insert_received_doc( -            DOC_ID, DOC_REV, "{}", 1, "trans_id", 1) - -        def _assert_doc_was_inserted(_): -            self.assertEqual( -                self._inserted_docs, -                [(SoledadDocument(DOC_ID, DOC_REV, "{}"), 1, u"trans_id")]) - -        self._pool.deferred.addCallback(_assert_doc_was_inserted) -        return self._pool.deferred - -    def test_looping_control(self): -        """ -        Start and stop cleanly. 
-        """ -        self._pool.start(10) -        self.assertTrue(self._pool.running) -        self._pool.stop() -        self.assertFalse(self._pool.running) -        self.assertTrue(self._pool.deferred.called) - -    def test_sync_id_col_is_created_if_non_existing_in_docs_recvd_table(self): -        """ -        Test that docs_received table is migrated, and has the sync_id column -        """ -        mock_run_query = MagicMock(return_value=defer.succeed(None)) -        mock_sync_db = MagicMock() -        mock_sync_db.runQuery = mock_run_query -        pool = self._setup_pool(mock_sync_db) -        d = pool.start(10) -        pool.stop() - -        def assert_trial_to_create_sync_id_column(_): -            mock_run_query.assert_called_once_with( -                "ALTER TABLE docs_received ADD COLUMN sync_id") - -        d.addCallback(assert_trial_to_create_sync_id_column) -        return d - -    def test_insert_received_doc_many(self): -        """ -        Test that many documents added to the pool are inserted using the -        callback. -        """ -        many = 100 -        self._pool.start(many) - -        # insert many docs in the pool -        for i in xrange(many): -            gen = idx = i + 1 -            doc_id = "doc_id: %d" % idx -            rev = "rev: %d" % idx -            content = {'idx': idx} -            trans_id = "trans_id: %d" % idx -            self._pool.insert_received_doc( -                doc_id, rev, content, gen, trans_id, idx) - -        def _assert_doc_was_inserted(_): -            self.assertEqual(many, len(self._inserted_docs)) -            idx = 1 -            for doc, gen, trans_id in self._inserted_docs: -                expected_gen = idx -                expected_doc_id = "doc_id: %d" % idx -                expected_rev = "rev: %d" % idx -                expected_content = json.dumps({'idx': idx}) -                expected_trans_id = "trans_id: %d" % idx - -                self.assertEqual(expected_doc_id, doc.doc_id) -                self.assertEqual(expected_rev, doc.rev) -                self.assertEqual(expected_content, json.dumps(doc.content)) -                self.assertEqual(expected_gen, gen) -                self.assertEqual(expected_trans_id, trans_id) - -                idx += 1 - -        self._pool.deferred.addCallback(_assert_doc_was_inserted) -        return self._pool.deferred - -    def test_insert_encrypted_received_doc(self): -        """ -        Test that one encrypted document added to the pool is decrypted and -        inserted using the callback. -        """ -        crypto = self._soledad._crypto -        doc = SoledadDocument( -            doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT)) -        encrypted_content = json.loads(crypto.encrypt_doc(doc)) - -        # insert the encrypted document in the pool -        self._pool.start(1) -        self._pool.insert_encrypted_received_doc( -            DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1) - -        def _assert_doc_was_decrypted_and_inserted(_): -            self.assertEqual(1, len(self._inserted_docs)) -            self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")]) - -        self._pool.deferred.addCallback( -            _assert_doc_was_decrypted_and_inserted) -        return self._pool.deferred - -    @inlineCallbacks -    def test_processing_order(self): -        """ -        This test ensures that processing of documents only occur if there is -        a sequence in place. 
-        """ -        crypto = self._soledad._crypto - -        docs = [] -        for i in xrange(1, 10): -            i = str(i) -            doc = SoledadDocument( -                doc_id=DOC_ID + i, rev=DOC_REV + i, -                json=json.dumps(DOC_CONTENT)) -            encrypted_content = json.loads(crypto.encrypt_doc(doc)) -            docs.append((doc, encrypted_content)) - -        # insert the encrypted document in the pool -        yield self._pool.start(10)  # pool is expecting to process 10 docs -        self._pool._loop.stop()  # we are processing manually -        # first three arrives, forming a sequence -        for i, (doc, encrypted_content) in enumerate(docs[:3]): -            gen = idx = i + 1 -            yield self._pool.insert_encrypted_received_doc( -                doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx) - -        # last one arrives alone, so it can't be processed -        doc, encrypted_content = docs[-1] -        yield self._pool.insert_encrypted_received_doc( -            doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10) - -        yield self._pool._decrypt_and_recurse() - -        self.assertEqual(3, self._pool._processed_docs) - -    def test_insert_encrypted_received_doc_many(self, many=100): -        """ -        Test that many encrypted documents added to the pool are decrypted and -        inserted using the callback. -        """ -        crypto = self._soledad._crypto -        self._pool.start(many) -        docs = [] - -        # insert many encrypted docs in the pool -        for i in xrange(many): -            gen = idx = i + 1 -            doc_id = "doc_id: %d" % idx -            rev = "rev: %d" % idx -            content = {'idx': idx} -            trans_id = "trans_id: %d" % idx - -            doc = SoledadDocument( -                doc_id=doc_id, rev=rev, json=json.dumps(content)) - -            encrypted_content = json.loads(crypto.encrypt_doc(doc)) -            docs.append((doc_id, rev, encrypted_content, gen, -                         trans_id, idx)) -        shuffle(docs) - -        for doc in docs: -            self._pool.insert_encrypted_received_doc(*doc) - -        def _assert_docs_were_decrypted_and_inserted(_): -            self.assertEqual(many, len(self._inserted_docs)) -            idx = 1 -            for doc, gen, trans_id in self._inserted_docs: -                expected_gen = idx -                expected_doc_id = "doc_id: %d" % idx -                expected_rev = "rev: %d" % idx -                expected_content = json.dumps({'idx': idx}) -                expected_trans_id = "trans_id: %d" % idx - -                self.assertEqual(expected_doc_id, doc.doc_id) -                self.assertEqual(expected_rev, doc.rev) -                self.assertEqual(expected_content, json.dumps(doc.content)) -                self.assertEqual(expected_gen, gen) -                self.assertEqual(expected_trans_id, trans_id) - -                idx += 1 - -        self._pool.deferred.addCallback( -            _assert_docs_were_decrypted_and_inserted) -        return self._pool.deferred - -    @inlineCallbacks -    def test_pool_reuse(self): -        """ -        The pool is reused between syncs, this test verifies that -        reusing is fine. 
-        """ -        for i in xrange(3): -            yield self.test_insert_encrypted_received_doc_many(5) -            self._inserted_docs = [] -            decrypted_docs = yield self._pool._get_docs(encrypted=False) -            # check that decrypted docs staging is clean -            self.assertEquals([], decrypted_docs) -            self._pool.stop() diff --git a/testing/tests/sync/test_sqlcipher_sync.py b/testing/tests/sync/test_sqlcipher_sync.py index 3cbefc8b..26f63a40 100644 --- a/testing/tests/sync/test_sqlcipher_sync.py +++ b/testing/tests/sync/test_sqlcipher_sync.py @@ -27,8 +27,6 @@ from leap.soledad.common.l2db import sync  from leap.soledad.common.l2db import vectorclock  from leap.soledad.common.l2db import errors -from leap.soledad.common.crypto import ENC_SCHEME_KEY -from leap.soledad.client.crypto import decrypt_doc_dict  from leap.soledad.client.http_target import SoledadHTTPSyncTarget  from test_soledad import u1db_tests as tests @@ -545,13 +543,7 @@ class SQLCipherDatabaseSyncTests(          self.assertFalse(doc2.has_conflicts)          self.sync(self.db2, db3)          doc3 = db3.get_doc('the-doc') -        if ENC_SCHEME_KEY in doc3.content: -            _crypto = self._soledad._crypto -            key = _crypto.doc_passphrase(doc3.doc_id) -            secret = _crypto.secret -            doc3.set_json(decrypt_doc_dict( -                doc3.content, -                doc3.doc_id, doc3.rev, key, secret)) +          self.assertEqual(doc4.get_json(), doc3.get_json())          self.assertFalse(doc3.has_conflicts)          self.db1.close() @@ -713,15 +705,12 @@ def make_local_db_and_soledad_target(      test.startTwistedServer()      replica_uid = os.path.basename(path)      db = test.request_state._create_database(replica_uid) -    sync_db = test._soledad._sync_db -    sync_enc_pool = test._soledad._sync_enc_pool      st = soledad_sync_target(          test, db._dbname, -        source_replica_uid=source_replica_uid, -        sync_db=sync_db, -        sync_enc_pool=sync_enc_pool) +        source_replica_uid=source_replica_uid)      return db, st +  target_scenarios = [      ('leap', {          'create_db_and_target': make_local_db_and_soledad_target, diff --git a/testing/tests/sync/test_sync.py b/testing/tests/sync/test_sync.py index 5290003e..76757c5b 100644 --- a/testing/tests/sync/test_sync.py +++ b/testing/tests/sync/test_sync.py @@ -19,6 +19,7 @@ import threading  import time  from urlparse import urljoin +from mock import Mock  from twisted.internet import defer  from testscenarios import TestWithScenarios @@ -184,10 +185,9 @@ class TestSoledadDbSync(          target = soledad_sync_target(              self, self.db2._dbname,              source_replica_uid=self._soledad._dbpool.replica_uid) -        self.addCleanup(target.close)          return sync.SoledadSynchronizer(              self.db, -            target).sync(defer_decryption=False) +            target).sync()      @defer.inlineCallbacks      def test_db_sync(self): @@ -211,3 +211,21 @@ class TestSoledadDbSync(              self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False)      # TODO: add u1db.tests.test_sync.TestRemoteSyncIntegration + + +class TestSoledadSynchronizer(BaseSoledadTest): + +    def setUp(self): +        BaseSoledadTest.setUp(self) +        self.db = Mock() +        self.target = Mock() +        self.synchronizer = sync.SoledadSynchronizer( +            self.db, +            self.target) + +    def test_docs_by_gen_includes_deleted(self): +        changes = [('id', 'gen', 
'trans')] +        docs_by_gen = self.synchronizer._docs_by_gen_from_changes(changes) +        f, args, kwargs = docs_by_gen[0][0] +        self.assertIn('include_deleted', kwargs) +        self.assertTrue(kwargs['include_deleted']) diff --git a/testing/tests/sync/test_sync_deferred.py b/testing/tests/sync/test_sync_deferred.py deleted file mode 100644 index 4948aaf8..00000000 --- a/testing/tests/sync/test_sync_deferred.py +++ /dev/null @@ -1,196 +0,0 @@ -# test_sync_deferred.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Test Leap backend bits: sync with deferred encryption/decryption. -""" -import time -import os -import random -import string -import shutil - -from urlparse import urljoin - -from twisted.internet import defer - -from leap.soledad.common import couch - -from leap.soledad.client import sync -from leap.soledad.client.sqlcipher import SQLCipherOptions -from leap.soledad.client.sqlcipher import SQLCipherDatabase - -from testscenarios import TestWithScenarios - -from test_soledad import u1db_tests as tests -from test_soledad.util import ADDRESS -from test_soledad.util import SoledadWithCouchServerMixin -from test_soledad.util import make_soledad_app -from test_soledad.util import soledad_sync_target - - -# Just to make clear how this test is different... :) -DEFER_DECRYPTION = True - -WAIT_STEP = 1 -MAX_WAIT = 10 -DBPASS = "pass" - - -class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin): - -    """ -    Another base class for testing the deferred encryption/decryption during -    the syncs, using the intermediate database. -    """ -    defer_sync_encryption = True - -    def setUp(self): -        SoledadWithCouchServerMixin.setUp(self) -        self.startTwistedServer() -        # config info -        self.db1_file = os.path.join(self.tempdir, "db1.u1db") -        os.unlink(self.db1_file) -        self.db_pass = DBPASS -        self.email = ADDRESS - -        # get a random prefix for each test, so we do not mess with -        # concurrency during initialization and shutting down of -        # each local db. -        self.rand_prefix = ''.join( -            map(lambda x: random.choice(string.ascii_letters), range(6))) - -        # open test dbs: db1 will be the local sqlcipher db (which -        # instantiates a syncdb). We use the self._soledad instance that was -        # already created on some setUp method. 
-        import binascii -        tohex = binascii.b2a_hex -        key = tohex(self._soledad.secrets.get_local_storage_key()) -        sync_db_key = tohex(self._soledad.secrets.get_sync_db_key()) -        dbpath = self._soledad._local_db_path - -        self.opts = SQLCipherOptions( -            dbpath, key, is_raw_key=True, create=False, -            defer_encryption=True, sync_db_key=sync_db_key) -        self.db1 = SQLCipherDatabase(self.opts) - -        self.db2 = self.request_state._create_database('test') - -    def tearDown(self): -        # XXX should not access "private" attrs -        shutil.rmtree(os.path.dirname(self._soledad._local_db_path)) -        SoledadWithCouchServerMixin.tearDown(self) - - -class SyncTimeoutError(Exception): - -    """ -    Dummy exception to notify timeout during sync. -    """ -    pass - - -class TestSoledadDbSyncDeferredEncDecr( -        TestWithScenarios, -        BaseSoledadDeferredEncTest, -        tests.TestCaseWithServer): - -    """ -    Test db.sync remote sync shortcut. -    Case with deferred encryption and decryption: using the intermediate -    syncdb. -    """ - -    scenarios = [ -        ('http', { -            'make_app_with_state': make_soledad_app, -            'make_database_for_test': tests.make_memory_database_for_test, -        }), -    ] - -    oauth = False -    token = True - -    def setUp(self): -        """ -        Need to explicitely invoke inicialization on all bases. -        """ -        BaseSoledadDeferredEncTest.setUp(self) -        self.server = self.server_thread = None -        self.syncer = None - -    def tearDown(self): -        """ -        Need to explicitely invoke destruction on all bases. -        """ -        dbsyncer = getattr(self, 'dbsyncer', None) -        if dbsyncer: -            dbsyncer.close() -        BaseSoledadDeferredEncTest.tearDown(self) - -    def do_sync(self): -        """ -        Perform sync using SoledadSynchronizer, SoledadSyncTarget -        and Token auth. -        """ -        replica_uid = self._soledad._dbpool.replica_uid -        sync_db = self._soledad._sync_db -        sync_enc_pool = self._soledad._sync_enc_pool -        dbsyncer = self._soledad._dbsyncer  # Soledad.sync uses the dbsyncer - -        target = soledad_sync_target( -            self, self.db2._dbname, -            source_replica_uid=replica_uid, -            sync_db=sync_db, -            sync_enc_pool=sync_enc_pool) -        self.addCleanup(target.close) -        return sync.SoledadSynchronizer( -            dbsyncer, -            target).sync(defer_decryption=True) - -    def wait_for_sync(self): -        """ -        Wait for sync to finish. -        """ -        wait = 0 -        syncer = self.syncer -        if syncer is not None: -            while syncer.syncing: -                time.sleep(WAIT_STEP) -                wait += WAIT_STEP -                if wait >= MAX_WAIT: -                    raise SyncTimeoutError - -    @defer.inlineCallbacks -    def test_db_sync(self): -        """ -        Test sync. - -        Adapted to check for encrypted content. 
-        """ -        doc1 = self.db1.create_doc_from_json(tests.simple_doc) -        doc2 = self.db2.create_doc_from_json(tests.nested_doc) -        local_gen_before_sync = yield self.do_sync() - -        gen, _, changes = self.db1.whats_changed(local_gen_before_sync) -        self.assertEqual(1, len(changes)) - -        self.assertEqual(doc2.doc_id, changes[0][0]) -        self.assertEqual(1, gen - local_gen_before_sync) - -        self.assertGetEncryptedDoc( -            self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False) -        self.assertGetEncryptedDoc( -            self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False) diff --git a/testing/tests/sync/test_sync_mutex.py b/testing/tests/sync/test_sync_mutex.py index 2626ab2a..432a3cd2 100644 --- a/testing/tests/sync/test_sync_mutex.py +++ b/testing/tests/sync/test_sync_mutex.py @@ -47,7 +47,7 @@ from test_soledad.util import soledad_sync_target  _old_sync = SoledadSynchronizer.sync -def _timed_sync(self, defer_decryption=True): +def _timed_sync(self):      t = time.time()      sync_id = uuid.uuid4() @@ -62,10 +62,11 @@ def _timed_sync(self, defer_decryption=True):          self.source.sync_times[sync_id]['end'] = t          return passthrough -    d = _old_sync(self, defer_decryption=defer_decryption) +    d = _old_sync(self)      d.addBoth(_store_finish_time)      return d +  SoledadSynchronizer.sync = _timed_sync  # -- end of monkey-patching diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py index 964468ce..6ce9a5c5 100644 --- a/testing/tests/sync/test_sync_target.py +++ b/testing/tests/sync/test_sync_target.py @@ -30,10 +30,11 @@ from testscenarios import TestWithScenarios  from twisted.internet import defer  from leap.soledad.client import http_target as target -from leap.soledad.client import crypto +from leap.soledad.client.http_target.fetch_protocol import DocStreamReceiver  from leap.soledad.client.sqlcipher import SQLCipherU1DBSync  from leap.soledad.client.sqlcipher import SQLCipherOptions  from leap.soledad.client.sqlcipher import SQLCipherDatabase +from leap.soledad.client import _crypto  from leap.soledad.common import l2db @@ -44,6 +45,7 @@ from test_soledad.util import make_soledad_app  from test_soledad.util import make_token_soledad_app  from test_soledad.util import make_soledad_document_for_test  from test_soledad.util import soledad_sync_target +from twisted.trial import unittest  from test_soledad.util import SoledadWithCouchServerMixin  from test_soledad.util import ADDRESS  from test_soledad.util import SQLCIPHER_SCENARIOS @@ -53,92 +55,69 @@ from test_soledad.util import SQLCIPHER_SCENARIOS  # The following tests come from `u1db.tests.test_remote_sync_target`.  # ----------------------------------------------------------------------------- -class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin): +class TestSoledadParseReceivedDocResponse(unittest.TestCase):      """      Some tests had to be copied to this class so we can instantiate our own      target.      
""" -    def setUp(self): -        SoledadWithCouchServerMixin.setUp(self) -        creds = {'token': { -            'uuid': 'user-uuid', -            'token': 'auth-token', -        }} -        self.target = target.SoledadHTTPSyncTarget( -            self.couch_url, -            uuid4().hex, -            creds, -            self._soledad._crypto, -            None) - -    def tearDown(self): -        self.target.close() -        SoledadWithCouchServerMixin.tearDown(self) +    def parse(self, stream): +        parser = DocStreamReceiver(None, None, lambda *_: defer.succeed(42)) +        parser.dataReceived(stream) +        parser.finish()      def test_extra_comma(self): -        """ -        Test adapted to use encrypted content. -        """          doc = SoledadDocument('i', rev='r') -        doc.content = {} -        _crypto = self._soledad._crypto -        key = _crypto.doc_passphrase(doc.doc_id) -        secret = _crypto.secret +        doc.content = {'a': 'b'} -        enc_json = crypto.encrypt_docstr( -            doc.get_json(), doc.doc_id, doc.rev, -            key, secret) +        encrypted_docstr = _crypto.SoledadCrypto('safe').encrypt_doc(doc)          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("[\r\n{},\r\n]") +            self.parse("[\r\n{},\r\n]")          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response( +            self.parse(                  ('[\r\n{},\r\n{"id": "i", "rev": "r", ' + -                 '"content": %s, "gen": 3, "trans_id": "T-sid"}' + -                 ',\r\n]') % json.dumps(enc_json)) +                 '"gen": 3, "trans_id": "T-sid"},\r\n' + +                 '%s,\r\n]') % encrypted_docstr)      def test_wrong_start(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("{}\r\n]") - -        with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("\r\n{}\r\n]") +            self.parse("{}\r\n]")          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("") +            self.parse("\r\n{}\r\n]")      def test_wrong_end(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("[\r\n{}") +            self.parse("[\r\n{}")          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("[\r\n") +            self.parse("[\r\n")      def test_missing_comma(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response( +            self.parse(                  '[\r\n{}\r\n{"id": "i", "rev": "r", '                  '"content": "c", "gen": 3}\r\n]')      def test_no_entries(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("[\r\n]") +            self.parse("[\r\n]")      def test_error_in_stream(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response( +            self.parse(                  '[\r\n{"new_generation": 0},'                  '\r\n{"error": "unavailable"}\r\n')          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response( +            self.parse(                  
'[\r\n{"error": "unavailable"}\r\n')          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response('[\r\n{"error": "?"}\r\n') +            self.parse('[\r\n{"error": "?"}\r\n')  #  # functions for TestRemoteSyncTargets @@ -151,13 +130,9 @@ def make_local_db_and_soledad_target(      test.startTwistedServer()      replica_uid = os.path.basename(path)      db = test.request_state._create_database(replica_uid) -    sync_db = test._soledad._sync_db -    sync_enc_pool = test._soledad._sync_enc_pool      st = soledad_sync_target(          test, db._dbname, -        source_replica_uid=source_replica_uid, -        sync_db=sync_db, -        sync_enc_pool=sync_enc_pool) +        source_replica_uid=source_replica_uid)      return db, st @@ -188,16 +163,11 @@ class TestSoledadSyncTarget(      def getSyncTarget(self, path=None, source_replica_uid=uuid4().hex):          if self.port is None:              self.startTwistedServer() -        sync_db = self._soledad._sync_db -        sync_enc_pool = self._soledad._sync_enc_pool          if path is None:              path = self.db2._dbname          target = self.sync_target(              self, path, -            source_replica_uid=source_replica_uid, -            sync_db=sync_db, -            sync_enc_pool=sync_enc_pool) -        self.addCleanup(target.close) +            source_replica_uid=source_replica_uid)          return target      def setUp(self): @@ -229,10 +199,10 @@ class TestSoledadSyncTarget(              other_docs.append((doc.doc_id, doc.rev, doc.get_json()))          doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') +        get_doc = (lambda _: doc, (1,), {})          new_gen, trans_id = yield remote_target.sync_exchange( -            [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=receive_doc, -            defer_decryption=False) +            [(get_doc, 10, 'T-sid')], 'replica', last_known_generation=0, +            last_known_trans_id=None, insert_doc_cb=receive_doc)          self.assertEqual(1, new_gen)          self.assertGetEncryptedDoc(              db, 'doc-here', 'replica:1', '{"value": "here"}', False) @@ -278,15 +248,16 @@ class TestSoledadSyncTarget(          doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}')          doc2 = self.make_document('doc-here2', 'replica:1',                                    '{"value": "here2"}') +        get_doc1 = (lambda _: doc1, (1,), {}) +        get_doc2 = (lambda _: doc2, (2,), {})          with self.assertRaises(l2db.errors.U1DBError):              yield remote_target.sync_exchange( -                [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')], +                [(get_doc1, 10, 'T-sid'), (get_doc2, 11, 'T-sud')],                  'replica',                  last_known_generation=0,                  last_known_trans_id=None, -                insert_doc_cb=receive_doc, -                defer_decryption=False) +                insert_doc_cb=receive_doc)          self.assertGetEncryptedDoc(              db, 'doc-here', 'replica:1', '{"value": "here"}', @@ -297,9 +268,8 @@ class TestSoledadSyncTarget(          # retry          trigger_ids = []          new_gen, trans_id = yield remote_target.sync_exchange( -            [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=receive_doc, -            defer_decryption=False) +            [(get_doc2, 11, 'T-sud')], 'replica', 
last_known_generation=0, +            last_known_trans_id=None, insert_doc_cb=receive_doc)          self.assertGetEncryptedDoc(              db, 'doc-here2', 'replica:1', '{"value": "here2"}',              False) @@ -328,10 +298,11 @@ class TestSoledadSyncTarget(              replica_uid_box.append(replica_uid)          doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') +        get_doc = (lambda _: doc, (1,), {})          new_gen, trans_id = yield remote_target.sync_exchange( -            [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, +            [(get_doc, 10, 'T-sid')], 'replica', last_known_generation=0,              last_known_trans_id=None, insert_doc_cb=receive_doc, -            ensure_callback=ensure_cb, defer_decryption=False) +            ensure_callback=ensure_cb)          self.assertEqual(1, new_gen)          db = self.db2          self.assertEqual(1, len(replica_uid_box)) @@ -339,6 +310,37 @@ class TestSoledadSyncTarget(          self.assertGetEncryptedDoc(              db, 'doc-here', 'replica:1', '{"value": "here"}', False) +    @defer.inlineCallbacks +    def test_sync_exchange_send_events(self): +        """ +        Test for sync exchange's SOLEDAD_SYNC_SEND_STATUS event. +        """ +        remote_target = self.getSyncTarget() +        uuid = remote_target.uuid +        events = [] + +        def mocked_events(*args): +            events.append((args)) +        self.patch( +            target.send, '_emit_send_status', mocked_events) + +        doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') +        doc2 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') +        doc3 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') +        get_doc = (lambda _: doc, (1,), {}) +        get_doc2 = (lambda _: doc2, (1,), {}) +        get_doc3 = (lambda _: doc3, (1,), {}) +        docs = [(get_doc, 10, 'T-sid'), +                (get_doc2, 11, 'T-sid2'), (get_doc3, 12, 'T-sid3')] +        new_gen, trans_id = yield remote_target.sync_exchange( +            docs, 'replica', last_known_generation=0, +            last_known_trans_id=None, insert_doc_cb=lambda _: 1, +            ensure_callback=lambda _: 1) +        self.assertEqual(1, new_gen) +        self.assertEqual(4, len(events)) +        self.assertEquals([(uuid, 0, 3), (uuid, 1, 3), (uuid, 2, 3), +                           (uuid, 3, 3)], events) +      def test_sync_exchange_in_stream_error(self):          self.skipTest("bypass this test because our sync_exchange process "                        "does not return u1db error 503 \"unavailable\" for " @@ -421,7 +423,6 @@ class SoledadDatabaseSyncTargetTests(      def tearDown(self):          self.db.close() -        self.st.close()          tests.TestCaseWithServer.tearDown(self)          SoledadWithCouchServerMixin.tearDown(self) @@ -442,12 +443,12 @@ class SoledadDatabaseSyncTargetTests(          This test was adapted to decrypt remote content before assert.          
"""          docs_by_gen = [ -            (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, -             'T-sid')] +            ((self.make_document, +              ('doc-id', 'replica:1', tests.simple_doc,), {}), +                10, 'T-sid')]          new_gen, trans_id = yield self.st.sync_exchange(              docs_by_gen, 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc, -            defer_decryption=False) +            last_known_trans_id=None, insert_doc_cb=self.receive_doc)          self.assertGetEncryptedDoc(              self.db, 'doc-id', 'replica:1', tests.simple_doc, False)          self.assertTransactionLog(['doc-id'], self.db) @@ -465,14 +466,13 @@ class SoledadDatabaseSyncTargetTests(          This test was adapted to decrypt remote content before assert.          """          docs_by_gen = [ -            (self.make_document( -                'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'), -            (self.make_document( -                'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')] +            ((self.make_document, +                ('doc-id', 'replica:1', tests.simple_doc), {}), 10, 'T-1'), +            ((self.make_document, +                ('doc-id2', 'replica:1', tests.nested_doc), {}), 11, 'T-2')]          new_gen, trans_id = yield self.st.sync_exchange(              docs_by_gen, 'replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc, -            defer_decryption=False) +            last_known_trans_id=None, insert_doc_cb=self.receive_doc)          self.assertGetEncryptedDoc(              self.db, 'doc-id', 'replica:1', tests.simple_doc, False)          self.assertGetEncryptedDoc( @@ -498,8 +498,7 @@ class SoledadDatabaseSyncTargetTests(          self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)          new_gen, _ = yield self.st.sync_exchange(              [], 'other-replica', last_known_generation=0, -            last_known_trans_id=None, insert_doc_cb=self.receive_doc, -            defer_decryption=False) +            last_known_trans_id=None, insert_doc_cb=self.receive_doc)          self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)          self.assertEqual(2, new_gen)          self.assertEqual( @@ -552,7 +551,8 @@ class SoledadDatabaseSyncTargetTests(          doc = self.db.create_doc_from_json('{}')          edit_rev = 'replica:1|' + doc.rev          docs_by_gen = [ -            (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')] +            ((self.make_document, (doc.doc_id, edit_rev, None), {}), +             10, 'T-sid')]          new_gen, trans_id = yield self.st.sync_exchange(              docs_by_gen, 'replica', last_known_generation=0,              last_known_trans_id=None, insert_doc_cb=self.receive_doc) @@ -571,7 +571,7 @@ class SoledadDatabaseSyncTargetTests(          self.assertTransactionLog([doc.doc_id], self.db)          new_doc = '{"key": "altval"}'          docs_by_gen = [ -            (self.make_document(doc.doc_id, 'replica:1', new_doc), 10, +            ((self.make_document, (doc.doc_id, 'replica:1', new_doc), {}), 10,               'T-sid')]          new_gen, _ = yield self.st.sync_exchange(              docs_by_gen, 'replica', last_known_generation=0, @@ -591,7 +591,7 @@ class SoledadDatabaseSyncTargetTests(          self.assertTransactionLog([doc.doc_id], self.db)          gen, txid = self.db._get_generation_info()          docs_by_gen = [ -            
@@ -591,7 +591,7 @@ class SoledadDatabaseSyncTargetTests(
         self.assertTransactionLog([doc.doc_id], self.db)
         gen, txid = self.db._get_generation_info()
         docs_by_gen = [
-            (self.make_document(doc.doc_id, doc.rev, tests.simple_doc),
+            ((self.make_document, (doc.doc_id, doc.rev, tests.simple_doc), {}),
              10, 'T-sid')]
         new_gen, _ = yield self.st.sync_exchange(
             docs_by_gen, 'replica', last_known_generation=gen,
@@ -624,9 +624,9 @@ class SoledadDatabaseSyncTargetTests(
             [], 'other-replica', last_known_generation=0,
             last_known_trans_id=None, insert_doc_cb=self.receive_doc)
         self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db)
+        self.assertEqual(2, new_gen)
         self.assertEqual(
             (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1])
-        self.assertEqual(2, new_gen)
         if self.whitebox:
             self.assertEqual(self.db._last_exchange_log['return'],
                              {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]})
@@ -637,7 +637,7 @@ class SoledadDatabaseSyncTargetTests(
         self.assertTransactionLog([doc.doc_id], self.db)
         new_doc = '{"key": "altval"}'
         docs_by_gen = [
-            (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+            ((self.make_document, (doc.doc_id, 'test:1|z:2', new_doc), {}), 10,
              'T-sid')]
         new_gen, _ = yield self.st.sync_exchange(
             docs_by_gen, 'other-replica', last_known_generation=0,
@@ -662,7 +662,7 @@ class SoledadDatabaseSyncTargetTests(
         self.assertTransactionLog([doc.doc_id], self.db)
         new_doc = '{"key": "altval"}'
         docs_by_gen = [
-            (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+            ((self.make_document, (doc.doc_id, 'test:1|z:2', new_doc), {}), 10,
              'T-sid')]
         new_gen, _ = yield self.st.sync_exchange(
             docs_by_gen, 'other-replica', last_known_generation=0,
@@ -683,7 +683,7 @@ class SoledadDatabaseSyncTargetTests(
         self.assertTransactionLog([doc.doc_id], self.db)
         new_doc = '{"key": "altval"}'
         docs_by_gen = [
-            (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10,
+            ((self.make_document, (doc.doc_id, 'test:1|z:2', new_doc), {}), 10,
              'T-sid')]
         new_gen, _ = yield self.st.sync_exchange(
             docs_by_gen, 'other-replica', last_known_generation=0,
@@ -694,9 +694,9 @@ class SoledadDatabaseSyncTargetTests(
     def test_sync_exchange_converged_handling(self):
         doc = self.db.create_doc_from_json(tests.simple_doc)
         docs_by_gen = [
-            (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'),
-            (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5,
-             'T-bar')]
+            ((self.make_document, ('new', 'other:1', '{}'), {}), 4, 'T-foo'),
+            ((self.make_document, (doc.doc_id, doc.rev, doc.get_json()), {}),
+                5, 'T-bar')]
         new_gen, _ = yield self.st.sync_exchange(
             docs_by_gen, 'other-replica', last_known_generation=0,
             last_known_trans_id=None, insert_doc_cb=self.receive_doc)
@@ -780,9 +780,6 @@ class SoledadDatabaseSyncTargetTests(
         self.assertEqual(expected, called)
 
 
-# Just to make clear how this test is different... :)
-DEFER_DECRYPTION = False
-
 WAIT_STEP = 1
 MAX_WAIT = 10
 DBPASS = "pass"
@@ -842,12 +839,10 @@ class TestSoledadDbSync(
         import binascii
         tohex = binascii.b2a_hex
         key = tohex(self._soledad.secrets.get_local_storage_key())
-        sync_db_key = tohex(self._soledad.secrets.get_sync_db_key())
         dbpath = self._soledad._local_db_path
 
         self.opts = SQLCipherOptions(
-            dbpath, key, is_raw_key=True, create=False,
-            defer_encryption=True, sync_db_key=sync_db_key)
+            dbpath, key, is_raw_key=True, create=False)
         self.db1 = SQLCipherDatabase(self.opts)
 
         self.db2 = self.request_state._create_database(replica_uid='test')
@@ -886,12 +881,10 @@ class TestSoledadDbSync(
                 self.opts,
                 crypto,
                 replica_uid,
-                None,
-                defer_encryption=True)
+                None)
             self.dbsyncer = dbsyncer
             return dbsyncer.sync(target_url,
-                                 creds=creds,
-                                 defer_decryption=DEFER_DECRYPTION)
+                                 creds=creds)
         else:
             return self._do_sync(self, target_name)
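With the encryption/decryption pools removed, opening the local SQLCipher database no longer involves a sync database key or deferred-encryption flags, as the hunks above show. A before/after sketch with placeholder path and key (the import path is an assumption based on the client package layout, not confirmed by this diff):

```python
# Placeholder values; in the test above dbpath and key come from the
# running Soledad instance. Import path assumed from the client layout.
from leap.soledad.client.sqlcipher import SQLCipherOptions

dbpath = '/tmp/soledad.db'   # placeholder
key = 'deadbeef' * 8         # placeholder raw hex key

# 0.9.1 and earlier: a separate sync db plus deferred encryption
# opts = SQLCipherOptions(dbpath, key, is_raw_key=True, create=False,
#                         defer_encryption=True, sync_db_key=sync_db_key)

# 0.9.2: inline streaming crypto needs only the main database
opts = SQLCipherOptions(dbpath, key, is_raw_key=True, create=False)
```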
diff --git a/testing/tox.ini b/testing/tox.ini
index 31cb8a4f..c46c6af1 100644
--- a/testing/tox.ini
+++ b/testing/tox.ini
@@ -1,12 +1,15 @@
 [tox]
 envlist = py27
+skipsdist=True
 
 [testenv]
 basepython = python2.7
-commands = py.test --cov-report=html \
+commands = py.test --ignore=tests/benchmarks \
+                   --cov-report=html \
                    --cov-report=term \
-		   --cov=leap.soledad \
-		   {posargs}
+                   --cov=leap.soledad \
+                   {posargs}
+usedevelop = True
 deps =
     coverage
     pytest
@@ -18,6 +21,7 @@ deps =
     pdbpp
     couchdb
     requests
+    service_identity
 # install soledad local packages
     -e../common
     -e../client
@@ -27,11 +31,11 @@ setenv =
     TERM=xterm
 install_command = pip install {opts} {packages}
 
-[testenv:perf]
+[testenv:benchmark]
 deps =
     {[testenv]deps}
     pytest-benchmark
-commands = py.test tests/perf {posargs}
+commands = py.test --benchmark-only {posargs}
 
 [testenv:code-check]
 changedir = ..
@@ -39,12 +43,12 @@ deps =
     pep8
     flake8
 commands =
-    pep8 client server common
-    flake8 --ignore=F812,E731 client server common
+    pep8
+    flake8
 
 [testenv:parallel]
 deps =
     {[testenv]deps}
     pytest-xdist
 install_command = pip install {opts} {packages}
-commands = py.test {posargs} -n 4
+commands = py.test --ignore=tests/benchmarks {posargs} -n 4
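The `perf` tox environment becomes `benchmark`: instead of collecting a dedicated `tests/perf` directory it now runs with pytest-benchmark's `--benchmark-only` flag, while the default and parallel envs exclude `tests/benchmarks`. A minimal example of the kind of test that flag selects, with a hypothetical workload standing in for the real crypto benchmarks:

```python
# Minimal pytest-benchmark usage; `hash_blob` is a hypothetical
# stand-in for the operations the soledad benchmarks actually measure.
import hashlib
import os


def hash_blob(data):
    return hashlib.sha256(data).digest()


def test_hash_blob(benchmark):
    data = os.urandom(1024)
    # `benchmark` is the fixture provided by the pytest-benchmark
    # plugin; it calls the callable repeatedly and records timings.
    benchmark(hash_blob, data)
```

Such tests are skipped by the default env (`--ignore=tests/benchmarks`) and selected by `tox -e benchmark`.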
