diff options
68 files changed, 1894 insertions, 2555 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index dd4e4605..ac2ae1f0 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,7 +1,6 @@ stages: - code-check - tests - - benchmark # Cache tox envs between builds cache: @@ -22,15 +21,5 @@ tests: script: - cd testing - tox -- --couch-url http://couchdb:5984 - -benchmark: - stage: benchmark - image: leapcode/soledad:latest - services: - - couchdb - script: - - cd testing - - tox -e perf -- --couch-url http://couchdb:5984 tags: - - docker - - benchmark + - couchdb diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 12cb56ab..f47749d1 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,26 @@ +0.9.2 - 22 December, 2016 ++++++++++++++++++++++++++ + +Performance improvements +~~~~~~~~~~~~~~~~~~~~~~~~ + +- use AES 256 GCM mode instead of CTR+HMAC. +- streaming encryption/decryption and data transfer. + +Server +~~~~~~ + +- move server to a twisted resource entrypoint. + +Client +~~~~~~ + +- use twisted http agent in the client. +- maintain backwards compatibility with old crypto scheme (AES 256 CTR+HMAC). + No migration for now, only in 0.10. +- remove the encryption/decryption pools, replace for inline streaming crypto. +- use sqlcipher transactions on sync. + 0.9.1 - 27 November, 2016 +++++++++++++++++++++++++ diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip index 2ae844e1..24b168b4 100644 --- a/client/pkg/requirements.pip +++ b/client/pkg/requirements.pip @@ -2,3 +2,4 @@ pysqlcipher>2.6.3 scrypt zope.proxy twisted +cryptography diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 245a8971..3a114021 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -21,6 +21,7 @@ from leap.soledad.client.api import Soledad from leap.soledad.common import soledad_assert from ._version import get_versions + __version__ = get_versions()['version'] del get_versions diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py new file mode 100644 index 00000000..4bbdd044 --- /dev/null +++ b/client/src/leap/soledad/client/_crypto.py @@ -0,0 +1,386 @@ +# -*- coding: utf-8 -*- +# _crypto.py +# Copyright (C) 2016 LEAP Encryption Access Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Cryptographic operations for the soledad client +""" + +import binascii +import base64 +import hashlib +import hmac +import os +import re +import struct +import time + +from io import BytesIO +from itertools import imap +from collections import namedtuple + +from twisted.internet import defer +from twisted.internet import interfaces +from twisted.web.client import FileBodyProducer + +from cryptography.exceptions import InvalidTag +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends.multibackend import MultiBackend +from cryptography.hazmat.backends.openssl.backend \ + import Backend as OpenSSLBackend + +from zope.interface import implements + + +SECRET_LENGTH = 64 + +CRYPTO_BACKEND = MultiBackend([OpenSSLBackend()]) + +PACMAN = struct.Struct('2sbbQ16s255p255p') +BLOB_SIGNATURE_MAGIC = '\x13\x37' + + +ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1) +ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2) +DocInfo = namedtuple('DocInfo', 'doc_id rev') + + +class EncryptionDecryptionError(Exception): + pass + + +class InvalidBlob(Exception): + pass + + +class SoledadCrypto(object): + """ + This class provides convenient methods for document encryption and + decryption using BlobEncryptor and BlobDecryptor classes. + """ + def __init__(self, secret): + """ + Initialize the crypto object. + + :param secret: The Soledad remote storage secret. + :type secret: str + """ + self.secret = secret + + def encrypt_doc(self, doc): + """ + Creates and configures a BlobEncryptor, asking it to start encryption + and wrapping the result as a simple JSON string with a "raw" key. + + :param doc: the document to be encrypted. + :type doc: SoledadDocument + :return: A deferred whose callback will be invoked with a JSON string + containing the ciphertext as the value of "raw" key. + :rtype: twisted.internet.defer.Deferred + """ + + def put_raw(blob): + raw = blob.getvalue() + return '{"raw": "' + raw + '"}' + + content = BytesIO(str(doc.get_json())) + info = DocInfo(doc.doc_id, doc.rev) + del doc + encryptor = BlobEncryptor(info, content, secret=self.secret) + d = encryptor.encrypt() + d.addCallback(put_raw) + return d + + def decrypt_doc(self, doc): + """ + Creates and configures a BlobDecryptor, asking it decrypt and returning + the decrypted cleartext content from the encrypted document. + + :param doc: the document to be decrypted. + :type doc: SoledadDocument + :return: The decrypted cleartext content of the document. + :rtype: str + """ + info = DocInfo(doc.doc_id, doc.rev) + ciphertext = BytesIO() + payload = doc.content['raw'] + del doc + ciphertext.write(str(payload)) + decryptor = BlobDecryptor(info, ciphertext, secret=self.secret) + return decryptor.decrypt() + + +def encrypt_sym(data, key, method=ENC_METHOD.aes_256_gcm): + """ + Encrypt data using AES-256 cipher in selected mode. + + :param data: The data to be encrypted. + :type data: str + :param key: The key used to encrypt data (must be 256 bits long). + :type key: str + + :return: A tuple with the initialization vector and the ciphertext, both + encoded as base64. + :rtype: (str, str) + """ + mode = _mode_by_method(method) + encryptor = AESWriter(key, mode=mode) + encryptor.write(data) + _, ciphertext = encryptor.end() + iv = base64.b64encode(encryptor.iv) + tag = encryptor.tag or '' + return iv, ciphertext + tag + + +def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm): + """ + Decrypt data using AES-256 cipher in selected mode. + + :param data: The data to be decrypted. + :type data: str + :param key: The symmetric key used to decrypt data (must be 256 bits + long). + :type key: str + :param iv: The base64 encoded initialization vector. + :type iv: str + + :return: The decrypted data. + :rtype: str + """ + _iv = base64.b64decode(str(iv)) + mode = _mode_by_method(method) + tag = None + if mode == modes.GCM: + data, tag = data[:-16], data[-16:] + decryptor = AESWriter(key, _iv, tag=tag, mode=mode) + decryptor.write(data) + _, plaintext = decryptor.end() + return plaintext + + +class BlobEncryptor(object): + """ + Produces encrypted data from the cleartext data associated with a given + SoledadDocument using AES-256 cipher in GCM mode. + The production happens using a Twisted's FileBodyProducer, which uses a + Cooperator to schedule calls and can be paused/resumed. Each call takes at + most 65536 bytes from the input. + Both the production input and output are file descriptors, so they can be + applied to a stream of data. + """ + def __init__(self, doc_info, content_fd, secret=None): + if not secret: + raise EncryptionDecryptionError('no secret given') + + self.doc_id = doc_info.doc_id + self.rev = doc_info.rev + self._content_fd = content_fd + self._producer = FileBodyProducer(content_fd, readSize=2**16) + + sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) + self._aes = AESWriter(sym_key) + self._aes.authenticate(self._make_preamble()) + + @property + def iv(self): + return self._aes.iv + + @property + def tag(self): + return self._aes.tag + + def encrypt(self): + """ + Starts producing encrypted data from the cleartext data. + + :return: A deferred which will be fired when encryption ends and whose + callback will be invoked with the resulting ciphertext. + :rtype: twisted.internet.defer.Deferred + """ + d = self._producer.startProducing(self._aes) + d.addCallback(lambda _: self._end_crypto_stream()) + return d + + def _make_preamble(self): + current_time = int(time.time()) + + return PACMAN.pack( + BLOB_SIGNATURE_MAGIC, + ENC_SCHEME.symkey, + ENC_METHOD.aes_256_gcm, + current_time, + self.iv, + str(self.doc_id), + str(self.rev)) + + def _end_crypto_stream(self): + preamble, encrypted = self._aes.end() + result = BytesIO() + result.write( + base64.urlsafe_b64encode(preamble)) + result.write(' ') + result.write( + base64.urlsafe_b64encode(encrypted + self.tag)) + return defer.succeed(result) + + +class BlobDecryptor(object): + """ + Decrypts an encrypted blob associated with a given Document. + + Will raise an exception if the blob doesn't have the expected structure, or + if the GCM tag doesn't verify. + """ + + def __init__(self, doc_info, ciphertext_fd, result=None, + secret=None): + if not secret: + raise EncryptionDecryptionError('no secret given') + + self.doc_id = doc_info.doc_id + self.rev = doc_info.rev + + ciphertext_fd, preamble, iv = self._consume_preamble(ciphertext_fd) + + self.result = result or BytesIO() + sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) + self._aes = AESWriter(sym_key, iv, self.result, tag=self.tag) + self._aes.authenticate(preamble) + + self._producer = FileBodyProducer(ciphertext_fd, readSize=2**16) + + def _consume_preamble(self, ciphertext_fd): + ciphertext_fd.seek(0) + try: + preamble, ciphertext = _split(ciphertext_fd.getvalue()) + self.tag, ciphertext = ciphertext[-16:], ciphertext[:-16] + except (TypeError, binascii.Error): + raise InvalidBlob + ciphertext_fd.close() + + if len(preamble) != PACMAN.size: + raise InvalidBlob + + try: + unpacked_data = PACMAN.unpack(preamble) + magic, sch, meth, ts, iv, doc_id, rev = unpacked_data + except struct.error: + raise InvalidBlob + + if magic != BLOB_SIGNATURE_MAGIC: + raise InvalidBlob + # TODO check timestamp + if sch != ENC_SCHEME.symkey: + raise InvalidBlob('invalid scheme') + if meth != ENC_METHOD.aes_256_gcm: + raise InvalidBlob('invalid encryption scheme') + if rev != self.rev: + raise InvalidBlob('invalid revision') + if doc_id != self.doc_id: + raise InvalidBlob('invalid revision') + return BytesIO(ciphertext), preamble, iv + + def _end_stream(self): + try: + return self._aes.end()[1] + except InvalidTag: + raise InvalidBlob('Invalid Tag. Blob authentication failed.') + + def decrypt(self): + """ + Starts producing encrypted data from the cleartext data. + + :return: A deferred which will be fired when encryption ends and whose + callback will be invoked with the resulting ciphertext. + :rtype: twisted.internet.defer.Deferred + """ + d = self._producer.startProducing(self._aes) + d.addCallback(lambda _: self._end_stream()) + return d + + +class AESWriter(object): + """ + A Twisted's Consumer implementation that takes an input file descriptor and + applies AES-256 cipher in GCM mode. + """ + implements(interfaces.IConsumer) + + def __init__(self, key, iv=None, _buffer=None, tag=None, mode=modes.GCM): + if len(key) != 32: + raise EncryptionDecryptionError('key is not 256 bits') + self.iv = iv or os.urandom(16) + self.buffer = _buffer or BytesIO() + cipher = _get_aes_cipher(key, self.iv, tag, mode) + cipher = cipher.decryptor() if tag else cipher.encryptor() + self.cipher, self.aead = cipher, '' + + def authenticate(self, data): + self.aead += data + self.cipher.authenticate_additional_data(data) + + @property + def tag(self): + return getattr(self.cipher, 'tag', None) + + def write(self, data): + self.buffer.write(self.cipher.update(data)) + + def end(self): + self.buffer.write(self.cipher.finalize()) + return self.aead, self.buffer.getvalue() + + +def is_symmetrically_encrypted(content): + """ + Returns True if the document was symmetrically encrypted. + 'EzcB' is the base64 encoding of \x13\x37 magic number and 1 (symmetrically + encrypted value for enc_scheme flag). + + :param doc: The document content as string + :type doc: str + + :rtype: bool + """ + return content and content[:13] == '{"raw": "EzcB' + + +# utils + + +def _hmac_sha256(key, data): + return hmac.new(key, data, hashlib.sha256).digest() + + +def _get_sym_key_for_doc(doc_id, secret): + key = secret[SECRET_LENGTH:] + return _hmac_sha256(key, doc_id) + + +def _get_aes_cipher(key, iv, tag, mode=modes.GCM): + mode = mode(iv, tag) if mode == modes.GCM else mode(iv) + return Cipher(algorithms.AES(key), mode, backend=CRYPTO_BACKEND) + + +def _split(base64_raw_payload): + return imap(base64.urlsafe_b64decode, re.split(' ', base64_raw_payload)) + + +def _mode_by_method(method): + if method == ENC_METHOD.aes_256_gcm: + return modes.GCM + else: + return modes.CTR diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index ce9bec05..a5328d2b 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -50,8 +50,7 @@ How many times a SQLCipher query should be retried in case of timeout. SQLCIPHER_MAX_RETRIES = 10 -def getConnectionPool(opts, openfun=None, driver="pysqlcipher", - sync_enc_pool=None): +def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): """ Return a connection pool. @@ -72,7 +71,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher", if openfun is None and driver == "pysqlcipher": openfun = partial(set_init_pragmas, opts=opts) return U1DBConnectionPool( - opts, sync_enc_pool, + opts, # the following params are relayed "as is" to twisted's # ConnectionPool. "%s.dbapi2" % driver, opts.path, timeout=SQLCIPHER_CONNECTION_TIMEOUT, @@ -89,7 +88,7 @@ class U1DBConnection(adbapi.Connection): The U1DB wrapper to use. """ - def __init__(self, pool, sync_enc_pool, init_u1db=False): + def __init__(self, pool, init_u1db=False): """ :param pool: The pool of connections to that owns this connection. :type pool: adbapi.ConnectionPool @@ -97,7 +96,6 @@ class U1DBConnection(adbapi.Connection): :type init_u1db: bool """ self.init_u1db = init_u1db - self._sync_enc_pool = sync_enc_pool try: adbapi.Connection.__init__(self, pool) except dbapi2.DatabaseError as e: @@ -116,8 +114,7 @@ class U1DBConnection(adbapi.Connection): if self.init_u1db: self._u1db = self.u1db_wrapper( self._connection, - self._pool.opts, - self._sync_enc_pool) + self._pool.opts) def __getattr__(self, name): """ @@ -162,12 +159,11 @@ class U1DBConnectionPool(adbapi.ConnectionPool): connectionFactory = U1DBConnection transactionFactory = U1DBTransaction - def __init__(self, opts, sync_enc_pool, *args, **kwargs): + def __init__(self, opts, *args, **kwargs): """ Initialize the connection pool. """ self.opts = opts - self._sync_enc_pool = sync_enc_pool try: adbapi.ConnectionPool.__init__(self, *args, **kwargs) except dbapi2.DatabaseError as e: @@ -182,7 +178,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool): try: conn = self.connectionFactory( - self, self._sync_enc_pool, init_u1db=True) + self, init_u1db=True) replica_uid = conn._u1db._real_replica_uid setProxiedObject(self.replica_uid, replica_uid) except DatabaseAccessError as e: @@ -257,7 +253,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool): tid = self.threadID() u1db = self._u1dbconnections.get(tid) conn = self.connectionFactory( - self, self._sync_enc_pool, init_u1db=not bool(u1db)) + self, init_u1db=not bool(u1db)) if self.replica_uid is None: replica_uid = conn._u1db._real_replica_uid diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 6870d5ba..da6eec66 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -56,11 +56,10 @@ from leap.soledad.common.errors import DatabaseAccessError from leap.soledad.client import adbapi from leap.soledad.client import events as soledad_events from leap.soledad.client import interfaces as soledad_interfaces -from leap.soledad.client.crypto import SoledadCrypto +from leap.soledad.client import sqlcipher from leap.soledad.client.secrets import SoledadSecrets from leap.soledad.client.shared_db import SoledadSharedDatabase -from leap.soledad.client import sqlcipher -from leap.soledad.client import encdecpool +from leap.soledad.client._crypto import SoledadCrypto logger = getLogger(__name__) @@ -131,7 +130,7 @@ class Soledad(object): def __init__(self, uuid, passphrase, secrets_path, local_db_path, server_url, cert_file, shared_db=None, - auth_token=None, defer_encryption=False, syncable=True): + auth_token=None, syncable=True): """ Initialize configuration, cryptographic keys and dbs. @@ -168,11 +167,6 @@ class Soledad(object): Authorization token for accessing remote databases. :type auth_token: str - :param defer_encryption: - Whether to defer encryption/decryption of documents, or do it - inline while syncing. - :type defer_encryption: bool - :param syncable: If set to ``False``, this database will not attempt to synchronize with remote replicas (default is ``True``) @@ -188,9 +182,7 @@ class Soledad(object): self._passphrase = passphrase self._local_db_path = local_db_path self._server_url = server_url - self._defer_encryption = defer_encryption self._secrets_path = None - self._sync_enc_pool = None self._dbsyncer = None self.shared_db = shared_db @@ -226,7 +218,6 @@ class Soledad(object): # have to close any thread-related stuff we have already opened # here, otherwise there might be zombie threads that may clog the # reactor. - self._sync_db.close() if hasattr(self, '_dbpool'): self._dbpool.close() raise @@ -289,22 +280,12 @@ class Soledad(object): tohex = binascii.b2a_hex # sqlcipher only accepts the hex version key = tohex(self._secrets.get_local_storage_key()) - sync_db_key = tohex(self._secrets.get_sync_db_key()) opts = sqlcipher.SQLCipherOptions( self._local_db_path, key, - is_raw_key=True, create=True, - defer_encryption=self._defer_encryption, - sync_db_key=sync_db_key, - ) + is_raw_key=True, create=True) self._sqlcipher_opts = opts - - # the sync_db is used both for deferred encryption and decryption, so - # we want to initialize it anyway to allow for all combinations of - # deferred encryption and decryption configurations. - self._initialize_sync_db(opts) - self._dbpool = adbapi.getConnectionPool( - opts, sync_enc_pool=self._sync_enc_pool) + self._dbpool = adbapi.getConnectionPool(opts) def _init_u1db_syncer(self): """ @@ -313,10 +294,7 @@ class Soledad(object): replica_uid = self._dbpool.replica_uid self._dbsyncer = sqlcipher.SQLCipherU1DBSync( self._sqlcipher_opts, self._crypto, replica_uid, - SOLEDAD_CERT, - defer_encryption=self._defer_encryption, - sync_db=self._sync_db, - sync_enc_pool=self._sync_enc_pool) + SOLEDAD_CERT) def sync_stats(self): sync_phase = 0 @@ -341,12 +319,6 @@ class Soledad(object): self._dbpool.close() if getattr(self, '_dbsyncer', None): self._dbsyncer.close() - # close the sync database - if self._sync_db: - self._sync_db.close() - self._sync_db = None - if self._defer_encryption: - self._sync_enc_pool.stop() # # ILocalStorage @@ -385,7 +357,8 @@ class Soledad(object): also be updated. :rtype: twisted.internet.defer.Deferred """ - return self._defer("put_doc", doc) + d = self._defer("put_doc", doc) + return d def delete_doc(self, doc): """ @@ -479,7 +452,8 @@ class Soledad(object): # create_doc (and probably to put_doc too). There are cases (mail # payloads for example) in which we already have the encoding in the # headers, so we don't need to guess it. - return self._defer("create_doc", content, doc_id=doc_id) + d = self._defer("create_doc", content, doc_id=doc_id) + return d def create_doc_from_json(self, json, doc_id=None): """ @@ -700,37 +674,26 @@ class Soledad(object): if syncable and not self._dbsyncer: self._init_u1db_syncer() - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize documents with the server replica. This method uses a lock to prevent multiple concurrent sync processes over the same local db file. - :param defer_decryption: - Whether to defer decryption of documents, or do it inline while - syncing. - :type defer_decryption: bool - :return: A deferred lock that will run the actual sync process when the lock is acquired, and which will fire with with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred """ d = self.sync_lock.run( - self._sync, - defer_decryption) + self._sync) return d - def _sync(self, defer_decryption): + def _sync(self): """ Synchronize documents with the server replica. - :param defer_decryption: - Whether to defer decryption of documents, or do it inline while - syncing. - :type defer_decryption: bool - :return: A deferred whose callback will be invoked with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -740,8 +703,7 @@ class Soledad(object): return d = self._dbsyncer.sync( sync_url, - creds=self._creds, - defer_decryption=defer_decryption) + creds=self._creds) def _sync_callback(local_gen): self._last_received_docs = docs = self._dbsyncer.received_docs @@ -837,50 +799,6 @@ class Soledad(object): token = property(_get_token, _set_token, doc='The authentication Token.') - def _initialize_sync_db(self, opts): - """ - Initialize the Symmetrically-Encrypted document to be synced database, - and the queue to communicate with subprocess workers. - - :param opts: - :type opts: SQLCipherOptions - """ - soledad_assert(opts.sync_db_key is not None) - sync_db_path = None - if opts.path != ":memory:": - sync_db_path = "%s-sync" % opts.path - else: - sync_db_path = ":memory:" - - # we copy incoming options because the opts object might be used - # somewhere else - sync_opts = sqlcipher.SQLCipherOptions.copy( - opts, path=sync_db_path, create=True) - self._sync_db = sqlcipher.getConnectionPool( - sync_opts, extra_queries=self._sync_db_extra_init) - if self._defer_encryption: - # initialize syncing queue encryption pool - self._sync_enc_pool = encdecpool.SyncEncrypterPool( - self._crypto, self._sync_db) - self._sync_enc_pool.start() - - @property - def _sync_db_extra_init(self): - """ - Queries for creating tables for the local sync documents db if needed. - They are passed as extra initialization to initialize_sqlciphjer_db - - :rtype: tuple of strings - """ - maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" - encr = encdecpool.SyncEncrypterPool - decr = encdecpool.SyncDecrypterPool - sql_encr_table_query = (maybe_create % ( - encr.TABLE_NAME, encr.FIELD_NAMES)) - sql_decr_table_query = (maybe_create % ( - decr.TABLE_NAME, decr.FIELD_NAMES)) - return (sql_encr_table_query, sql_decr_table_query) - # # ISecretsStorage # @@ -1017,6 +935,7 @@ def create_path_if_not_exists(path): # Monkey patching u1db to be able to provide a custom SSL cert # ---------------------------------------------------------------------------- + # We need a more reasonable timeout (in seconds) SOLEDAD_TIMEOUT = 120 diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index d81c883b..09e90171 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -32,9 +32,12 @@ from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import crypto from leap.soledad.common.log import getLogger +import warnings logger = getLogger(__name__) +warnings.warn("'soledad.client.crypto' MODULE DEPRECATED", + DeprecationWarning, stacklevel=2) MAC_KEY_LENGTH = 64 diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py deleted file mode 100644 index 056b012f..00000000 --- a/client/src/leap/soledad/client/encdecpool.py +++ /dev/null @@ -1,657 +0,0 @@ -# -*- coding: utf-8 -*- -# encdecpool.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -A pool of encryption/decryption concurrent and parallel workers for using -during synchronization. -""" - - -import json -from uuid import uuid4 - -from twisted.internet.task import LoopingCall -from twisted.internet import threads -from twisted.internet import defer - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.common import soledad_assert -from leap.soledad.common.log import getLogger - -from leap.soledad.client.crypto import encrypt_docstr -from leap.soledad.client.crypto import decrypt_doc_dict - - -logger = getLogger(__name__) - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): - """ - Base class for encrypter/decrypter pools. - """ - - def __init__(self, crypto, sync_db): - """ - Initialize the pool of encryption-workers. - - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - - :param sync_db: A database connection handle - :type sync_db: pysqlcipher.dbapi2.Connection - """ - self._crypto = crypto - self._sync_db = sync_db - self._delayed_call = None - self._started = False - - def start(self): - self._started = True - - def stop(self): - self._started = False - # maybe cancel the next delayed call - if self._delayed_call \ - and not self._delayed_call.called: - self._delayed_call.cancel() - - @property - def running(self): - return self._started - - def _runOperation(self, query, *args): - """ - Run an operation on the sync db. - - :param query: The query to be executed. - :type query: str - :param args: A list of query arguments. - :type args: list - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - return self._sync_db.runOperation(query, *args) - - def _runQuery(self, query, *args): - """ - Run a query on the sync db. - - :param query: The query to be executed. - :type query: str - :param args: A list of query arguments. - :type args: list - - :return: A deferred that will fire with the results of the database - query. - :rtype: twisted.internet.defer.Deferred - """ - return self._sync_db.runQuery(query, *args) - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): - """ - Encrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - encrypted_content = encrypt_docstr( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric encryption - of documents to be synced. - """ - TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - - ENCRYPT_LOOP_PERIOD = 2 - - def __init__(self, *args, **kwargs): - """ - Initialize the sync encrypter pool. - """ - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - # TODO delete already synced files from database - - def start(self): - """ - Start the encrypter pool. - """ - SyncEncryptDecryptPool.start(self) - logger.debug("starting the encryption loop...") - - def stop(self): - """ - Stop the encrypter pool. - """ - - SyncEncryptDecryptPool.stop(self) - - def encrypt_doc(self, doc): - """ - Encrypt document asynchronously then insert it on - local staging database. - - :param doc: The document to be encrypted. - :type doc: SoledadDocument - """ - soledad_assert(self._crypto is not None, "need a crypto object") - docstr = doc.get_json() - key = self._crypto.doc_passphrase(doc.doc_id) - secret = self._crypto.secret - args = doc.doc_id, doc.rev, docstr, key, secret - # encrypt asynchronously - # TODO use dedicated threadpool / move to ampoule - d = threads.deferToThread( - encrypt_doc_task, *args) - d.addCallback(self._encrypt_doc_cb) - return d - - def _encrypt_doc_cb(self, result): - """ - Insert results of encryption routine into the local sync database. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, doc_rev, content = result - return self._insert_encrypted_local_doc(doc_id, doc_rev, content) - - def _insert_encrypted_local_doc(self, doc_id, doc_rev, content): - """ - Insert the contents of the encrypted doc into the local sync - database. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - """ - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ - % (self.TABLE_NAME,) - return self._runOperation(query, (doc_id, doc_rev, content)) - - @defer.inlineCallbacks - def get_encrypted_doc(self, doc_id, doc_rev): - """ - Get an encrypted document from the sync db. - - :param doc_id: The id of the document. - :type doc_id: str - :param doc_rev: The revision of the document. - :type doc_rev: str - - :return: A deferred that will fire with the encrypted content of the - document or None if the document was not found in the sync - db. - :rtype: twisted.internet.defer.Deferred - """ - query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \ - % self.TABLE_NAME - result = yield self._runQuery(query, (doc_id, doc_rev)) - if result: - logger.debug("found doc on sync db: %s" % doc_id) - val = result.pop() - defer.returnValue(val[0]) - logger.debug("did not find doc on sync db: %s" % doc_id) - defer.returnValue(None) - - def delete_encrypted_doc(self, doc_id, doc_rev): - """ - Delete an encrypted document from the sync db. - - :param doc_id: The id of the document. - :type doc_id: str - :param doc_rev: The revision of the document. - :type doc_rev: str - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ - % self.TABLE_NAME - self._runOperation(query, (doc_id, doc_rev)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, - idx): - """ - Decrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The encrypted content of the document as JSON dict. - :type content: dict - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification of - that document. - :type trans_id: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, decrypted_content, gen, trans_id, idx - - -class SyncDecrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric decryption - of documents that were received. - - The decryption of the received documents is done in two steps: - - 1. Encrypted documents are stored in the sync db by the actual soledad - sync loop. - 2. The soledad sync loop tells us how many documents we should expect - to process. - 3. We start a decrypt-and-process loop: - - a. Encrypted documents are fetched. - b. Encrypted documents are decrypted. - c. The longest possible list of decrypted documents are inserted - in the soledad db (this depends on which documents have already - arrived and which documents have already been decrypte, because - the order of insertion in the local soledad db matters). - d. Processed documents are deleted from the database. - - 4. When we have processed as many documents as we should, the loop - finishes. - """ - TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ - "trans_id, encrypted, idx, sync_id" - - """ - Period of recurrence of the periodic decrypting task, in seconds. - """ - DECRYPT_LOOP_PERIOD = 0.5 - - def __init__(self, *args, **kwargs): - """ - Initialize the decrypter pool, and setup a dict for putting the - results of the decrypted docs until they are picked by the insert - routine that gets them in order. - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - :param source_replica_uid: The source replica uid, used to find the - correct callback for inserting documents. - :type source_replica_uid: str - """ - self._insert_doc_cb = kwargs.pop("insert_doc_cb") - self.source_replica_uid = kwargs.pop("source_replica_uid") - - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - - self._docs_to_process = None - self._processed_docs = 0 - self._last_inserted_idx = 0 - - self._loop = LoopingCall(self._decrypt_and_recurse) - - def _start_pool(self, period): - self._loop.start(period) - - def start(self, docs_to_process): - """ - Set the number of documents we expect to process. - - This should be called by the during the sync exchange process as soon - as we know how many documents are arriving from the server. - - :param docs_to_process: The number of documents to process. - :type docs_to_process: int - """ - SyncEncryptDecryptPool.start(self) - self._decrypted_docs_indexes = set() - self._sync_id = uuid4().hex - self._docs_to_process = docs_to_process - self._deferred = defer.Deferred() - d = self._init_db() - d.addCallback(lambda _: self._start_pool(self.DECRYPT_LOOP_PERIOD)) - return d - - def stop(self): - if self._loop.running: - self._loop.stop() - self._finish() - SyncEncryptDecryptPool.stop(self) - - def _init_db(self): - """ - Ensure sync_id column is present then - Empty the received docs table of the sync database. - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" % - self.TABLE_NAME) - d = self._runQuery(ensure_sync_id_column) - - def empty_received_docs(_): - query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) - return self._runOperation(query, (self._sync_id,)) - - d.addCallbacks(empty_received_docs, empty_received_docs) - return d - - def _errback(self, failure): - logger.error(failure) - self._deferred.errback(failure) - self._processed_docs = 0 - self._last_inserted_idx = 0 - - @property - def deferred(self): - """ - Deferred that will be fired when the decryption loop has finished - processing all the documents. - """ - return self._deferred - - def insert_encrypted_received_doc( - self, doc_id, doc_rev, content, gen, trans_id, idx): - """ - Decrypt and insert a received document into local staging area to be - processed later on. - - :param doc_id: The document ID. - :type doc_id: str - :param doc_rev: The document Revision - :param doc_rev: str - :param content: The content of the document - :type content: dict - :param gen: The document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A deferred that will fire after the decrypted document has - been inserted in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - soledad_assert(self._crypto is not None, "need a crypto object") - - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx - # decrypt asynchronously - # TODO use dedicated threadpool / move to ampoule - d = threads.deferToThread( - decrypt_doc_task, *args) - # callback will insert it for later processing - d.addCallback(self._decrypt_doc_cb) - return d - - def insert_received_doc( - self, doc_id, doc_rev, content, gen, trans_id, idx): - """ - Insert a document that is not symmetrically encrypted. - We store it in the staging area (the decrypted_docs dictionary) to be - picked up in order as the preceding documents are decrypted. - - :param doc_id: The document id - :type doc_id: str - :param doc_rev: The document revision - :param doc_rev: str or dict - :param content: The content of the document - :type content: dict - :param gen: The document generation - :type gen: int - :param trans_id: The transaction id - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - if not isinstance(content, str): - content = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \ - % self.TABLE_NAME - d = self._runOperation( - query, (doc_id, doc_rev, content, gen, trans_id, 0, - idx, self._sync_id)) - d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) - return d - - def _delete_received_docs(self, doc_ids): - """ - Delete a list of received docs after get them inserted into the db. - - :param doc_id: Document ID list. - :type doc_id: list - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - placeholders = ', '.join('?' for _ in doc_ids) - query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ - % (self.TABLE_NAME, placeholders) - return self._runOperation(query, (doc_ids)) - - def _decrypt_doc_cb(self, result): - """ - Store the decryption result in the sync db from where it will later be - picked by _process_decrypted_docs. - - :param result: A tuple containing the document's id, revision, - content, generation, transaction id and sync index. - :type result: tuple(str, str, str, int, str, int) - - :return: A deferred that will fire after the document has been - inserted in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - doc_id, rev, content, gen, trans_id, idx = result - logger.debug("sync decrypter pool: decrypted doc %s: %s %s %s" - % (doc_id, rev, gen, trans_id)) - return self.insert_received_doc( - doc_id, rev, content, gen, trans_id, idx) - - def _get_docs(self, encrypted=None, sequence=None): - """ - Get documents from the received docs table in the sync db. - - :param encrypted: If not None, only return documents with encrypted - field equal to given parameter. - :type encrypted: bool or None - :param order_by: The name of the field to order results. - - :return: A deferred that will fire with the results of the database - query. - :rtype: twisted.internet.defer.Deferred - """ - query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ - "idx FROM %s" % self.TABLE_NAME - parameters = [] - if encrypted or sequence: - query += " WHERE sync_id = ? and" - parameters += [self._sync_id] - if encrypted: - query += " encrypted = ?" - parameters += [int(encrypted)] - if sequence: - query += " idx in (" + ', '.join('?' * len(sequence)) + ")" - parameters += [int(i) for i in sequence] - query += " ORDER BY idx ASC" - return self._runQuery(query, parameters) - - @defer.inlineCallbacks - def _get_insertable_docs(self): - """ - Return a list of non-encrypted documents ready to be inserted. - - :return: A deferred that will fire with the list of insertable - documents. - :rtype: twisted.internet.defer.Deferred - """ - # Here, check in memory what are the insertable indexes that can - # form a sequence starting from the last inserted index - sequence = [] - insertable_docs = [] - next_index = self._last_inserted_idx + 1 - while next_index in self._decrypted_docs_indexes: - sequence.append(str(next_index)) - next_index += 1 - if len(sequence) > 900: - # 999 is the default value of SQLITE_MAX_VARIABLE_NUMBER - # if we try to query more, SQLite will refuse - # we need to find a way to improve this - # being researched in #7669 - break - # Then fetch all the ones ready for insertion. - if sequence: - insertable_docs = yield self._get_docs(encrypted=False, - sequence=sequence) - defer.returnValue(insertable_docs) - - @defer.inlineCallbacks - def _process_decrypted_docs(self): - """ - Fetch as many decrypted documents as can be taken from the expected - order and insert them in the local replica. - - :return: A deferred that will fire with the list of inserted - documents. - :rtype: twisted.internet.defer.Deferred - """ - insertable = yield self._get_insertable_docs() - processed_docs_ids = [] - for doc_fields in insertable: - method = self._insert_decrypted_local_doc - # FIXME: This is used only because SQLCipherU1DBSync is synchronous - # When adbapi is used there is no need for an external thread - # Without this the reactor can freeze and fail docs download - yield threads.deferToThread(method, *doc_fields) - processed_docs_ids.append(doc_fields[0]) - yield self._delete_received_docs(processed_docs_ids) - - def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id, encrypted, idx): - """ - Insert the decrypted document into the local replica. - - Make use of the passed callback `insert_doc_cb` passed to the caller - by u1db sync. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - """ - # could pass source_replica in params for callback chain - logger.debug("sync decrypter pool: inserting doc in local db: " - "%s:%s %s" % (doc_id, doc_rev, gen)) - - # convert deleted documents to avoid error on document creation - if content == 'null': - content = None - doc = SoledadDocument(doc_id, doc_rev, content) - gen = int(gen) - self._insert_doc_cb(doc, gen, trans_id) - - # store info about processed docs - self._last_inserted_idx = idx - self._processed_docs += 1 - - @defer.inlineCallbacks - def _decrypt_and_recurse(self): - """ - Decrypt the documents received from remote replica and insert them - into the local one. - - This method implicitelly returns a defferred (see the decorator - above). It should only be called by _launch_decrypt_and_process(). - because this way any exceptions raised here will be stored by the - errback attached to the deferred returned. - - :return: A deferred which will fire after all decrypt, process and - delete operations have been executed. - :rtype: twisted.internet.defer.Deferred - """ - if not self.running: - defer.returnValue(None) - processed = self._processed_docs - pending = self._docs_to_process - - if processed < pending: - yield self._process_decrypted_docs() - else: - self._finish() - - def _finish(self): - self._processed_docs = 0 - self._last_inserted_idx = 0 - self._decrypted_docs_indexes = set() - if not self._deferred.called: - self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py index 4fc91d9d..92bc85d6 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py @@ -58,6 +58,7 @@ def debug(*args): if not silent: print(*args) + debug("[+] db path:", tmpdb) debug("[+] num docs", numdocs) @@ -74,6 +75,7 @@ dbpool = adbapi.getConnectionPool(opts) def createDoc(doc): return dbpool.runU1DBQuery("create_doc", doc) + db_indexes = { 'by-chash': ['chash'], 'by-number': ['number']} @@ -168,6 +170,7 @@ def insert_docs(_): deferreds.append(d) return defer.gatherResults(deferreds, consumeErrors=True) + d = create_indexes(None) d.addCallback(insert_docs) d.addCallback(get_from_index) diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py index 38ea18a3..429566c7 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py @@ -58,6 +58,7 @@ def debug(*args): if not silent: print(*args) + debug("[+] db path:", tmpdb) debug("[+] num docs", numdocs) @@ -74,6 +75,7 @@ dbpool = adbapi.getConnectionPool(opts) def createDoc(doc, doc_id): return dbpool.runU1DBQuery("create_doc", doc, doc_id=doc_id) + db_indexes = { 'by-chash': ['chash'], 'by-number': ['number']} @@ -168,6 +170,7 @@ def insert_docs(_): deferreds.append(d) return defer.gatherResults(deferreds, consumeErrors=True) + d = create_indexes(None) d.addCallback(insert_docs) d.addCallback(get_from_index) diff --git a/client/src/leap/soledad/client/examples/run_benchmark.py b/client/src/leap/soledad/client/examples/run_benchmark.py index 61621e89..ddedf433 100644 --- a/client/src/leap/soledad/client/examples/run_benchmark.py +++ b/client/src/leap/soledad/client/examples/run_benchmark.py @@ -14,6 +14,7 @@ cmd = "SILENT=1 TIMES={times} TMPDIR={tmpdir} python ./use_{version}api.py" def parse_time(r): return r.split('\n')[-1] + with open(CSVFILE, 'w') as log: for times in range(0, 10000, 500): diff --git a/client/src/leap/soledad/client/examples/soledad_sync.py b/client/src/leap/soledad/client/examples/soledad_sync.py index 63077ee3..3aed10eb 100644 --- a/client/src/leap/soledad/client/examples/soledad_sync.py +++ b/client/src/leap/soledad/client/examples/soledad_sync.py @@ -40,7 +40,7 @@ def init_soledad(_): global soledad soledad = Soledad(uuid, _pass, secrets_path, local_db_path, server_url, cert_file, - auth_token=token, defer_encryption=False) + auth_token=token) def getall(_): d = soledad.get_all_docs() diff --git a/client/src/leap/soledad/client/examples/use_adbapi.py b/client/src/leap/soledad/client/examples/use_adbapi.py index a2683836..39301b41 100644 --- a/client/src/leap/soledad/client/examples/use_adbapi.py +++ b/client/src/leap/soledad/client/examples/use_adbapi.py @@ -39,6 +39,7 @@ def debug(*args): if not silent: print(*args) + debug("[+] db path:", tmpdb) debug("[+] times", times) @@ -87,6 +88,7 @@ def allDone(_): print((end_time - start_time).total_seconds()) reactor.stop() + deferreds = [] payload = open('manifest.phk').read() diff --git a/client/src/leap/soledad/client/examples/use_api.py b/client/src/leap/soledad/client/examples/use_api.py index e2501c98..db77c4b3 100644 --- a/client/src/leap/soledad/client/examples/use_api.py +++ b/client/src/leap/soledad/client/examples/use_api.py @@ -36,6 +36,7 @@ def debug(*args): if not silent: print(*args) + debug("[+] db path:", tmpdb) debug("[+] times", times) @@ -52,6 +53,7 @@ db = sqlcipher.SQLCipherDatabase(opts) def allDone(): debug("ALL DONE!") + payload = open('manifest.phk').read() for i in range(times): diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index 62e8bcf0..0e250bf1 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -25,10 +25,13 @@ after receiving. import os from leap.soledad.common.log import getLogger -from leap.common.http import HTTPClient +from leap.common.certs import get_compatible_ssl_context_factory +from twisted.web.client import Agent +from twisted.internet import reactor from leap.soledad.client.http_target.send import HTTPDocSender from leap.soledad.client.http_target.api import SyncTargetAPI from leap.soledad.client.http_target.fetch import HTTPDocFetcher +from leap.soledad.client import crypto as old_crypto logger = getLogger(__name__) @@ -51,8 +54,7 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): the parsed documents that the remote send us, before being decrypted and written to the main database. """ - def __init__(self, url, source_replica_uid, creds, crypto, cert_file, - sync_db=None, sync_enc_pool=None): + def __init__(self, url, source_replica_uid, creds, crypto, cert_file): """ Initialize the sync target. @@ -65,21 +67,11 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): :type creds: creds :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt document contents when syncing. - :type crypto: soledad.crypto.SoledadCrypto + :type crypto: soledad._crypto.SoledadCrypto :param cert_file: Path to the certificate of the ca used to validate the SSL certificate used by the remote soledad server. :type cert_file: str - :param sync_db: Optional. handler for the db with the symmetric - encryption of the syncing documents. If - None, encryption will be done in-place, - instead of retreiving it from the dedicated - database. - :type sync_db: Sqlite handler - :param sync_enc_pool: The encryption pool to use to defer encryption. - If None is passed the encryption will not be - deferred. - :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool """ if url.endswith("/"): url = url[:-1] @@ -89,17 +81,13 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): self._uuid = None self.set_creds(creds) self._crypto = crypto - self._sync_db = sync_db - self._sync_enc_pool = sync_enc_pool + # TODO: DEPRECATED CRYPTO + self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret) self._insert_doc_cb = None - # asynchronous encryption/decryption attributes - self._decryption_callback = None - self._sync_decr_pool = None - - # XXX Increasing timeout of simple requests to avoid chances of hitting - # the duplicated syncing bug. This could be reduced to the 30s default - # after implementing Cancellable Sync. See #7382 - self._http = HTTPClient(cert_file, timeout=90) + + # Twisted default Agent with our own ssl context factory + self._http = Agent(reactor, + get_compatible_ssl_context_factory(cert_file)) if DO_STATS: self.sync_exchange_phase = [0] diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 3c8e3764..1b086a00 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -18,10 +18,13 @@ import os import json import base64 +from StringIO import StringIO from uuid import uuid4 from twisted.web.error import Error from twisted.internet import defer +from twisted.web.http_headers import Headers +from twisted.web.client import FileBodyProducer from leap.soledad.client.http_target.support import readBody from leap.soledad.common.errors import InvalidAuthTokenError @@ -39,14 +42,6 @@ class SyncTargetAPI(SyncTarget): Declares public methods and implements u1db.SyncTarget. """ - @defer.inlineCallbacks - def close(self): - if self._sync_enc_pool: - self._sync_enc_pool.stop() - if self._sync_decr_pool: - self._sync_decr_pool.stop() - yield self._http.close() - @property def uuid(self): return self._uuid @@ -69,16 +64,20 @@ class SyncTargetAPI(SyncTarget): def _base_header(self): return self._auth_header.copy() if self._auth_header else {} - @property - def _defer_encryption(self): - return self._sync_enc_pool is not None - def _http_request(self, url, method='GET', body=None, headers=None, - content_type=None): + content_type=None, body_reader=readBody, + body_producer=None): headers = headers or self._base_header if content_type: headers.update({'content-type': [content_type]}) - d = self._http.request(url, method, body, headers, readBody) + if not body_producer and body: + body = FileBodyProducer(StringIO(body)) + elif body_producer: + # Upload case, check send.py + body = body_producer(body) + d = self._http.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallback(body_reader) d.addErrback(_unauth_to_invalid_token_error) return d @@ -153,7 +152,7 @@ class SyncTargetAPI(SyncTarget): def sync_exchange(self, docs_by_generation, source_replica_uid, last_known_generation, last_known_trans_id, insert_doc_cb, ensure_callback=None, - defer_decryption=True, sync_id=None): + sync_id=None): """ Find out which documents the remote database does not know about, encrypt and send them. After that, receive documents from the remote @@ -185,11 +184,6 @@ class SyncTargetAPI(SyncTarget): created. :type ensure_callback: function - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - :return: A deferred which fires with the new generation and transaction id of the target replica. :rtype: twisted.internet.defer.Deferred @@ -221,8 +215,7 @@ class SyncTargetAPI(SyncTarget): cur_target_gen, cur_target_trans_id = yield self._receive_docs( last_known_generation, last_known_trans_id, - ensure_callback, sync_id, - defer_decryption=defer_decryption) + ensure_callback, sync_id) # update gen and trans id info in case we just sent and did not # receive docs. diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 184c5883..8676ceed 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -15,18 +15,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import json - from twisted.internet import defer +from twisted.internet import threads from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.http_target.support import RequestBody from leap.soledad.common.log import getLogger +from leap.soledad.client._crypto import is_symmetrically_encrypted from leap.soledad.common.document import SoledadDocument from leap.soledad.common.l2db import errors -from leap.soledad.common.l2db.remote import utils +from leap.soledad.client import crypto as old_crypto + +from . import fetch_protocol logger = getLogger(__name__) @@ -50,208 +51,105 @@ class HTTPDocFetcher(object): @defer.inlineCallbacks def _receive_docs(self, last_known_generation, last_known_trans_id, - ensure_callback, sync_id, defer_decryption): - - self._queue_for_decrypt = defer_decryption \ - and self._sync_db is not None - + ensure_callback, sync_id): new_generation = last_known_generation new_transaction_id = last_known_trans_id - - if self._queue_for_decrypt: - logger.debug( - "Soledad sync: will queue received docs for decrypting.") - - if defer_decryption: - self._setup_sync_decr_pool() - - # --------------------------------------------------------------------- - # maybe receive the first document - # --------------------------------------------------------------------- - - # we fetch the first document before fetching the rest because we need - # to know the total number of documents to be received, and this - # information comes as metadata to each request. - - doc = yield self._receive_one_doc( + # Acts as a queue, ensuring line order on async processing + # as `self._insert_doc_cb` cant be run concurrently or out of order. + # DeferredSemaphore solves the concurrency and its implementation uses + # a queue, solving the ordering. + # FIXME: Find a proper solution to avoid surprises on Twisted changes + self.semaphore = defer.DeferredSemaphore(1) + + metadata = yield self._fetch_all( last_known_generation, last_known_trans_id, - sync_id, 0) - self._received_docs = 0 - number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + sync_id) + number_of_changes, ngen, ntrans = self._parse_metadata(metadata) + + # wait for pending inserts + yield self.semaphore.acquire() if ngen: new_generation = ngen new_transaction_id = ntrans - # --------------------------------------------------------------------- - # maybe receive the rest of the documents - # --------------------------------------------------------------------- - - # launch many asynchronous fetches and inserts of received documents - # in the temporary sync db. Will wait for all results before - # continuing. - - received = 1 - deferreds = [] - while received < number_of_changes: - d = self._receive_one_doc( - last_known_generation, - last_known_trans_id, sync_id, received) - d.addCallback( - self._insert_received_doc, - received + 1, # the index of the current received doc - number_of_changes) - deferreds.append(d) - received += 1 - results = yield defer.gatherResults(deferreds) - - # get generation and transaction id of target after insertions - if deferreds: - _, new_generation, new_transaction_id = results.pop() - - # --------------------------------------------------------------------- - # wait for async decryption to finish - # --------------------------------------------------------------------- - - if defer_decryption: - yield self._sync_decr_pool.deferred - self._sync_decr_pool.stop() - defer.returnValue([new_generation, new_transaction_id]) - def _receive_one_doc(self, last_known_generation, - last_known_trans_id, sync_id, received): + def _fetch_all(self, last_known_generation, + last_known_trans_id, sync_id): # add remote replica metadata to the request body = RequestBody( last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) - # inform server of how many documents have already been received - body.insert_info(received=received) - # send headers + self._received_docs = 0 + # build a stream reader with _doc_parser as a callback + body_reader = fetch_protocol.build_body_reader(self._doc_parser) + # start download stream return self._http_request( self._url, method='POST', body=str(body), - content_type='application/x-soledad-sync-get') + content_type='application/x-soledad-sync-get', + body_reader=body_reader) - def _insert_received_doc(self, response, idx, total): + @defer.inlineCallbacks + def _doc_parser(self, doc_info, content, total): """ - Insert a received document into the local replica. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - :param idx: The index count of the current operation. - :type idx: int + Insert a received document into the local replica, decrypting + if necessary. The case where it's not decrypted is when a doc gets + inserted from Server side with a GPG encrypted content. + + :param doc_info: Dictionary representing Document information. + :type doc_info: dict + :param content: The Document's content. + :type idx: str :param total: The total number of operations. :type total: int """ - new_generation, new_transaction_id, number_of_changes, doc_id, \ - rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) + yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, + total) - if self._sync_decr_pool and not self._sync_decr_pool.running: - self._sync_decr_pool.start(number_of_changes) - - if doc_id is not None: - # decrypt incoming document and insert into local database - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # If arriving content was symmetrically encrypted, we decrypt it. - # We do it inline if defer_decryption flag is False or no sync_db - # was defined, otherwise we defer it writing it to the received - # docs table. - doc = SoledadDocument(doc_id, rev, content) - if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(self._crypto.decrypt_doc(doc)) - self._insert_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. - if self._queue_for_decrypt: - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - self._insert_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- + @defer.inlineCallbacks + def __atomic_doc_parse(self, doc_info, content, total): + doc = SoledadDocument(doc_info['id'], doc_info['rev'], content) + if is_symmetrically_encrypted(content): + content = yield self._crypto.decrypt_doc(doc) + elif old_crypto.is_symmetrically_encrypted(doc): + content = self._deprecated_crypto.decrypt_doc(doc) + doc.set_json(content) + + # TODO insert blobs here on the blob backend + # FIXME: This is wrong. Using the very same SQLite connection object + # from multiple threads is dangerous. We should bring the dbpool here + # or find an alternative. Deferring to a thread only helps releasing + # the reactor for other tasks as this is an IO intensive call. + yield threads.deferToThread(self._insert_doc_cb, + doc, doc_info['gen'], doc_info['trans_id']) self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} - _emit_receive_status(user_data, self._received_docs, total) - return number_of_changes, new_generation, new_transaction_id + _emit_receive_status(user_data, self._received_docs, total=total) - def _parse_received_doc_response(self, response): + def _parse_metadata(self, metadata): """ - Parse the response from the server containing the received document. + Parse the response from the server containing the sync metadata. - :param response: The body and headers of the response. - :type response: tuple(str, dict) + :param response: Metadata as string + :type response: str - :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, - content, gen, trans_id) + :return: (number_of_changes, new_gen, new_trans_id) :rtype: tuple """ - # decode incoming stream - parts = response.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise errors.BrokenSyncStream - data = parts[1:-1] - # decode metadata - try: - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - except (IndexError): - raise errors.BrokenSyncStream try: - metadata = json.loads(line) - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - number_of_changes = metadata['number_of_changes'] + metadata = json.loads(metadata) + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + return (metadata['number_of_changes'], metadata['new_generation'], + metadata['new_transaction_id']) except (ValueError, KeyError): - raise errors.BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if self._ensure_callback and 'replica_uid' in metadata: - self._ensure_callback(metadata['replica_uid']) - # parse incoming document info - doc_id = None - rev = None - content = None - gen = None - trans_id = None - if number_of_changes > 0: - try: - entry = json.loads(data[1]) - doc_id = entry['id'] - rev = entry['rev'] - content = entry['content'] - gen = entry['gen'] - trans_id = entry['trans_id'] - except (IndexError, KeyError): - raise errors.BrokenSyncStream - return new_generation, new_transaction_id, number_of_changes, \ - doc_id, rev, content, gen, trans_id - - def _setup_sync_decr_pool(self): - """ - Set up the SyncDecrypterPool for deferred decryption. - """ - if self._sync_decr_pool is None and self._sync_db is not None: - # initialize syncing queue decryption pool - self._sync_decr_pool = SyncDecrypterPool( - self._crypto, - self._sync_db, - insert_doc_cb=self._insert_doc_cb, - source_replica_uid=self.source_replica_uid) + raise errors.BrokenSyncStream('Metadata parsing failed') def _emit_receive_status(user_data, received_docs, total): diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py new file mode 100644 index 00000000..fa6b1969 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -0,0 +1,159 @@ +# -*- coding: utf-8 -*- +# fetch_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +from functools import partial +from cStringIO import StringIO +from twisted.web._newclient import ResponseDone +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger +from .support import ReadBodyProtocol +from .support import readBody + +logger = getLogger(__name__) + + +class DocStreamReceiver(ReadBodyProtocol): + """ + A protocol implementation that can parse incoming data from server based + on a line format specified on u1db implementation. Except that we split doc + attributes from content to ease parsing and increment throughput for larger + documents. + [\r\n + {metadata},\r\n + {doc_info},\r\n + {content},\r\n + ... + {doc_info},\r\n + {content},\r\n + ] + """ + + def __init__(self, response, deferred, doc_reader): + self.deferred = deferred + self.status = response.code if response else None + self.message = response.phrase if response else None + self.headers = response.headers if response else {} + self.delimiter = '\r\n' + self.metadata = '' + self._doc_reader = doc_reader + self.reset() + + def reset(self): + self._line = 0 + self._buffer = StringIO() + self._properly_finished = False + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + try: + if reason.check(ResponseDone): + self.dataBuffer = self.metadata + else: + self.dataBuffer = self.finish() + except errors.BrokenSyncStream, e: + return self.deferred.errback(e) + return ReadBodyProtocol.connectionLost(self, reason) + + def consumeBufferLines(self): + """ + Consumes lines from buffer and rewind it, writing remaining data + that didn't formed a line back into buffer. + """ + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.seek(0) + lines = content.split(self.delimiter) + self._buffer.write(lines.pop(-1)) + return lines + + def dataReceived(self, data): + """ + Buffer incoming data until a line breaks comes in. We check only + the incoming data for efficiency. + """ + self._buffer.write(data) + if '\n' not in data: + return + lines = self.consumeBufferLines() + while lines: + line, _ = utils.check_and_strip_comma(lines.pop(0)) + self.lineReceived(line) + self._line += 1 + + def lineReceived(self, line): + """ + Protocol implementation. + 0: [\r\n + 1: {metadata},\r\n + (even): {doc_info},\r\n + (odd): {data},\r\n + (last): ] + """ + if self._properly_finished: + raise errors.BrokenSyncStream("Reading a finished stream") + if ']' == line: + self._properly_finished = True + elif self._line == 0: + if line is not '[': + raise errors.BrokenSyncStream("Invalid start") + elif self._line == 1: + self.metadata = line + if 'error' in self.metadata: + raise errors.BrokenSyncStream("Error from server: %s" % line) + self.total = json.loads(line).get('number_of_changes', -1) + elif (self._line % 2) == 0: + self.current_doc = json.loads(line) + if 'error' in self.current_doc: + raise errors.BrokenSyncStream("Error from server: %s" % line) + else: + d = self._doc_reader( + self.current_doc, line.strip() or None, self.total) + d.addErrback(self._error) + + def _error(self, reason): + logger.error(reason) + self.transport.loseConnection() + + def finish(self): + """ + Checks that ']' came and stream was properly closed. + """ + if not self._properly_finished: + raise errors.BrokenSyncStream('Stream not properly closed') + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.close() + return content + + +def build_body_reader(doc_reader): + """ + Get the documents from a sync stream and call doc_reader on each + doc received. + + @param doc_reader: Function to be called for processing an incoming doc. + Will be called with doc metadata (dict parsed from 1st line) and doc + content (string) + @type doc_reader: function + + @return: A function that can be called by the http Agent to create and + configure the proper protocol. + """ + protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader) + return partial(readBody, protocolClass=protocolClass) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index c7bd057e..2b286ec5 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger from leap.soledad.client.events import emit_async from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.http_target.support import RequestBody +from .send_protocol import DocStreamProducer logger = getLogger(__name__) @@ -32,8 +33,6 @@ class HTTPDocSender(object): They need to be encrypted and metadata prepared before sending. """ - MAX_BATCH_SIZE = 0 # disabled by now, this is being tested yet - # The uuid of the local replica. # Any class inheriting from this one should provide a meaningful attribute # if the sync status event is meant to be used somewhere else. @@ -54,73 +53,50 @@ class HTTPDocSender(object): last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) - total = len(docs_by_generation) - while body.consumed < total: - result = yield self._send_batch(total, body, docs_by_generation) + result = yield self._send_batch(body, docs_by_generation) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] defer.returnValue([gen_after_send, trans_id_after_send]) - def _delete_sent(self, docs): - for doc, gen, trans_id in docs: - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) - @defer.inlineCallbacks - def _send_batch(self, total, body, docs): - sent = [] - missing = total - body.consumed - for i in xrange(1, missing + 1): - if body.pending_size > self.MAX_BATCH_SIZE: - break - idx = body.consumed + i - entry = docs[idx - 1] - sent.append(entry) - yield self._prepare_one_doc(entry, body, idx, total) - result = yield self._send_request(body.pop()) - if self._defer_encryption: - self._delete_sent(sent) - + def _send_batch(self, body, docs): + total, calls = len(docs), [] + for i, entry in enumerate(docs): + calls.append((self._prepare_one_doc, + entry, body, i + 1, total)) + result = yield self._send_request(body, calls) _emit_send_status(self.uuid, body.consumed, total) + defer.returnValue(result) - def _send_request(self, body): + def _send_request(self, body, calls): return self._http_request( self._url, method='POST', - body=body, - content_type='application/x-soledad-sync-put') + body=(body, calls), + content_type='application/x-soledad-sync-put', + body_producer=DocStreamProducer) @defer.inlineCallbacks def _prepare_one_doc(self, entry, body, idx, total): - doc, gen, trans_id = entry - content = yield self._encrypt_doc(doc) + get_doc_call, gen, trans_id = entry + doc, content = yield self._encrypt_doc(get_doc_call) body.insert_info( id=doc.doc_id, rev=doc.rev, content=content, gen=gen, trans_id=trans_id, number_of_docs=total, doc_idx=idx) + _emit_send_status(self.uuid, body.consumed, total) - def _encrypt_doc(self, doc): - d = None + @defer.inlineCallbacks + def _encrypt_doc(self, get_doc_call): + f, args, kwargs = get_doc_call + doc = yield f(*args, **kwargs) if doc.is_tombstone(): - d = defer.succeed(None) - elif not self._defer_encryption: - # fallback case, for tests - d = defer.succeed(self._crypto.encrypt_doc(doc)) + defer.returnValue((doc, None)) else: - - def _maybe_encrypt_doc_inline(doc_json): - if doc_json is None: - # the document is not marked as tombstone, but we got - # nothing from the sync db. As it is not encrypted - # yet, we force inline encryption. - return self._crypto.encrypt_doc(doc) - return doc_json - - d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) - d.addCallback(_maybe_encrypt_doc_inline) - return d + content = yield self._crypto.encrypt_doc(doc) + defer.returnValue((doc, content)) def _emit_send_status(user_data, idx, total): diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py new file mode 100644 index 00000000..0cb6d039 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/send_protocol.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# send_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +from zope.interface import implements +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web.iweb import IBodyProducer +from twisted.web.iweb import UNKNOWN_LENGTH + + +class DocStreamProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, producer): + """ + Initialize the string produer. + + :param producer: A RequestBody instance and a list of producer calls + :type producer: (.support.RequestBody, [(function, *args)]) + """ + self.body, self.producer = producer + self.length = UNKNOWN_LENGTH + self.pause = False + self.stop = False + + @defer.inlineCallbacks + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A Deferred that fires when production ends. + :rtype: twisted.internet.defer.Deferred + """ + while self.producer and not self.stop: + if self.pause: + yield self.sleep(0.001) + continue + call = self.producer.pop(0) + fun, args = call[0], call[1:] + yield fun(*args) + consumer.write(self.body.pop(1, leave_open=True)) + consumer.write(self.body.pop(0)) # close stream + + def sleep(self, secs): + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + def pauseProducing(self): + self.pause = True + + def stopProducing(self): + self.stop = True + + def resumeProducing(self): + self.pause = False diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 6ec98ed4..d8d8e420 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -53,6 +53,9 @@ class ReadBodyProtocol(_ReadBodyProtocol): if exc_cls is not None: message = respdic.get("message") self.deferred.errback(exc_cls(message)) + else: + self.deferred.errback( + errors.HTTPError(self.status, respdic, self.headers)) # ---8<--- end of snippet from u1db.remote.http_client def connectionLost(self, reason): @@ -91,7 +94,7 @@ class ReadBodyProtocol(_ReadBodyProtocol): self.deferred.errback(reason) -def readBody(response): +def readBody(response, protocolClass=ReadBodyProtocol): """ Get the body of an L{IResponse} and return it as a byte string. @@ -116,7 +119,7 @@ def readBody(response): abort() d = defer.Deferred(cancel) - protocol = ReadBodyProtocol(response, d) + protocol = protocolClass(response, d) def getAbort(): return getattr(protocol.transport, 'abortConnection', None) @@ -155,46 +158,49 @@ class RequestBody(object): self.headers = header_dict self.entries = [] self.consumed = 0 - self.pending_size = 0 def insert_info(self, **entry_dict): """ Dumps an entry into JSON format and add it to entries list. + Adds 'content' key on a new line if it's present. :param entry_dict: Entry as a dictionary :type entry_dict: dict - - :return: length of the entry after JSON dumps - :rtype: int """ - entry = json.dumps(entry_dict) + content = '' + if 'content' in entry_dict: + content = ',\r\n' + (entry_dict['content'] or '') + entry = json.dumps(entry_dict) + content self.entries.append(entry) - self.pending_size += len(entry) - def pop(self): + def pop(self, amount=10, leave_open=False): """ - Removes all entries and returns it formatted and ready + Removes entries and returns it formatted and ready to be sent. - :param number: number of entries to pop and format - :type number: int + :param amount: number of entries to pop and format + :type amount: int + + :param leave_open: flag to skip stream closing + :type amount: bool :return: formatted body ready to be sent :rtype: str """ - entries = self.entries[:] - self.entries = [] - self.pending_size = 0 - self.consumed += len(entries) - return self.entries_to_str(entries) + start = self.consumed == 0 + amount = min([len(self.entries), amount]) + entries = [self.entries.pop(0) for i in xrange(amount)] + self.consumed += amount + end = len(self.entries) == 0 if not leave_open else False + return self.entries_to_str(entries, start, end) def __str__(self): - return self.entries_to_str(self.entries) + return self.pop(len(self.entries)) def __len__(self): return len(self.entries) - def entries_to_str(self, entries=None): + def entries_to_str(self, entries=None, start=True, end=True): """ Format a list of entries into the body format expected by the server. @@ -205,6 +211,10 @@ class RequestBody(object): :return: formatted body ready to be sent :rtype: str """ - data = '[\r\n' + json.dumps(self.headers) + data = '' + if start: + data = '[\r\n' + json.dumps(self.headers) data += ''.join(',\r\n' + entry for entry in entries) - return data + '\r\n]' + if end: + data += '\r\n]' + return data diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py index 14b34d24..82927ff4 100644 --- a/client/src/leap/soledad/client/interfaces.py +++ b/client/src/leap/soledad/client/interfaces.py @@ -321,7 +321,7 @@ class ISyncableStorage(Interface): "Property, True if the syncer is syncing.") token = Attribute("The authentication Token.") - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize the local encrypted replica with a remote replica. @@ -331,11 +331,6 @@ class ISyncableStorage(Interface): :param url: the url of the target replica to sync with :type url: str - :param defer_decryption: - Whether to defer the decryption process using the intermediate - database. If False, decryption will be done inline. - :type defer_decryption: bool - :return: A deferred that will fire with the local generation before the synchronisation was performed. diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index 1eb6f31d..3fe98c64 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -34,7 +34,7 @@ from leap.soledad.common import soledad_assert_type from leap.soledad.common import document from leap.soledad.common.log import getLogger from leap.soledad.client import events -from leap.soledad.client.crypto import encrypt_sym, decrypt_sym +from leap.soledad.client import _crypto logger = getLogger(__name__) @@ -126,7 +126,7 @@ class SoledadSecrets(object): instantiates Soledad. """ - IV_SEPARATOR = ":" + SEPARATOR = ":" """ A separator used for storing the encryption initial value prepended to the ciphertext. @@ -142,7 +142,8 @@ class SoledadSecrets(object): KDF_SALT_KEY = 'kdf_salt' KDF_LENGTH_KEY = 'kdf_length' KDF_SCRYPT = 'scrypt' - CIPHER_AES256 = 'aes256' + CIPHER_AES256 = 'aes256' # deprecated, AES-CTR + CIPHER_AES256_GCM = _crypto.ENC_METHOD.aes_256_gcm RECOVERY_DOC_VERSION_KEY = 'version' RECOVERY_DOC_VERSION = 1 """ @@ -343,7 +344,7 @@ class SoledadSecrets(object): '%s%s' % (self._passphrase_as_string(), self._uuid)).hexdigest() - def _export_recovery_document(self): + def _export_recovery_document(self, cipher=None): """ Export the storage secrets. @@ -364,6 +365,9 @@ class SoledadSecrets(object): Note that multiple storage secrets might be stored in one recovery document. + :param cipher: (Optional) The ciper to use. Defaults to AES256 + :type cipher: str + :return: The recovery document. :rtype: dict """ @@ -371,7 +375,7 @@ class SoledadSecrets(object): encrypted_secrets = {} for secret_id in self._secrets: encrypted_secrets[secret_id] = self._encrypt_storage_secret( - self._secrets[secret_id]) + self._secrets[secret_id], doc_cipher=cipher) # create the recovery document data = { self.STORAGE_SECRETS_KEY: encrypted_secrets, @@ -447,6 +451,7 @@ class SoledadSecrets(object): except SecretsException as e: logger.error("failed to decrypt storage secret: %s" % str(e)) + raise e return secret_count, active_secret def _get_secrets_from_shared_db(self): @@ -537,18 +542,25 @@ class SoledadSecrets(object): ) if encrypted_secret_dict[self.KDF_LENGTH_KEY] != len(key): raise SecretsException("Wrong length of decryption key.") - if encrypted_secret_dict[self.CIPHER_KEY] != self.CIPHER_AES256: + supported_ciphers = [self.CIPHER_AES256, self.CIPHER_AES256_GCM] + doc_cipher = encrypted_secret_dict[self.CIPHER_KEY] + if doc_cipher not in supported_ciphers: raise SecretsException("Unknown cipher in stored secret.") # recover the initial value and ciphertext iv, ciphertext = encrypted_secret_dict[self.SECRET_KEY].split( - self.IV_SEPARATOR, 1) + self.SEPARATOR, 1) ciphertext = binascii.a2b_base64(ciphertext) - decrypted_secret = decrypt_sym(ciphertext, key, iv) + try: + decrypted_secret = _crypto.decrypt_sym( + ciphertext, key, iv, doc_cipher) + except Exception as e: + logger.error(e) + raise SecretsException("Unable to decrypt secret.") if encrypted_secret_dict[self.LENGTH_KEY] != len(decrypted_secret): raise SecretsException("Wrong length of decrypted secret.") return decrypted_secret - def _encrypt_storage_secret(self, decrypted_secret): + def _encrypt_storage_secret(self, decrypted_secret, doc_cipher=None): """ Encrypt the storage secret. @@ -567,6 +579,8 @@ class SoledadSecrets(object): :param decrypted_secret: The decrypted storage secret. :type decrypted_secret: str + :param cipher: (Optional) The ciper to use. Defaults to AES256 + :type cipher: str :return: The encrypted storage secret. :rtype: dict @@ -575,17 +589,18 @@ class SoledadSecrets(object): salt = os.urandom(self.SALT_LENGTH) # get a 256-bit key key = scrypt.hash(self._passphrase_as_string(), salt, buflen=32) - iv, ciphertext = encrypt_sym(decrypted_secret, key) + doc_cipher = doc_cipher or self.CIPHER_AES256_GCM + iv, ciphertext = _crypto.encrypt_sym(decrypted_secret, key, doc_cipher) + ciphertext = binascii.b2a_base64(ciphertext) encrypted_secret_dict = { # leap.soledad.crypto submodule uses AES256 for symmetric # encryption. self.KDF_KEY: self.KDF_SCRYPT, self.KDF_SALT_KEY: binascii.b2a_base64(salt), self.KDF_LENGTH_KEY: len(key), - self.CIPHER_KEY: self.CIPHER_AES256, + self.CIPHER_KEY: doc_cipher, self.LENGTH_KEY: len(decrypted_secret), - self.SECRET_KEY: '%s%s%s' % ( - str(iv), self.IV_SEPARATOR, binascii.b2a_base64(ciphertext)), + self.SECRET_KEY: self.SEPARATOR.join([str(iv), ciphertext]) } return encrypted_secret_dict diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 3921c323..c9a9444e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -42,9 +42,7 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. """ import os -import json -from hashlib import sha256 from functools import partial from pysqlcipher import dbapi2 as sqlcipher_dbapi2 @@ -117,7 +115,7 @@ class SQLCipherOptions(object): @classmethod def copy(cls, source, path=None, key=None, create=None, is_raw_key=None, cipher=None, kdf_iter=None, - cipher_page_size=None, defer_encryption=None, sync_db_key=None): + cipher_page_size=None, sync_db_key=None): """ Return a copy of C{source} with parameters different than None replaced by new values. @@ -134,7 +132,7 @@ class SQLCipherOptions(object): args.append(getattr(source, name)) for name in ["create", "is_raw_key", "cipher", "kdf_iter", - "cipher_page_size", "defer_encryption", "sync_db_key"]: + "cipher_page_size", "sync_db_key"]: val = local_vars[name] if val is not None: kwargs[name] = val @@ -145,7 +143,7 @@ class SQLCipherOptions(object): def __init__(self, path, key, create=True, is_raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, - defer_encryption=False, sync_db_key=None): + sync_db_key=None): """ :param path: The filesystem path for the database to open. :type path: str @@ -163,10 +161,6 @@ class SQLCipherOptions(object): :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int - :param defer_encryption: - Whether to defer encryption/decryption of documents, or do it - inline while syncing. - :type defer_encryption: bool """ self.path = path self.key = key @@ -175,7 +169,6 @@ class SQLCipherOptions(object): self.cipher = cipher self.kdf_iter = kdf_iter self.cipher_page_size = cipher_page_size - self.defer_encryption = defer_encryption self.sync_db_key = sync_db_key def __str__(self): @@ -201,7 +194,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ A U1DB implementation that uses SQLCipher as its persistence layer. """ - defer_encryption = False # The attribute _index_storage_value will be used as the lookup key for the # implementation of the SQLCipher storage backend. @@ -225,10 +217,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ # ensure the db is encrypted if the file already exists if os.path.isfile(opts.path): - _assert_db_is_encrypted(opts) - - # connect to the sqlcipher database - self._db_handle = initialize_sqlcipher_db(opts) + self._db_handle = _assert_db_is_encrypted(opts) + else: + # connect to the sqlcipher database + self._db_handle = initialize_sqlcipher_db(opts) # TODO --------------------------------------------------- # Everything else in this initialization has to be factored @@ -267,27 +259,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') # - # Document operations - # - - def put_doc(self, doc): - """ - Overwrite the put_doc method, to enqueue the modified document for - encryption before sync. - - :param doc: The document to be put. - :type doc: u1db.Document - - :return: The new document revision. - :rtype: str - """ - doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) - if self.defer_encryption: - # TODO move to api? - self._sync_enc_pool.encrypt_doc(doc) - return doc_rev - - # # SQLCipher API methods # @@ -425,25 +396,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ ENCRYPT_LOOP_PERIOD = 1 - def __init__(self, opts, soledad_crypto, replica_uid, cert_file, - defer_encryption=False, sync_db=None, sync_enc_pool=None): + def __init__(self, opts, soledad_crypto, replica_uid, cert_file): self._opts = opts self._path = opts.path self._crypto = soledad_crypto self.__replica_uid = replica_uid self._cert_file = cert_file - self._sync_enc_pool = sync_enc_pool - - self._sync_db = sync_db - # we store syncers in a dictionary indexed by the target URL. We also - # store a hash of the auth info in case auth info expires and we need - # to rebuild the syncer for that target. The final self._syncers - # format is the following: - # - # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} - self._syncers = {} # storage for the documents received during a sync self.received_docs = [] @@ -458,6 +418,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase): if DO_STATS: self.sync_phase = None + def commit(self): + self._db_handle.commit() + @property def _replica_uid(self): return str(self.__replica_uid) @@ -477,7 +440,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): raise DatabaseAccessError(str(e)) @defer.inlineCallbacks - def sync(self, url, creds=None, defer_decryption=True): + def sync(self, url, creds=None): """ Synchronize documents with remote replica exposed at url. @@ -492,10 +455,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :param creds: optional dictionary giving credentials to authorize the operation with the server. :type creds: dict - :param defer_decryption: - Whether to defer the decryption process using the intermediate - database. If False, decryption will be done inline. - :type defer_decryption: bool :return: A Deferred, that will fire with the local generation (type `int`) @@ -507,8 +466,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self.sync_phase = syncer.sync_phase self.syncer = syncer self.sync_exchange_phase = syncer.sync_exchange_phase - local_gen_before_sync = yield syncer.sync( - defer_decryption=defer_decryption) + local_gen_before_sync = yield syncer.sync() self.received_docs = syncer.received_docs defer.returnValue(local_gen_before_sync) @@ -525,29 +483,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :return: A synchronizer. :rtype: Synchronizer """ - # we want to store at most one syncer for each url, so we also store a - # hash of the connection credentials and replace the stored syncer for - # a certain url if credentials have changed. - h = sha256(json.dumps([url, creds])).hexdigest() - cur_h, syncer = self._syncers.get(url, (None, None)) - if syncer is None or h != cur_h: - syncer = SoledadSynchronizer( - self, - SoledadHTTPSyncTarget( - url, - # XXX is the replica_uid ready? - self._replica_uid, - creds=creds, - crypto=self._crypto, - cert_file=self._cert_file, - sync_db=self._sync_db, - sync_enc_pool=self._sync_enc_pool)) - self._syncers[url] = (h, syncer) - # in order to reuse the same synchronizer multiple times we have to - # reset its state (i.e. the number of documents received from target - # and inserted in the local replica). - syncer.num_inserted = 0 - return syncer + return SoledadSynchronizer( + self, + SoledadHTTPSyncTarget( + url, + # XXX is the replica_uid ready? + self._replica_uid, + creds=creds, + crypto=self._crypto, + cert_file=self._cert_file)) # # Symmetric encryption of syncing docs @@ -558,18 +502,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # XXX this SHOULD BE a callback return self._get_generation() - def close(self): - """ - Close the syncer and syncdb orderly - """ - super(SQLCipherU1DBSync, self).close() - # close all open syncers - for url in self._syncers.keys(): - _, syncer = self._syncers[url] - syncer.close() - del self._syncers[url] - self.running = False - class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): """ @@ -599,14 +531,12 @@ class SoledadSQLCipherWrapper(SQLCipherDatabase): It can be used from adbapi to initialize a soledad database after getting a regular connection to a sqlcipher database. """ - def __init__(self, conn, opts, sync_enc_pool): + def __init__(self, conn, opts): self._db_handle = conn self._real_replica_uid = None self._ensure_schema() self.set_document_factory(soledad_doc_factory) self._prime_replica_uid() - self.defer_encryption = opts.defer_encryption - self._sync_enc_pool = sync_enc_pool def _assert_db_is_encrypted(opts): @@ -635,7 +565,7 @@ def _assert_db_is_encrypted(opts): # assert that we can access it using SQLCipher with the given # key dummy_query = ('SELECT count(*) FROM sqlite_master',) - initialize_sqlcipher_db(opts, on_init=dummy_query) + return initialize_sqlcipher_db(opts, on_init=dummy_query) else: raise DatabaseIsNotEncrypted() @@ -660,6 +590,7 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False, return SoledadDocument(doc_id=doc_id, rev=rev, json=json, has_conflicts=has_conflicts, syncable=syncable) + sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 7ed5f693..70c841d6 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -56,23 +56,10 @@ class SoledadSynchronizer(Synchronizer): self.sync_exchange_phase = None @defer.inlineCallbacks - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize documents between source and target. - Differently from u1db `Synchronizer.sync` method, this one allows to - pass a `defer_decryption` flag that will postpone the last - step in the synchronization dance, namely, the setting of the last - known generation and transaction id for a given remote replica. - - This is done to allow the ongoing parallel decryption of the incoming - docs to proceed without `InvalidGeneration` conflicts. - - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - :return: A deferred which will fire after the sync has finished with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -154,28 +141,19 @@ class SoledadSynchronizer(Synchronizer): self.sync_phase[0] += 1 # -------------------------------------------------------------------- - # prepare to send all the changed docs - changed_doc_ids = [doc_id for doc_id, _, _ in changes] - docs_to_send = self.source.get_docs( - changed_doc_ids, check_for_conflicts=False, include_deleted=True) - ids_sent = [] - docs_by_generation = [] - idx = 0 - for doc in docs_to_send: - _, gen, trans = changes[idx] - docs_by_generation.append((doc, gen, trans)) - idx += 1 - ids_sent.append(doc.doc_id) + docs_by_generation = self._docs_by_gen_from_changes(changes) # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. new_gen, new_trans_id = yield sync_target.sync_exchange( docs_by_generation, self.source._replica_uid, target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - defer_decryption=defer_decryption) + self._insert_doc_from_target, ensure_callback=ensure_callback) + ids_sent = [doc_id for doc_id, _, _ in changes] logger.debug("target gen after sync: %d" % new_gen) logger.debug("target trans_id after sync: %s" % new_trans_id) + if hasattr(self.source, 'commit'): # sqlcipher backend speed up + self.source.commit() # insert it all in a single transaction info = { "target_replica_uid": self.target_replica_uid, "new_gen": new_gen, @@ -204,6 +182,14 @@ class SoledadSynchronizer(Synchronizer): defer.returnValue(my_gen) + def _docs_by_gen_from_changes(self, changes): + docs_by_generation = [] + kwargs = {'include_deleted': True} + for doc_id, gen, trans in changes: + get_doc = (self.source.get_doc, (doc_id,), kwargs) + docs_by_generation.append((get_doc, gen, trans)) + return docs_by_generation + def complete_sync(self): """ Last stage of the synchronization: @@ -225,12 +211,6 @@ class SoledadSynchronizer(Synchronizer): # if gapless record current reached generation with target return self._record_sync_info_with_the_target(info["my_gen"]) - def close(self): - """ - Close the synchronizer. - """ - self.sync_target.close() - def _record_sync_info_with_the_target(self, start_generation): """ Store local replica metadata in server. diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py index f4f48f86..4a29ca87 100644 --- a/common/src/leap/soledad/common/backend.py +++ b/common/src/leap/soledad/common/backend.py @@ -73,8 +73,8 @@ class SoledadBackend(CommonBackend): def batch_end(self): if not self.BATCH_SUPPORT: return - self.batching = False self._database.batch_end() + self.batching = False for name in self.after_batch_callbacks: self.after_batch_callbacks[name]() self.after_batch_callbacks = None @@ -570,7 +570,7 @@ class SoledadBackend(CommonBackend): self._put_doc(cur_doc, doc) def get_docs(self, doc_ids, check_for_conflicts=True, - include_deleted=False): + include_deleted=False, read_content=True): """ Get the JSON content for many documents. @@ -588,7 +588,7 @@ class SoledadBackend(CommonBackend): :rtype: iterable """ return self._database.get_docs(doc_ids, check_for_conflicts, - include_deleted) + include_deleted, read_content) def _prune_conflicts(self, doc, doc_vcr): """ diff --git a/common/src/leap/soledad/common/couch/__init__.py b/common/src/leap/soledad/common/couch/__init__.py index 0f4102db..2e6f734e 100644 --- a/common/src/leap/soledad/common/couch/__init__.py +++ b/common/src/leap/soledad/common/couch/__init__.py @@ -20,6 +20,7 @@ import json +import copy import re import uuid import binascii @@ -295,31 +296,14 @@ class CouchDatabase(object): generation, _ = self.get_generation_info() results = list( - self._get_docs(None, True, include_deleted)) + self.get_docs(None, True, include_deleted)) return (generation, results) def get_docs(self, doc_ids, check_for_conflicts=True, - include_deleted=False): + include_deleted=False, read_content=True): """ Get the JSON content for many documents. - :param doc_ids: A list of document identifiers or None for all. - :type doc_ids: list - :param check_for_conflicts: If set to False, then the conflict check - will be skipped, and 'None' will be - returned instead of True/False. - :type check_for_conflicts: bool - :param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted - documents will not be included in the results. - :return: iterable giving the Document object for each document id - in matching doc_ids order. - :rtype: iterable - """ - return self._get_docs(doc_ids, check_for_conflicts, include_deleted) - - def _get_docs(self, doc_ids, check_for_conflicts, include_deleted): - """ Use couch's `_all_docs` view to get the documents indicated in `doc_ids`, @@ -337,14 +321,21 @@ class CouchDatabase(object): in matching doc_ids order. :rtype: iterable """ - params = {'include_docs': 'true', 'attachments': 'true'} + params = {'include_docs': 'true', 'attachments': 'false'} if doc_ids is not None: params['keys'] = doc_ids view = self._database.view("_all_docs", **params) for row in view.rows: - result = row['doc'] + result = copy.deepcopy(row['doc']) + for file_name in result.get('_attachments', {}).keys(): + data = self._database.get_attachment(result, file_name) + if data: + if read_content: + data = data.read() + result['_attachments'][file_name] = {'data': data} doc = self.__parse_doc_from_couch( - result, result['_id'], check_for_conflicts=check_for_conflicts) + result, result['_id'], + check_for_conflicts=check_for_conflicts, decode=False) # filter out non-u1db or deleted documents if not doc or (not include_deleted and doc.is_tombstone()): continue @@ -409,7 +400,7 @@ class CouchDatabase(object): return rev def __parse_doc_from_couch(self, result, doc_id, - check_for_conflicts=False): + check_for_conflicts=False, decode=True): # restrict to u1db documents if 'u1db_rev' not in result: return None @@ -418,19 +409,23 @@ class CouchDatabase(object): if '_attachments' not in result \ or 'u1db_content' not in result['_attachments']: doc.make_tombstone() - else: + elif decode: doc.content = json.loads( binascii.a2b_base64( result['_attachments']['u1db_content']['data'])) + else: + doc._json = result['_attachments']['u1db_content']['data'] # determine if there are conflicts if check_for_conflicts \ and '_attachments' in result \ and 'u1db_conflicts' in result['_attachments']: - doc.set_conflicts( - self._build_conflicts( - doc.doc_id, - json.loads(binascii.a2b_base64( - result['_attachments']['u1db_conflicts']['data'])))) + if decode: + conflicts = binascii.a2b_base64( + result['_attachments']['u1db_conflicts']['data']) + else: + conflicts = result['_attachments']['u1db_conflicts']['data'] + conflicts = json.loads(conflicts) + doc.set_conflicts(self._build_conflicts(doc.doc_id, conflicts)) # store couch revision doc.couch_rev = result['_rev'] return doc @@ -663,7 +658,7 @@ class CouchDatabase(object): _, _, data = resource.get_json(**kwargs) return data - def _allocate_new_generation(self, doc_id, transaction_id): + def _allocate_new_generation(self, doc_id, transaction_id, save=True): """ Allocate a new generation number for a document modification. @@ -703,10 +698,12 @@ class CouchDatabase(object): DOC_ID_KEY: doc_id, TRANSACTION_ID_KEY: transaction_id, } - self._database.save(gen_doc) + if save: + self._database.save(gen_doc) break # succeeded allocating a new generation, proceed except ResourceConflict: pass # try again! + return gen_doc def save_document(self, old_doc, doc, transaction_id): """ @@ -785,6 +782,7 @@ class CouchDatabase(object): headers=envelope.headers) except ResourceConflict: raise RevisionConflict() + self._allocate_new_generation(doc.doc_id, transaction_id) else: for name, attachment in attachments.items(): del attachment['follows'] @@ -793,12 +791,13 @@ class CouchDatabase(object): attachment['data'] = binascii.b2a_base64( parts[index]).strip() couch_doc['_attachments'] = attachments + gen_doc = self._allocate_new_generation( + doc.doc_id, transaction_id, save=False) self.batch_docs[doc.doc_id] = couch_doc + self.batch_docs[gen_doc['_id']] = gen_doc last_gen, last_trans_id = self.batch_generation self.batch_generation = (last_gen + 1, transaction_id) - self._allocate_new_generation(doc.doc_id, transaction_id) - def _new_resource(self, *path): """ Return a new resource for accessing a couch database. diff --git a/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py b/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py index d73c0d16..27db65af 100644 --- a/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py +++ b/common/src/leap/soledad/common/l2db/backends/sqlite_backend.py @@ -505,12 +505,11 @@ class SQLiteDatabase(CommonBackend): def _put_doc_if_newer(self, doc, save_conflict, replica_uid=None, replica_gen=None, replica_trans_id=None): - with self._db_handle: - return super(SQLiteDatabase, self)._put_doc_if_newer( - doc, - save_conflict=save_conflict, - replica_uid=replica_uid, replica_gen=replica_gen, - replica_trans_id=replica_trans_id) + return super(SQLiteDatabase, self)._put_doc_if_newer( + doc, + save_conflict=save_conflict, + replica_uid=replica_uid, replica_gen=replica_gen, + replica_trans_id=replica_trans_id) def _add_conflict(self, c, doc_id, my_doc_rev, my_content): c.execute("INSERT INTO conflicts VALUES (?, ?, ?)", @@ -924,4 +923,5 @@ class SQLitePartialExpandDatabase(SQLiteDatabase): raw_doc = json.loads(doc) self._update_indexes(doc_id, raw_doc, getters, c) + SQLiteDatabase.register_implementation(SQLitePartialExpandDatabase) diff --git a/common/src/leap/soledad/common/l2db/remote/http_app.py b/common/src/leap/soledad/common/l2db/remote/http_app.py index 5cf6645e..496274b2 100644 --- a/common/src/leap/soledad/common/l2db/remote/http_app.py +++ b/common/src/leap/soledad/common/l2db/remote/http_app.py @@ -194,6 +194,7 @@ class URLToResource(object): resource_cls = params.pop('resource_cls') return resource_cls, params + url_to_resource = URLToResource() @@ -501,7 +502,9 @@ class HTTPResponder(object): self._write('\r\n') else: self._write(',\r\n') - self._write(json.dumps(entry)) + if type(entry) == dict: + entry = json.dumps(entry) + self._write(entry) def end_stream(self): "end stream (array)." diff --git a/scripts/db_access/client_side_db.py b/scripts/db_access/client_side_db.py index 11d72791..48eec0f7 100644 --- a/scripts/db_access/client_side_db.py +++ b/scripts/db_access/client_side_db.py @@ -1,13 +1,13 @@ #!/usr/bin/python import os +import sys import argparse import tempfile import getpass import requests import srp._pysrp as srp import binascii -import logging import json import time @@ -15,6 +15,7 @@ from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks from leap.soledad.client import Soledad +from leap.soledad.common.log import getLogger from leap.keymanager import KeyManager from leap.keymanager.openpgp import OpenPGPKey @@ -39,9 +40,9 @@ Use the --help option to see available options. # create a logger -logger = logging.getLogger(__name__) -LOG_FORMAT = '%(asctime)s %(message)s' -logging.basicConfig(format=LOG_FORMAT, level=logging.DEBUG) +logger = getLogger(__name__) +from twisted.python import log +log.startLogging(sys.stdout) safe_unhexlify = lambda x: binascii.unhexlify(x) if ( @@ -133,8 +134,7 @@ def _get_soledad_instance(uuid, passphrase, basedir, server_url, cert_file, local_db_path=local_db_path, server_url=server_url, cert_file=cert_file, - auth_token=token, - defer_encryption=True) + auth_token=token) def _get_keymanager_instance(username, provider, soledad, token, diff --git a/scripts/docker/files/bin/client_side_db.py b/scripts/docker/files/bin/client_side_db.py index 4be33d13..80da7392 100644 --- a/scripts/docker/files/bin/client_side_db.py +++ b/scripts/docker/files/bin/client_side_db.py @@ -136,8 +136,7 @@ def _get_soledad_instance(uuid, passphrase, basedir, server_url, cert_file, local_db_path=local_db_path, server_url=server_url, cert_file=cert_file, - auth_token=token, - defer_encryption=True) + auth_token=token) def _get_keymanager_instance(username, provider, soledad, token, diff --git a/scripts/profiling/mail/soledad_client.py b/scripts/profiling/mail/soledad_client.py index 5ac8ce39..dcd605aa 100644 --- a/scripts/profiling/mail/soledad_client.py +++ b/scripts/profiling/mail/soledad_client.py @@ -30,8 +30,7 @@ class SoledadClient(object): server_url=self._server_url, cert_file=None, auth_token=self._auth_token, - secret_id=None, - defer_encryption=True) + secret_id=None) def close(self): if self._soledad is not None: diff --git a/scripts/profiling/sync/profile-sync.py b/scripts/profiling/sync/profile-sync.py index 34e66f03..1d59217a 100755 --- a/scripts/profiling/sync/profile-sync.py +++ b/scripts/profiling/sync/profile-sync.py @@ -91,7 +91,6 @@ def _get_soledad_instance_from_uuid(uuid, passphrase, basedir, server_url, server_url=server_url, cert_file=cert_file, auth_token=token, - defer_encryption=True, syncable=True) diff --git a/server/pkg/requirements.pip b/server/pkg/requirements.pip index e92dfde6..e4a87e74 100644 --- a/server/pkg/requirements.pip +++ b/server/pkg/requirements.pip @@ -3,3 +3,4 @@ PyOpenSSL twisted>=12.3.0 Beaker couchdb +python-cjson diff --git a/server/pkg/soledad-server b/server/pkg/soledad-server index d9dab040..753a260b 100644 --- a/server/pkg/soledad-server +++ b/server/pkg/soledad-server @@ -11,7 +11,7 @@ PATH=/sbin:/bin:/usr/sbin:/usr/bin PIDFILE=/var/run/soledad.pid -OBJ=leap.soledad.server.application.wsgi_application +RESOURCE_CLASS=leap.soledad.server.resource.SoledadResource HTTPS_PORT=2424 CONFDIR=/etc/soledad CERT_PATH="${CONFDIR}/soledad-server.pem" @@ -39,7 +39,7 @@ case "${1}" in --syslog \ --prefix=soledad-server \ web \ - --wsgi=${OBJ} \ + --class=${RESOURCE_CLASS} \ --port=ssl:${HTTPS_PORT}:privateKey=${PRIVKEY_PATH}:certKey=${CERT_PATH}:sslmethod=${SSL_METHOD} echo "." ;; diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index d8243c19..039bef75 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -193,7 +193,8 @@ class HTTPInvocationByMethodWithBody( try: content_length = int(self.environ['CONTENT_LENGTH']) except (ValueError, KeyError): - raise http_app.BadRequest + # raise http_app.BadRequest + content_length = self.max_request_size if content_length <= 0: raise http_app.BadRequest if content_length > self.max_request_size: @@ -219,27 +220,23 @@ class HTTPInvocationByMethodWithBody( if content_type == 'application/x-soledad-sync-put': meth_put = self._lookup('%s_put' % method) meth_end = self._lookup('%s_end' % method) - entries = [] while True: - line = body_getline() - entry = line.strip() + entry = body_getline().strip() if entry == ']': # end of incoming document stream break if not entry or not comma: # empty or no prec comma raise http_app.BadRequest entry, comma = utils.check_and_strip_comma(entry) - entries.append(entry) + content = body_getline().strip() + content, comma = utils.check_and_strip_comma(content) + meth_put({'content': content or None}, entry) if comma or body_getline(): # extra comma or data raise http_app.BadRequest - for entry in entries: - meth_put({}, entry) return meth_end() # handle outgoing documents elif content_type == 'application/x-soledad-sync-get': - line = body_getline() - entry = line.strip() meth_get = self._lookup('%s_get' % method) - return meth_get({}, line) + return meth_get() else: raise http_app.BadRequest() else: diff --git a/server/src/leap/soledad/server/config.py b/server/src/leap/soledad/server/config.py index 4a791cbe..3c17ec19 100644 --- a/server/src/leap/soledad/server/config.py +++ b/server/src/leap/soledad/server/config.py @@ -24,7 +24,7 @@ CONFIG_DEFAULTS = { 'couch_url': 'http://localhost:5984', 'create_cmd': None, 'admin_netrc': '/etc/couchdb/couchdb-admin.netrc', - 'batching': False + 'batching': True }, 'database-security': { 'members': ['soledad'], diff --git a/server/src/leap/soledad/server/resource.py b/server/src/leap/soledad/server/resource.py new file mode 100644 index 00000000..dbb91b0a --- /dev/null +++ b/server/src/leap/soledad/server/resource.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# resource.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A twisted resource that serves the Soledad Server. +""" + +from twisted.web.resource import Resource +from twisted.web.wsgi import WSGIResource +from twisted.internet import reactor +from twisted.python import threadpool + +from leap.soledad.server.application import wsgi_application + + +__all__ = ['SoledadResource'] + + +# setup a wsgi resource with its own threadpool +pool = threadpool.ThreadPool() +reactor.callWhenRunning(pool.start) +reactor.addSystemEventTrigger('after', 'shutdown', pool.stop) +wsgi_resource = WSGIResource(reactor, pool, wsgi_application) + + +class SoledadResource(Resource): + """ + This is a dummy twisted resource, used only to allow different entry points + for the Soledad Server. + """ + + def __init__(self): + self.children = {'': wsgi_resource} + + def getChild(self, path, request): + # for now, just "rewind" the path and serve the wsgi resource for all + # requests. In the future, we might look into the request path to + # decide which child resources should serve each request. + request.postpath.insert(0, request.prepath.pop()) + return self.children[''] diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3f5c4aba..b553a056 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -17,14 +17,19 @@ """ Server side synchronization infrastructure. """ -from leap.soledad.common.l2db import sync, Document +import time +from itertools import izip + +from leap.soledad.common.l2db import sync from leap.soledad.common.l2db.remote import http_app from leap.soledad.server.caching import get_cache_for from leap.soledad.server.state import ServerSyncState +from leap.soledad.common.document import ServerDocument -MAX_REQUEST_SIZE = 200 # in Mb +MAX_REQUEST_SIZE = float('inf') # It's a stream. MAX_ENTRY_SIZE = 200 # in Mb +ENTRY_CACHE_SIZE = 8192 * 1024 class SyncExchange(sync.SyncExchange): @@ -51,7 +56,7 @@ class SyncExchange(sync.SyncExchange): # recover sync state self._sync_state = ServerSyncState(self.source_replica_uid, sync_id) - def find_changes_to_return(self, received): + def find_changes_to_return(self): """ Find changes to return. @@ -59,10 +64,6 @@ class SyncExchange(sync.SyncExchange): order using whats_changed. It excludes documents ids that have already been considered (superseded by the sender, etc). - :param received: How many documents the source replica has already - received during the current sync process. - :type received: int - :return: the generation of this database, which the caller can consider themselves to be synchronized after processing allreturned documents, and the amount of documents to be sent @@ -78,41 +79,45 @@ class SyncExchange(sync.SyncExchange): self._trace('after whats_changed') seen_ids = self._sync_state.seen_ids() # changed docs that weren't superseded by or converged with - changes_to_return = [ + self.changes_to_return = [ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes # there was a subsequent update if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] self._sync_state.put_changes_to_return( - new_gen, new_trans_id, changes_to_return) - number_of_changes = len(changes_to_return) - # query server for stored changes - _, _, next_change_to_return = \ - self._sync_state.next_change_to_return(received) + new_gen, new_trans_id, self.changes_to_return) + number_of_changes = len(self.changes_to_return) self.new_gen = new_gen self.new_trans_id = new_trans_id - # and append one change - self.change_to_return = next_change_to_return return self.new_gen, number_of_changes - def return_one_doc(self, return_doc_cb): - """ - Return one changed document and its last change generation to the - source syncing replica by invoking the callback return_doc_cb. + def return_docs(self, return_doc_cb): + """Return the changed documents and their last change generation + repeatedly invoking the callback return_doc_cb. - This is called once for each document to be transferred from target to - source. + The final step of a sync exchange. - :param return_doc_cb: is a callback used to return the documents with - their last change generation to the target - replica. - :type return_doc_cb: callable(doc, gen, trans_id) + :param: return_doc_cb(doc, gen, trans_id): is a callback + used to return the documents with their last change generation + to the target replica. + :return: None """ - if self.change_to_return is not None: - changed_doc_id, gen, trans_id = self.change_to_return - doc = self._db.get_doc(changed_doc_id, include_deleted=True) + changes_to_return = self.changes_to_return + # return docs, including conflicts. + # content as a file-object (will be read when writing) + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, + include_deleted=True, read_content=False) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + for doc, gen, trans_id in docs_by_gen: return_doc_cb(doc, gen, trans_id) def batched_insert_from_source(self, entries, sync_id): + if not entries: + return self._db.batch_start() for entry in entries: doc, gen, trans_id, number_of_docs, doc_idx = entry @@ -207,6 +212,7 @@ class SyncResource(http_app.SyncResource): db, self.source_replica_uid, last_known_generation, sync_id) self._sync_id = sync_id self._staging = [] + self._staging_size = 0 @http_app.http_method(content_as_args=True) def post_put( @@ -233,26 +239,37 @@ class SyncResource(http_app.SyncResource): :param doc_idx: The index of the current document. :type doc_idx: int """ - doc = Document(id, rev, content) + doc = ServerDocument(id, rev, json=content) + self._staging_size += len(content or '') self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) + if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs: + self.sync_exch.batched_insert_from_source(self._staging, + self._sync_id) + self._staging = [] + self._staging_size = 0 - @http_app.http_method(received=int, content_as_args=True) - def post_get(self, received): + def post_get(self): """ - Return one syncing document to the client. - - :param received: How many documents have already been received by the - client on the current sync session. - :type received: int + Return syncing documents to the client. """ - def send_doc(doc, gen, trans_id): - entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + entry = dict(id=doc.doc_id, rev=doc.rev, gen=gen, trans_id=trans_id) self.responder.stream_entry(entry) + content_reader = doc.get_json() + if content_reader: + content = content_reader.read() + self.responder.stream_entry(content) + content_reader.close() + # throttle at 5mb/s + # FIXME: twistd cant control througput + # we need to either use gunicorn or go async + time.sleep(len(content) / (5.0 * 1024 * 1024)) + else: + self.responder.stream_entry('') new_gen, number_of_changes = \ - self.sync_exch.find_changes_to_return(received) + self.sync_exch.find_changes_to_return() self.responder.content_type = 'application/x-u1db-sync-response' self.responder.start_response(200) self.responder.start_stream(), @@ -264,7 +281,7 @@ class SyncResource(http_app.SyncResource): if self.replica_uid is not None: header['replica_uid'] = self.replica_uid self.responder.stream_entry(header) - self.sync_exch.return_one_doc(send_doc) + self.sync_exch.return_docs(send_doc) self.responder.end_stream() self.responder.finish_response() @@ -273,7 +290,6 @@ class SyncResource(http_app.SyncResource): Return the current generation and transaction_id after inserting one incoming document. """ - self.sync_exch.batched_insert_from_source(self._staging, self._sync_id) self.responder.content_type = 'application/x-soledad-sync-response' self.responder.start_response(200) self.responder.start_stream(), @@ -1,7 +1,7 @@ [pep8] -exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py -ignore = E731 +exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py,.tox +ignore = F812,E731 [flake8] -exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py -ignore = E731 +exclude = versioneer.py,_version.py,*.egg,build,docs,scripts,ddocs.py,.tox +ignore = F812,E731 diff --git a/testing/pytest.ini b/testing/pytest.ini index 2d34c607..eb70b67c 100644 --- a/testing/pytest.ini +++ b/testing/pytest.ini @@ -1,3 +1,3 @@ [pytest] testpaths = tests -norecursedirs = tests/perf +twisted = yes diff --git a/testing/test_soledad/util.py b/testing/test_soledad/util.py index d53f6cda..57f8199b 100644 --- a/testing/test_soledad/util.py +++ b/testing/test_soledad/util.py @@ -15,12 +15,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - """ Utilities used by multiple test suites. """ - import os import random import string @@ -45,21 +43,20 @@ from leap.soledad.common.document import SoledadDocument from leap.soledad.common.couch import CouchDatabase from leap.soledad.common.couch.state import CouchServerState -from leap.soledad.common.crypto import ENC_SCHEME_KEY from leap.soledad.client import Soledad from leap.soledad.client import http_target from leap.soledad.client import auth -from leap.soledad.client.crypto import decrypt_doc_dict from leap.soledad.client.sqlcipher import SQLCipherDatabase from leap.soledad.client.sqlcipher import SQLCipherOptions +from leap.soledad.client._crypto import is_symmetrically_encrypted from leap.soledad.server import SoledadApp from leap.soledad.server.auth import SoledadTokenAuthMiddleware PASSWORD = '123456' -ADDRESS = 'leap@leap.se' +ADDRESS = 'user-1234' def make_local_db_and_target(test): @@ -193,8 +190,7 @@ class MockedSharedDBTest(object): def soledad_sync_target( - test, path, source_replica_uid=uuid4().hex, - sync_db=None, sync_enc_pool=None): + test, path, source_replica_uid=uuid4().hex): creds = {'token': { 'uuid': 'user-uuid', 'token': 'auth-token', @@ -204,14 +200,13 @@ def soledad_sync_target( source_replica_uid, creds, test._soledad._crypto, - None, # cert_file - sync_db=sync_db, - sync_enc_pool=sync_enc_pool) + None) # cert_file # redefine the base leap test class so it inherits from twisted trial's # TestCase. This is needed so trial knows that it has to manage a reactor and # wait for deferreds returned by tests to be fired. + BaseLeapTest = type( 'BaseLeapTest', (unittest.TestCase,), dict(BaseLeapTest.__dict__)) @@ -221,7 +216,6 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest): """ Instantiates Soledad for usage in tests. """ - defer_sync_encryption = False @pytest.mark.usefixtures("method_tmpdir") def setUp(self): @@ -229,14 +223,7 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest): # repeat it here because twisted.trial does not work with # setUpClass/tearDownClass. - self.old_path = os.environ['PATH'] - self.old_home = os.environ['HOME'] self.home = self.tempdir - bin_tdir = os.path.join( - self.tempdir, - 'bin') - os.environ["PATH"] = bin_tdir - os.environ["HOME"] = self.tempdir # config info self.db1_file = os.path.join(self.tempdir, "db1.u1db") @@ -263,10 +250,6 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest): self._db2.close() self._soledad.close() - # restore paths - os.environ["PATH"] = self.old_path - os.environ["HOME"] = self.old_home - def _delete_temporary_dirs(): # XXX should not access "private" attrs for f in [self._soledad.local_db_path, @@ -305,12 +288,12 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest): self.tempdir, prefix, local_db_path), server_url=server_url, # Soledad will fail if not given an url cert_file=cert_file, - defer_encryption=self.defer_sync_encryption, shared_db=MockSharedDB(), auth_token=auth_token) self.addCleanup(soledad.close) return soledad + @pytest.inlineCallbacks def assertGetEncryptedDoc( self, db, doc_id, doc_rev, content, has_conflicts): """ @@ -320,13 +303,9 @@ class BaseSoledadTest(BaseLeapTest, MockedSharedDBTest): has_conflicts=has_conflicts) doc = db.get_doc(doc_id) - if ENC_SCHEME_KEY in doc.content: - # XXX check for SYM_KEY too - key = self._soledad._crypto.doc_passphrase(doc.doc_id) - secret = self._soledad._crypto.secret - decrypted = decrypt_doc_dict( - doc.content, doc.doc_id, doc.rev, - key, secret) + if is_symmetrically_encrypted(doc.content['raw']): + crypt = self._soledad._crypto + decrypted = yield crypt.decrypt_doc(doc) doc.set_json(decrypted) self.assertEqual(exp_doc.doc_id, doc.doc_id) self.assertEqual(exp_doc.rev, doc.rev) diff --git a/testing/tests/perf/assets/cert_default.conf b/testing/tests/benchmarks/assets/cert_default.conf index 8043cea3..8043cea3 100644 --- a/testing/tests/perf/assets/cert_default.conf +++ b/testing/tests/benchmarks/assets/cert_default.conf diff --git a/testing/tests/benchmarks/conftest.py b/testing/tests/benchmarks/conftest.py new file mode 100644 index 00000000..a9cc3464 --- /dev/null +++ b/testing/tests/benchmarks/conftest.py @@ -0,0 +1,57 @@ +import pytest +import random +import base64 + +from twisted.internet import threads, reactor + + +# we have to manually setup the events server in order to be able to signal +# events. This is usually done by the enclosing application using soledad +# client (i.e. bitmask client). +from leap.common.events import server +server.ensure_server() + + +def pytest_addoption(parser): + parser.addoption( + "--num-docs", type="int", default=100, + help="the number of documents to use in performance tests") + + +@pytest.fixture() +def payload(): + def generate(size): + random.seed(1337) # same seed to avoid different bench results + payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size)) + # encode as base64 to avoid ascii encode/decode errors + return base64.b64encode(payload_bytes)[:size] # remove b64 overhead + return generate + + +@pytest.fixture() +def txbenchmark(benchmark): + def blockOnThread(*args, **kwargs): + return threads.deferToThread( + benchmark, threads.blockingCallFromThread, + reactor, *args, **kwargs) + return blockOnThread + + +@pytest.fixture() +def txbenchmark_with_setup(benchmark): + def blockOnThreadWithSetup(setup, f): + def blocking_runner(*args, **kwargs): + return threads.blockingCallFromThread(reactor, f, *args, **kwargs) + + def blocking_setup(): + args = threads.blockingCallFromThread(reactor, setup) + try: + return tuple(arg for arg in args), {} + except TypeError: + return ((args,), {}) if args else None + + def bench(): + return benchmark.pedantic(blocking_runner, setup=blocking_setup, + rounds=4, warmup_rounds=1) + return threads.deferToThread(bench) + return blockOnThreadWithSetup diff --git a/testing/tests/perf/pytest.ini b/testing/tests/benchmarks/pytest.ini index 7a0508ce..7a0508ce 100644 --- a/testing/tests/perf/pytest.ini +++ b/testing/tests/benchmarks/pytest.ini diff --git a/testing/tests/benchmarks/test_crypto.py b/testing/tests/benchmarks/test_crypto.py new file mode 100644 index 00000000..8ee9b899 --- /dev/null +++ b/testing/tests/benchmarks/test_crypto.py @@ -0,0 +1,97 @@ +""" +Benchmarks for crypto operations. +If you don't want to stress your local machine too much, you can pass the +SIZE_LIMT environment variable. + +For instance, to keep the maximum payload at 1MB: + +SIZE_LIMIT=1E6 py.test -s tests/perf/test_crypto.py +""" +import pytest +import os +import json +from uuid import uuid4 + +from leap.soledad.common.document import SoledadDocument +from leap.soledad.client import _crypto + +LIMIT = int(float(os.environ.get('SIZE_LIMIT', 50 * 1000 * 1000))) + + +def create_doc_encryption(size): + @pytest.mark.benchmark(group="test_crypto_encrypt_doc") + @pytest.inlineCallbacks + def test_doc_encryption(soledad_client, txbenchmark, payload): + crypto = soledad_client()._crypto + + DOC_CONTENT = {'payload': payload(size)} + doc = SoledadDocument( + doc_id=uuid4().hex, rev='rev', + json=json.dumps(DOC_CONTENT)) + + yield txbenchmark(crypto.encrypt_doc, doc) + return test_doc_encryption + + +# TODO this test is really bullshit, because it's still including +# the json serialization. + +def create_doc_decryption(size): + @pytest.inlineCallbacks + @pytest.mark.benchmark(group="test_crypto_decrypt_doc") + def test_doc_decryption(soledad_client, txbenchmark, payload): + crypto = soledad_client()._crypto + + DOC_CONTENT = {'payload': payload(size)} + doc = SoledadDocument( + doc_id=uuid4().hex, rev='rev', + json=json.dumps(DOC_CONTENT)) + + encrypted_doc = yield crypto.encrypt_doc(doc) + doc.set_json(encrypted_doc) + + yield txbenchmark(crypto.decrypt_doc, doc) + return test_doc_decryption + + +def create_raw_encryption(size): + @pytest.mark.benchmark(group="test_crypto_raw_encrypt") + def test_raw_encrypt(benchmark, payload): + key = payload(32) + benchmark(_crypto.encrypt_sym, payload(size), key) + return test_raw_encrypt + + +def create_raw_decryption(size): + @pytest.mark.benchmark(group="test_crypto_raw_decrypt") + def test_raw_decrypt(benchmark, payload): + key = payload(32) + iv, ciphertext = _crypto.encrypt_sym(payload(size), key) + benchmark(_crypto.decrypt_sym, ciphertext, key, iv) + return test_raw_decrypt + + +# Create the TESTS in the global namespace, they'll be picked by the benchmark +# plugin. + +encryption_tests = [ + ('10k', 1E4), + ('100k', 1E5), + ('500k', 5E5), + ('1M', 1E6), + ('10M', 1E7), + ('50M', 5E7), +] + +for name, size in encryption_tests: + if size < LIMIT: + sz = int(size) + globals()['test_encrypt_doc_' + name] = create_doc_encryption(sz) + globals()['test_decrypt_doc_' + name] = create_doc_decryption(sz) + + +for name, size in encryption_tests: + if size < LIMIT: + sz = int(size) + globals()['test_encrypt_raw_' + name] = create_raw_encryption(sz) + globals()['test_decrypt_raw_' + name] = create_raw_decryption(sz) diff --git a/testing/tests/perf/test_misc.py b/testing/tests/benchmarks/test_misc.py index ead48adf..ead48adf 100644 --- a/testing/tests/perf/test_misc.py +++ b/testing/tests/benchmarks/test_misc.py diff --git a/testing/tests/perf/test_sqlcipher.py b/testing/tests/benchmarks/test_sqlcipher.py index e7a54228..39c9e3ad 100644 --- a/testing/tests/perf/test_sqlcipher.py +++ b/testing/tests/benchmarks/test_sqlcipher.py @@ -29,10 +29,10 @@ def build_test_sqlcipher_create(amount, size): return test -test_async_create_20_500k = build_test_sqlcipher_async_create(20, 500*1000) -test_async_create_100_100k = build_test_sqlcipher_async_create(100, 100*1000) -test_async_create_1000_10k = build_test_sqlcipher_async_create(1000, 10*1000) +test_async_create_20_500k = build_test_sqlcipher_async_create(20, 500 * 1000) +test_async_create_100_100k = build_test_sqlcipher_async_create(100, 100 * 1000) +test_async_create_1000_10k = build_test_sqlcipher_async_create(1000, 10 * 1000) # synchronous -test_create_20_500k = build_test_sqlcipher_create(20, 500*1000) -test_create_100_100k = build_test_sqlcipher_create(100, 100*1000) -test_create_1000_10k = build_test_sqlcipher_create(1000, 10*1000) +test_create_20_500k = build_test_sqlcipher_create(20, 500 * 1000) +test_create_100_100k = build_test_sqlcipher_create(100, 100 * 1000) +test_create_1000_10k = build_test_sqlcipher_create(1000, 10 * 1000) diff --git a/testing/tests/perf/test_sync.py b/testing/tests/benchmarks/test_sync.py index 0b48a0b9..1501d74b 100644 --- a/testing/tests/perf/test_sync.py +++ b/testing/tests/benchmarks/test_sync.py @@ -1,17 +1,14 @@ import pytest - from twisted.internet.defer import gatherResults +@pytest.inlineCallbacks def load_up(client, amount, payload): - deferreds = [] # create a bunch of local documents + deferreds = [] for i in xrange(amount): - d = client.create_doc({'content': payload}) - deferreds.append(d) - d = gatherResults(deferreds) - d.addCallback(lambda _: None) - return d + deferreds.append(client.create_doc({'content': payload})) + yield gatherResults(deferreds) def create_upload(uploads, size): @@ -27,9 +24,9 @@ def create_upload(uploads, size): return test -test_upload_20_500k = create_upload(20, 500*1000) -test_upload_100_100k = create_upload(100, 100*1000) -test_upload_1000_10k = create_upload(1000, 10*1000) +test_upload_20_500k = create_upload(20, 500 * 1000) +test_upload_100_100k = create_upload(100, 100 * 1000) +test_upload_1000_10k = create_upload(1000, 10 * 1000) def create_download(downloads, size): @@ -52,9 +49,9 @@ def create_download(downloads, size): return test -test_download_20_500k = create_download(20, 500*1000) -test_download_100_100k = create_download(100, 100*1000) -test_download_1000_10k = create_download(1000, 10*1000) +test_download_20_500k = create_download(20, 500 * 1000) +test_download_100_100k = create_download(100, 100 * 1000) +test_download_1000_10k = create_download(1000, 10 * 1000) @pytest.inlineCallbacks diff --git a/testing/tests/client/test_aux_methods.py b/testing/tests/client/test_aux_methods.py index c25ff8ca..9b4a175f 100644 --- a/testing/tests/client/test_aux_methods.py +++ b/testing/tests/client/test_aux_methods.py @@ -21,10 +21,10 @@ import os from twisted.internet import defer -from leap.soledad.common.errors import DatabaseAccessError from leap.soledad.client import Soledad from leap.soledad.client.adbapi import U1DBConnectionPool from leap.soledad.client.secrets import PassphraseTooShort +from leap.soledad.client.secrets import SecretsException from test_soledad.util import BaseSoledadTest @@ -108,7 +108,7 @@ class AuxMethodsTestCase(BaseSoledadTest): sol.change_passphrase(u'654321') sol.close() - with self.assertRaises(DatabaseAccessError): + with self.assertRaises(SecretsException): self._soledad_instance( 'leap@leap.se', passphrase=u'123', diff --git a/testing/tests/client/test_crypto.py b/testing/tests/client/test_crypto.py index 77252b46..49a61438 100644 --- a/testing/tests/client/test_crypto.py +++ b/testing/tests/client/test_crypto.py @@ -17,47 +17,173 @@ """ Tests for cryptographic related stuff. """ -import os -import hashlib import binascii +import base64 +import hashlib +import json +import os + +from io import BytesIO + +import pytest + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend +from cryptography.exceptions import InvalidTag -from leap.soledad.client import crypto from leap.soledad.common.document import SoledadDocument from test_soledad.util import BaseSoledadTest -from leap.soledad.common.crypto import WrongMacError -from leap.soledad.common.crypto import UnknownMacMethodError -from leap.soledad.common.crypto import ENC_JSON_KEY -from leap.soledad.common.crypto import ENC_SCHEME_KEY -from leap.soledad.common.crypto import MAC_KEY -from leap.soledad.common.crypto import MAC_METHOD_KEY +from leap.soledad.client import _crypto + +from twisted.trial import unittest +from twisted.internet import defer + + +snowden1 = ( + "You can't come up against " + "the world's most powerful intelligence " + "agencies and not accept the risk. " + "If they want to get you, over time " + "they will.") + + +class AESTest(unittest.TestCase): + + def test_chunked_encryption(self): + key = 'A' * 32 + + fd = BytesIO() + aes = _crypto.AESWriter(key, _buffer=fd) + iv = aes.iv + + data = snowden1 + block = 16 + + for i in range(len(data) / block): + chunk = data[i * block:(i + 1) * block] + aes.write(chunk) + aes.end() + + ciphertext_chunked = fd.getvalue() + ciphertext, tag = _aes_encrypt(key, iv, data) + + assert ciphertext_chunked == ciphertext + + def test_decrypt(self): + key = 'A' * 32 + iv = 'A' * 16 + + data = snowden1 + block = 16 + + ciphertext, tag = _aes_encrypt(key, iv, data) + + fd = BytesIO() + aes = _crypto.AESWriter(key, iv, fd, tag=tag) + + for i in range(len(ciphertext) / block): + chunk = ciphertext[i * block:(i + 1) * block] + aes.write(chunk) + aes.end() + + cleartext_chunked = fd.getvalue() + assert cleartext_chunked == data + + +class BlobTestCase(unittest.TestCase): + + class doc_info: + doc_id = 'D-deadbeef' + rev = '397932e0c77f45fcb7c3732930e7e9b2:1' + + @defer.inlineCallbacks + def test_blob_encryptor(self): + + inf = BytesIO(snowden1) + + blob = _crypto.BlobEncryptor( + self.doc_info, inf, + secret='A' * 96) + encrypted = yield blob.encrypt() + preamble, ciphertext = _crypto._split(encrypted.getvalue()) + ciphertext = ciphertext[:-16] -class EncryptedSyncTestCase(BaseSoledadTest): + assert len(preamble) == _crypto.PACMAN.size + unpacked_data = _crypto.PACMAN.unpack(preamble) + magic, sch, meth, ts, iv, doc_id, rev = unpacked_data + assert magic == _crypto.BLOB_SIGNATURE_MAGIC + assert sch == 1 + assert meth == _crypto.ENC_METHOD.aes_256_gcm + assert iv == blob.iv + assert doc_id == 'D-deadbeef' + assert rev == self.doc_info.rev - """ - Tests that guarantee that data will always be encrypted when syncing. - """ + aes_key = _crypto._get_sym_key_for_doc( + self.doc_info.doc_id, 'A' * 96) + assert ciphertext == _aes_encrypt(aes_key, blob.iv, snowden1)[0] - def test_encrypt_decrypt_json(self): + decrypted = _aes_decrypt(aes_key, blob.iv, blob.tag, ciphertext, + preamble) + assert str(decrypted) == snowden1 + + @defer.inlineCallbacks + def test_blob_decryptor(self): + + inf = BytesIO(snowden1) + + blob = _crypto.BlobEncryptor( + self.doc_info, inf, + secret='A' * 96) + ciphertext = yield blob.encrypt() + + decryptor = _crypto.BlobDecryptor( + self.doc_info, ciphertext, + secret='A' * 96) + decrypted = yield decryptor.decrypt() + assert decrypted == snowden1 + + @defer.inlineCallbacks + def test_encrypt_and_decrypt(self): """ - Test encrypting and decrypting documents. + Check that encrypting and decrypting gives same doc. """ - simpledoc = {'key': 'val'} - doc1 = SoledadDocument(doc_id='id') - doc1.content = simpledoc - - # encrypt doc - doc1.set_json(self._soledad._crypto.encrypt_doc(doc1)) - # assert content is different and includes keys - self.assertNotEqual( - simpledoc, doc1.content, - 'incorrect document encryption') - self.assertTrue(ENC_JSON_KEY in doc1.content) - self.assertTrue(ENC_SCHEME_KEY in doc1.content) - # decrypt doc - doc1.set_json(self._soledad._crypto.decrypt_doc(doc1)) - self.assertEqual( - simpledoc, doc1.content, 'incorrect document encryption') + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + assert encrypted != payload + assert 'raw' in encrypted + doc2 = SoledadDocument('id1', '1') + doc2.set_json(encrypted) + assert _crypto.is_symmetrically_encrypted(encrypted) + decrypted = yield crypto.decrypt_doc(doc2) + assert len(decrypted) != 0 + assert json.loads(decrypted) == payload + + @defer.inlineCallbacks + def test_decrypt_with_wrong_tag_raises(self): + """ + Trying to decrypt a document with wrong MAC should raise. + """ + crypto = _crypto.SoledadCrypto('A' * 96) + payload = {'key': 'someval'} + doc1 = SoledadDocument('id1', '1', json.dumps(payload)) + + encrypted = yield crypto.encrypt_doc(doc1) + encdict = json.loads(encrypted) + preamble, raw = _crypto._split(str(encdict['raw'])) + # mess with tag + messed = raw[:-16] + '0' * 16 + + preamble = base64.urlsafe_b64encode(preamble) + newraw = preamble + ' ' + base64.urlsafe_b64encode(str(messed)) + doc2 = SoledadDocument('id1', '1') + doc2.set_json(json.dumps({"raw": str(newraw)})) + + with pytest.raises(_crypto.InvalidBlob): + yield crypto.decrypt_doc(doc2) class RecoveryDocumentTestCase(BaseSoledadTest): @@ -74,13 +200,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest): encrypted_secret = rd[ self._soledad.secrets.STORAGE_SECRETS_KEY][secret_id] self.assertTrue(self._soledad.secrets.CIPHER_KEY in encrypted_secret) - self.assertTrue( - encrypted_secret[self._soledad.secrets.CIPHER_KEY] == 'aes256') + self.assertEquals( + _crypto.ENC_METHOD.aes_256_gcm, + encrypted_secret[self._soledad.secrets.CIPHER_KEY]) self.assertTrue(self._soledad.secrets.LENGTH_KEY in encrypted_secret) self.assertTrue(self._soledad.secrets.SECRET_KEY in encrypted_secret) - def test_import_recovery_document(self): - rd = self._soledad.secrets._export_recovery_document() + def test_import_recovery_document(self, cipher='aes256'): + rd = self._soledad.secrets._export_recovery_document(cipher) s = self._soledad_instance() s.secrets._import_recovery_document(rd) s.secrets.set_secret_id(self._soledad.secrets._secret_id) @@ -89,6 +216,14 @@ class RecoveryDocumentTestCase(BaseSoledadTest): 'Failed settinng secret for symmetric encryption.') s.close() + def test_import_GCM_recovery_document(self): + cipher = self._soledad.secrets.CIPHER_AES256_GCM + self.test_import_recovery_document(cipher) + + def test_import_legacy_CTR_recovery_document(self): + cipher = self._soledad.secrets.CIPHER_AES256 + self.test_import_recovery_document(cipher) + class SoledadSecretsTestCase(BaseSoledadTest): @@ -146,60 +281,21 @@ class SoledadSecretsTestCase(BaseSoledadTest): "Should have a secret at this point") -class MacAuthTestCase(BaseSoledadTest): - - def test_decrypt_with_wrong_mac_raises(self): - """ - Trying to decrypt a document with wrong MAC should raise. - """ - simpledoc = {'key': 'val'} - doc = SoledadDocument(doc_id='id') - doc.content = simpledoc - # encrypt doc - doc.set_json(self._soledad._crypto.encrypt_doc(doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) - # mess with MAC - doc.content[MAC_KEY] = '1234567890ABCDEF' - # try to decrypt doc - self.assertRaises( - WrongMacError, - self._soledad._crypto.decrypt_doc, doc) - - def test_decrypt_with_unknown_mac_method_raises(self): - """ - Trying to decrypt a document with unknown MAC method should raise. - """ - simpledoc = {'key': 'val'} - doc = SoledadDocument(doc_id='id') - doc.content = simpledoc - # encrypt doc - doc.set_json(self._soledad._crypto.encrypt_doc(doc)) - self.assertTrue(MAC_KEY in doc.content) - self.assertTrue(MAC_METHOD_KEY in doc.content) - # mess with MAC method - doc.content[MAC_METHOD_KEY] = 'mymac' - # try to decrypt doc - self.assertRaises( - UnknownMacMethodError, - self._soledad._crypto.decrypt_doc, doc) - - class SoledadCryptoAESTestCase(BaseSoledadTest): def test_encrypt_decrypt_sym(self): # generate 256-bit key key = os.urandom(32) - iv, cyphertext = crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') - plaintext = crypto.decrypt_sym(cyphertext, key, iv) + plaintext = _crypto.decrypt_sym(cyphertext, key, iv) self.assertEqual('data', plaintext) - def test_decrypt_with_wrong_iv_fails(self): + def test_decrypt_with_wrong_iv_raises(self): key = os.urandom(32) - iv, cyphertext = crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -208,13 +304,13 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): wrongiv = rawiv while wrongiv == rawiv: wrongiv = os.urandom(1) + rawiv[1:] - plaintext = crypto.decrypt_sym( - cyphertext, key, iv=binascii.b2a_base64(wrongiv)) - self.assertNotEqual('data', plaintext) + with pytest.raises(InvalidTag): + _crypto.decrypt_sym( + cyphertext, key, iv=binascii.b2a_base64(wrongiv)) - def test_decrypt_with_wrong_key_fails(self): + def test_decrypt_with_wrong_key_raises(self): key = os.urandom(32) - iv, cyphertext = crypto.encrypt_sym('data', key) + iv, cyphertext = _crypto.encrypt_sym('data', key) self.assertTrue(cyphertext is not None) self.assertTrue(cyphertext != '') self.assertTrue(cyphertext != 'data') @@ -222,5 +318,21 @@ class SoledadCryptoAESTestCase(BaseSoledadTest): # ensure keys are different in case we are extremely lucky while wrongkey == key: wrongkey = os.urandom(32) - plaintext = crypto.decrypt_sym(cyphertext, wrongkey, iv) - self.assertNotEqual('data', plaintext) + with pytest.raises(InvalidTag): + _crypto.decrypt_sym(cyphertext, wrongkey, iv) + + +def _aes_encrypt(key, iv, data): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.GCM(iv), backend=backend) + encryptor = cipher.encryptor() + return encryptor.update(data) + encryptor.finalize(), encryptor.tag + + +def _aes_decrypt(key, iv, tag, data, aead=''): + backend = default_backend() + cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend) + decryptor = cipher.decryptor() + if aead: + decryptor.authenticate_additional_data(aead) + return decryptor.update(data) + decryptor.finalize() diff --git a/testing/tests/client/test_deprecated_crypto.py b/testing/tests/client/test_deprecated_crypto.py new file mode 100644 index 00000000..8ee3735c --- /dev/null +++ b/testing/tests/client/test_deprecated_crypto.py @@ -0,0 +1,91 @@ +import json +from twisted.internet import defer +from uuid import uuid4 +from urlparse import urljoin + +from leap.soledad.client import crypto as old_crypto +from leap.soledad.common.couch import CouchDatabase +from leap.soledad.common import crypto as common_crypto + +from test_soledad.u1db_tests import simple_doc +from test_soledad.util import SoledadWithCouchServerMixin +from test_soledad.util import make_token_soledad_app +from test_soledad.u1db_tests import TestCaseWithServer + + +def deprecate_client_crypto(client): + secret = client._crypto.secret + _crypto = old_crypto.SoledadCrypto(secret) + setattr(client._dbsyncer, '_crypto', _crypto) + return client + + +def couch_database(couch_url, uuid): + db = CouchDatabase(couch_url, "user-%s" % (uuid,)) + return db + + +class DeprecatedCryptoTest(SoledadWithCouchServerMixin, TestCaseWithServer): + + def setUp(self): + SoledadWithCouchServerMixin.setUp(self) + TestCaseWithServer.setUp(self) + + def tearDown(self): + SoledadWithCouchServerMixin.tearDown(self) + TestCaseWithServer.tearDown(self) + + @staticmethod + def make_app_with_state(state): + return make_token_soledad_app(state) + + @defer.inlineCallbacks + def test_touch_updates_remote_representation(self): + self.startTwistedServer() + user = 'user-' + uuid4().hex + server_url = 'http://%s:%d' % (self.server_address) + client = self._soledad_instance(user=user, server_url=server_url) + deprecated_client = deprecate_client_crypto( + self._soledad_instance(user=user, server_url=server_url)) + + self.make_app() + remote = self.request_state._create_database(replica_uid=client._uuid) + remote = CouchDatabase.open_database( + urljoin(self.couch_url, 'user-' + user), + create=True) + + # ensure remote db is empty + gen, docs = remote.get_all_docs() + assert gen == 0 + assert len(docs) == 0 + + # create a doc with deprecated client and sync + yield deprecated_client.create_doc(json.loads(simple_doc)) + yield deprecated_client.sync() + + # check for doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 1 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert common_crypto.ENC_JSON_KEY in content + assert common_crypto.ENC_SCHEME_KEY in content + assert common_crypto.ENC_METHOD_KEY in content + assert common_crypto.ENC_IV_KEY in content + assert common_crypto.MAC_KEY in content + assert common_crypto.MAC_METHOD_KEY in content + + # "touch" the document with a newer client and synx + _, docs = yield client.get_all_docs() + yield client.put_doc(doc) + yield client.sync() + + # check for newer representation of doc in remote db + gen, docs = remote.get_all_docs() + assert gen == 2 + assert len(docs) == 1 + doc = docs.pop() + content = doc.content + assert len(content) == 1 + assert 'raw' in content diff --git a/testing/tests/conftest.py b/testing/tests/conftest.py index 9e4319ac..1ff1cbb7 100644 --- a/testing/tests/conftest.py +++ b/testing/tests/conftest.py @@ -1,4 +1,29 @@ +import json +import os import pytest +import requests +import signal +import time + +from hashlib import sha512 +from subprocess import call +from urlparse import urljoin +from uuid import uuid4 + +from leap.soledad.common.couch import CouchDatabase +from leap.soledad.client import Soledad + + +# +# default options for all tests +# + +DEFAULT_PASSPHRASE = '123' + +DEFAULT_URL = 'http://127.0.0.1:2424' +DEFAULT_PRIVKEY = 'soledad_privkey.pem' +DEFAULT_CERTKEY = 'soledad_certkey.pem' +DEFAULT_TOKEN = 'an-auth-token' def pytest_addoption(parser): @@ -16,3 +41,167 @@ def couch_url(request): @pytest.fixture def method_tmpdir(request, tmpdir): request.instance.tempdir = tmpdir.strpath + + +# +# remote_db fixture: provides an empty database for a given user in a per +# function scope. +# + +class UserDatabase(object): + + def __init__(self, url, uuid): + self._remote_db_url = urljoin(url, 'user-%s' % uuid) + + def setup(self): + return CouchDatabase.open_database( + url=self._remote_db_url, create=True, replica_uid=None) + + def teardown(self): + requests.delete(self._remote_db_url) + + +@pytest.fixture() +def remote_db(request): + couch_url = request.config.option.couch_url + + def create(uuid): + db = UserDatabase(couch_url, uuid) + request.addfinalizer(db.teardown) + return db.setup() + return create + + +def get_pid(pidfile): + if not os.path.isfile(pidfile): + return 0 + try: + with open(pidfile) as f: + return int(f.read()) + except IOError: + return 0 + + +# +# soledad_server fixture: provides a running soledad server in a per module +# context (same soledad server for all tests in this module). +# + +class SoledadServer(object): + + def __init__(self, tmpdir_factory, couch_url): + tmpdir = tmpdir_factory.mktemp('soledad-server') + self._pidfile = os.path.join(tmpdir.strpath, 'soledad-server.pid') + self._logfile = os.path.join(tmpdir.strpath, 'soledad-server.log') + self._couch_url = couch_url + + def start(self): + self._create_conf_file() + # start the server + call([ + 'twistd', + '--logfile=%s' % self._logfile, + '--pidfile=%s' % self._pidfile, + 'web', + '--wsgi=leap.soledad.server.application.wsgi_application', + '--port=2424' + ]) + + def _create_conf_file(self): + if not os.access('/etc', os.W_OK): + return + if not os.path.isdir('/etc/soledad'): + os.mkdir('/etc/soledad') + with open('/etc/soledad/soledad-server.conf', 'w') as f: + content = '[soledad-server]\ncouch_url = %s' % self._couch_url + f.write(content) + + def stop(self): + pid = get_pid(self._pidfile) + os.kill(pid, signal.SIGKILL) + + +@pytest.fixture(scope='module') +def soledad_server(tmpdir_factory, request): + couch_url = request.config.option.couch_url + server = SoledadServer(tmpdir_factory, couch_url) + server.start() + request.addfinalizer(server.stop) + return server + + +# +# soledad_dbs fixture: provides all databases needed by soledad server in a per +# module scope (same databases for all tests in this module). +# + +def _token_dbname(): + dbname = 'tokens_' + \ + str(int(time.time() / (30 * 24 * 3600))) + return dbname + + +class SoledadDatabases(object): + + def __init__(self, url): + self._token_db_url = urljoin(url, _token_dbname()) + self._shared_db_url = urljoin(url, 'shared') + + def setup(self, uuid): + self._create_dbs() + self._add_token(uuid) + + def _create_dbs(self): + requests.put(self._token_db_url) + requests.put(self._shared_db_url) + + def _add_token(self, uuid): + token = sha512(DEFAULT_TOKEN).hexdigest() + content = {'type': 'Token', 'user_id': uuid} + requests.put( + self._token_db_url + '/' + token, data=json.dumps(content)) + + def teardown(self): + requests.delete(self._token_db_url) + requests.delete(self._shared_db_url) + + +@pytest.fixture() +def soledad_dbs(request): + couch_url = request.config.option.couch_url + + def create(uuid): + db = SoledadDatabases(couch_url) + request.addfinalizer(db.teardown) + return db.setup(uuid) + return create + + +# +# soledad_client fixture: provides a clean soledad client for a test function. +# + +@pytest.fixture() +def soledad_client(tmpdir, soledad_server, remote_db, soledad_dbs, request): + passphrase = DEFAULT_PASSPHRASE + server_url = DEFAULT_URL + token = DEFAULT_TOKEN + default_uuid = uuid4().hex + remote_db(default_uuid) + soledad_dbs(default_uuid) + + # get a soledad instance + def create(): + secrets_path = os.path.join(tmpdir.strpath, '%s.secret' % default_uuid) + local_db_path = os.path.join(tmpdir.strpath, '%s.db' % default_uuid) + soledad_client = Soledad( + default_uuid, + unicode(passphrase), + secrets_path=secrets_path, + local_db_path=local_db_path, + server_url=server_url, + cert_file=None, + auth_token=token) + request.addfinalizer(soledad_client.close) + return soledad_client + return create diff --git a/testing/tests/couch/conftest.py b/testing/tests/couch/conftest.py deleted file mode 100644 index 1074f091..00000000 --- a/testing/tests/couch/conftest.py +++ /dev/null @@ -1,31 +0,0 @@ -import couchdb -import pytest -import random -import string - - -@pytest.fixture -def random_name(): - return 'user-' + ''.join( - random.choice( - string.ascii_lowercase) for _ in range(10)) - - -class RandomDatabase(object): - - def __init__(self, couch_url, name): - self.couch_url = couch_url - self.name = name - self.server = couchdb.client.Server(couch_url) - self.database = self.server.create(name) - - def teardown(self): - self.server.delete(self.name) - - -@pytest.fixture -def db(random_name, request): - couch_url = request.config.getoption('--couch-url') - db = RandomDatabase(couch_url, random_name) - request.addfinalizer(db.teardown) - return db diff --git a/testing/tests/couch/test_command.py b/testing/tests/couch/test_command.py index 68097fb1..9fb2c153 100644 --- a/testing/tests/couch/test_command.py +++ b/testing/tests/couch/test_command.py @@ -25,6 +25,7 @@ class CommandBasedDBCreationTest(unittest.TestCase): state.ensure_database, "user-1337") def test_raises_unauthorized_by_default(self): - state = couch_state.CouchServerState("url", check_schema_versions=False) + state = couch_state.CouchServerState("url", + check_schema_versions=False) self.assertRaises(u1db_errors.Unauthorized, state.ensure_database, "user-1337") diff --git a/testing/tests/couch/test_state.py b/testing/tests/couch/test_state.py index e293b5b8..e5ac3704 100644 --- a/testing/tests/couch/test_state.py +++ b/testing/tests/couch/test_state.py @@ -1,25 +1,32 @@ import pytest - from leap.soledad.common.couch import CONFIG_DOC_ID from leap.soledad.common.couch import SCHEMA_VERSION from leap.soledad.common.couch import SCHEMA_VERSION_KEY from leap.soledad.common.couch.state import CouchServerState +from uuid import uuid4 from leap.soledad.common.errors import WrongCouchSchemaVersionError from leap.soledad.common.errors import MissingCouchConfigDocumentError +from test_soledad.util import CouchDBTestCase + +class CouchDesignDocsTests(CouchDBTestCase): -def test_wrong_couch_version_raises(db): - wrong_schema_version = SCHEMA_VERSION + 1 - db.database.create( - {'_id': CONFIG_DOC_ID, SCHEMA_VERSION_KEY: wrong_schema_version}) - with pytest.raises(WrongCouchSchemaVersionError): - CouchServerState(db.couch_url, create_cmd='/bin/echo', - check_schema_versions=True) + def setUp(self): + CouchDBTestCase.setUp(self) + self.db = self.couch_server.create('user-' + uuid4().hex) + self.addCleanup(self.delete_db, self.db.name) + def test_wrong_couch_version_raises(self): + wrong_schema_version = SCHEMA_VERSION + 1 + self.db.create( + {'_id': CONFIG_DOC_ID, SCHEMA_VERSION_KEY: wrong_schema_version}) + with pytest.raises(WrongCouchSchemaVersionError): + CouchServerState(self.couch_url, create_cmd='/bin/echo', + check_schema_versions=True) -def test_missing_config_doc_raises(db): - db.database.create({}) - with pytest.raises(MissingCouchConfigDocumentError): - CouchServerState(db.couch_url, create_cmd='/bin/echo', - check_schema_versions=True) + def test_missing_config_doc_raises(self): + self.db.create({}) + with pytest.raises(MissingCouchConfigDocumentError): + CouchServerState(self.couch_url, create_cmd='/bin/echo', + check_schema_versions=True) diff --git a/testing/tests/perf/conftest.py b/testing/tests/perf/conftest.py deleted file mode 100644 index 6fa6b2c0..00000000 --- a/testing/tests/perf/conftest.py +++ /dev/null @@ -1,249 +0,0 @@ -import json -import os -import pytest -import requests -import random -import base64 -import signal -import time - -from hashlib import sha512 -from uuid import uuid4 -from subprocess import call -from urlparse import urljoin -from twisted.internet import threads, reactor - -from leap.soledad.client import Soledad -from leap.soledad.common.couch import CouchDatabase - - -# we have to manually setup the events server in order to be able to signal -# events. This is usually done by the enclosing application using soledad -# client (i.e. bitmask client). -from leap.common.events import server -server.ensure_server() - - -def pytest_addoption(parser): - parser.addoption( - "--couch-url", type="string", default="http://127.0.0.1:5984", - help="the url for the couch server to be used during tests") - parser.addoption( - "--num-docs", type="int", default=100, - help="the number of documents to use in performance tests") - - -# -# default options for all tests -# - -DEFAULT_PASSPHRASE = '123' - -DEFAULT_URL = 'http://127.0.0.1:2424' -DEFAULT_PRIVKEY = 'soledad_privkey.pem' -DEFAULT_CERTKEY = 'soledad_certkey.pem' -DEFAULT_TOKEN = 'an-auth-token' - - -@pytest.fixture() -def payload(): - def generate(size): - random.seed(1337) # same seed to avoid different bench results - payload_bytes = bytearray(random.getrandbits(8) for _ in xrange(size)) - # encode as base64 to avoid ascii encode/decode errors - return base64.b64encode(payload_bytes)[:size] # remove b64 overhead - return generate - - -# -# soledad_dbs fixture: provides all databases needed by soledad server in a per -# module scope (same databases for all tests in this module). -# - -def _token_dbname(): - dbname = 'tokens_' + \ - str(int(time.time() / (30 * 24 * 3600))) - return dbname - - -class SoledadDatabases(object): - - def __init__(self, url): - self._token_db_url = urljoin(url, _token_dbname()) - self._shared_db_url = urljoin(url, 'shared') - - def setup(self, uuid): - self._create_dbs() - self._add_token(uuid) - - def _create_dbs(self): - requests.put(self._token_db_url) - requests.put(self._shared_db_url) - - def _add_token(self, uuid): - token = sha512(DEFAULT_TOKEN).hexdigest() - content = {'type': 'Token', 'user_id': uuid} - requests.put( - self._token_db_url + '/' + token, data=json.dumps(content)) - - def teardown(self): - requests.delete(self._token_db_url) - requests.delete(self._shared_db_url) - - -@pytest.fixture() -def soledad_dbs(request): - couch_url = request.config.option.couch_url - - def create(uuid): - db = SoledadDatabases(couch_url) - request.addfinalizer(db.teardown) - return db.setup(uuid) - return create - - -# -# remote_db fixture: provides an empty database for a given user in a per -# function scope. -# - -class UserDatabase(object): - - def __init__(self, url, uuid): - self._remote_db_url = urljoin(url, 'user-%s' % uuid) - - def setup(self): - return CouchDatabase.open_database( - url=self._remote_db_url, create=True, replica_uid=None) - - def teardown(self): - requests.delete(self._remote_db_url) - - -@pytest.fixture() -def remote_db(request): - couch_url = request.config.option.couch_url - - def create(uuid): - db = UserDatabase(couch_url, uuid) - request.addfinalizer(db.teardown) - return db.setup() - return create - - -def get_pid(pidfile): - if not os.path.isfile(pidfile): - return 0 - try: - with open(pidfile) as f: - return int(f.read()) - except IOError: - return 0 - - -# -# soledad_server fixture: provides a running soledad server in a per module -# context (same soledad server for all tests in this module). -# - -class SoledadServer(object): - - def __init__(self, tmpdir_factory, couch_url): - tmpdir = tmpdir_factory.mktemp('soledad-server') - self._pidfile = os.path.join(tmpdir.strpath, 'soledad-server.pid') - self._logfile = os.path.join(tmpdir.strpath, 'soledad-server.log') - self._couch_url = couch_url - - def start(self): - self._create_conf_file() - # start the server - call([ - 'twistd', - '--logfile=%s' % self._logfile, - '--pidfile=%s' % self._pidfile, - 'web', - '--wsgi=leap.soledad.server.application.wsgi_application', - '--port=2424' - ]) - - def _create_conf_file(self): - if not os.access('/etc', os.W_OK): - return - if not os.path.isdir('/etc/soledad'): - os.mkdir('/etc/soledad') - with open('/etc/soledad/soledad-server.conf', 'w') as f: - content = '[soledad-server]\ncouch_url = %s' % self._couch_url - f.write(content) - - def stop(self): - pid = get_pid(self._pidfile) - os.kill(pid, signal.SIGKILL) - - -@pytest.fixture(scope='module') -def soledad_server(tmpdir_factory, request): - couch_url = request.config.option.couch_url - server = SoledadServer(tmpdir_factory, couch_url) - server.start() - request.addfinalizer(server.stop) - return server - - -@pytest.fixture() -def txbenchmark(benchmark): - def blockOnThread(*args, **kwargs): - return threads.deferToThread( - benchmark, threads.blockingCallFromThread, - reactor, *args, **kwargs) - return blockOnThread - - -@pytest.fixture() -def txbenchmark_with_setup(benchmark): - def blockOnThreadWithSetup(setup, f): - def blocking_runner(*args, **kwargs): - return threads.blockingCallFromThread(reactor, f, *args, **kwargs) - - def blocking_setup(): - args = threads.blockingCallFromThread(reactor, setup) - try: - return tuple(arg for arg in args), {} - except TypeError: - return ((args,), {}) if args else None - - def bench(): - return benchmark.pedantic(blocking_runner, setup=blocking_setup, - rounds=4, warmup_rounds=1) - return threads.deferToThread(bench) - return blockOnThreadWithSetup - - -# -# soledad_client fixture: provides a clean soledad client for a test function. -# - -@pytest.fixture() -def soledad_client(tmpdir, soledad_server, remote_db, soledad_dbs, request): - passphrase = DEFAULT_PASSPHRASE - server_url = DEFAULT_URL - token = DEFAULT_TOKEN - default_uuid = uuid4().hex - remote_db(default_uuid) - soledad_dbs(default_uuid) - - # get a soledad instance - def create(): - secrets_path = os.path.join(tmpdir.strpath, '%s.secret' % uuid4().hex) - local_db_path = os.path.join(tmpdir.strpath, '%s.db' % uuid4().hex) - soledad_client = Soledad( - default_uuid, - unicode(passphrase), - secrets_path=secrets_path, - local_db_path=local_db_path, - server_url=server_url, - cert_file=None, - auth_token=token, - defer_encryption=True) - request.addfinalizer(soledad_client.close) - return soledad_client - return create diff --git a/testing/tests/perf/test_crypto.py b/testing/tests/perf/test_crypto.py deleted file mode 100644 index be00560b..00000000 --- a/testing/tests/perf/test_crypto.py +++ /dev/null @@ -1,81 +0,0 @@ -import pytest -import json -from uuid import uuid4 -from leap.soledad.common.document import SoledadDocument -from leap.soledad.client.crypto import encrypt_sym -from leap.soledad.client.crypto import decrypt_sym - - -def create_doc_encryption(size): - @pytest.mark.benchmark(group="test_crypto_encrypt_doc") - def test_doc_encryption(soledad_client, benchmark, payload): - crypto = soledad_client()._crypto - - DOC_CONTENT = {'payload': payload(size)} - doc = SoledadDocument( - doc_id=uuid4().hex, rev='rev', - json=json.dumps(DOC_CONTENT)) - - benchmark(crypto.encrypt_doc, doc) - return test_doc_encryption - - -def create_doc_decryption(size): - @pytest.mark.benchmark(group="test_crypto_decrypt_doc") - def test_doc_decryption(soledad_client, benchmark, payload): - crypto = soledad_client()._crypto - - DOC_CONTENT = {'payload': payload(size)} - doc = SoledadDocument( - doc_id=uuid4().hex, rev='rev', - json=json.dumps(DOC_CONTENT)) - encrypted_doc = crypto.encrypt_doc(doc) - doc.set_json(encrypted_doc) - - benchmark(crypto.decrypt_doc, doc) - return test_doc_decryption - - -test_encrypt_doc_10k = create_doc_encryption(10*1000) -test_encrypt_doc_100k = create_doc_encryption(100*1000) -test_encrypt_doc_500k = create_doc_encryption(500*1000) -test_encrypt_doc_1M = create_doc_encryption(1000*1000) -test_encrypt_doc_10M = create_doc_encryption(10*1000*1000) -test_encrypt_doc_50M = create_doc_encryption(50*1000*1000) -test_decrypt_doc_10k = create_doc_decryption(10*1000) -test_decrypt_doc_100k = create_doc_decryption(100*1000) -test_decrypt_doc_500k = create_doc_decryption(500*1000) -test_decrypt_doc_1M = create_doc_decryption(1000*1000) -test_decrypt_doc_10M = create_doc_decryption(10*1000*1000) -test_decrypt_doc_50M = create_doc_decryption(50*1000*1000) - - -def create_raw_encryption(size): - @pytest.mark.benchmark(group="test_crypto_raw_encrypt") - def test_raw_encrypt(benchmark, payload): - key = payload(32) - benchmark(encrypt_sym, payload(size), key) - return test_raw_encrypt - - -def create_raw_decryption(size): - @pytest.mark.benchmark(group="test_crypto_raw_decrypt") - def test_raw_decrypt(benchmark, payload): - key = payload(32) - iv, ciphertext = encrypt_sym(payload(size), key) - benchmark(decrypt_sym, ciphertext, key, iv) - return test_raw_decrypt - - -test_encrypt_raw_10k = create_raw_encryption(10*1000) -test_encrypt_raw_100k = create_raw_encryption(100*1000) -test_encrypt_raw_500k = create_raw_encryption(500*1000) -test_encrypt_raw_1M = create_raw_encryption(1000*1000) -test_encrypt_raw_10M = create_raw_encryption(10*1000*1000) -test_encrypt_raw_50M = create_raw_encryption(50*1000*1000) -test_decrypt_raw_10k = create_raw_decryption(10*1000) -test_decrypt_raw_100k = create_raw_decryption(100*1000) -test_decrypt_raw_500k = create_raw_decryption(500*1000) -test_decrypt_raw_1M = create_raw_decryption(1000*1000) -test_decrypt_raw_10M = create_raw_decryption(10*1000*1000) -test_decrypt_raw_50M = create_raw_decryption(50*1000*1000) diff --git a/testing/tests/perf/test_encdecpool.py b/testing/tests/perf/test_encdecpool.py deleted file mode 100644 index 77091a41..00000000 --- a/testing/tests/perf/test_encdecpool.py +++ /dev/null @@ -1,78 +0,0 @@ -import pytest -import json -from uuid import uuid4 -from twisted.internet.defer import gatherResults -from leap.soledad.client.encdecpool import SyncEncrypterPool -from leap.soledad.client.encdecpool import SyncDecrypterPool -from leap.soledad.common.document import SoledadDocument -# FIXME: test load is low due issue #7370, higher values will get out of memory - - -def create_encrypt(amount, size): - @pytest.mark.benchmark(group="test_pool_encrypt") - @pytest.inlineCallbacks - def test(soledad_client, txbenchmark_with_setup, request, payload): - DOC_CONTENT = {'payload': payload(size)} - - def setup(): - client = soledad_client() - pool = SyncEncrypterPool(client._crypto, client._sync_db) - pool.start() - request.addfinalizer(pool.stop) - docs = [ - SoledadDocument(doc_id=uuid4().hex, rev='rev', - json=json.dumps(DOC_CONTENT)) - for _ in xrange(amount) - ] - return pool, docs - - @pytest.inlineCallbacks - def put_and_wait(pool, docs): - yield gatherResults([pool.encrypt_doc(doc) for doc in docs]) - - yield txbenchmark_with_setup(setup, put_and_wait) - return test - -test_encdecpool_encrypt_100_10k = create_encrypt(100, 10*1000) -test_encdecpool_encrypt_100_100k = create_encrypt(100, 100*1000) -test_encdecpool_encrypt_100_500k = create_encrypt(100, 500*1000) - - -def create_decrypt(amount, size): - @pytest.mark.benchmark(group="test_pool_decrypt") - @pytest.inlineCallbacks - def test(soledad_client, txbenchmark_with_setup, request, payload): - DOC_CONTENT = {'payload': payload(size)} - client = soledad_client() - - def setup(): - pool = SyncDecrypterPool( - client._crypto, - client._sync_db, - source_replica_uid=client._dbpool.replica_uid, - insert_doc_cb=lambda x, y, z: False) # ignored - pool.start(amount) - request.addfinalizer(pool.stop) - crypto = client._crypto - docs = [] - for _ in xrange(amount): - doc = SoledadDocument( - doc_id=uuid4().hex, rev='rev', - json=json.dumps(DOC_CONTENT)) - encrypted_content = json.loads(crypto.encrypt_doc(doc)) - docs.append((doc.doc_id, encrypted_content)) - return pool, docs - - def put_and_wait(pool, docs): - deferreds = [] # fires on completion - for idx, (doc_id, content) in enumerate(docs, 1): - deferreds.append(pool.insert_encrypted_received_doc( - doc_id, 'rev', content, idx, "trans_id", idx)) - return gatherResults(deferreds) - - yield txbenchmark_with_setup(setup, put_and_wait) - return test - -test_encdecpool_decrypt_100_10k = create_decrypt(100, 10*1000) -test_encdecpool_decrypt_100_100k = create_decrypt(100, 100*1000) -test_encdecpool_decrypt_100_500k = create_decrypt(100, 500*1000) diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py index 6bbcf002..6710caaf 100644 --- a/testing/tests/server/test_server.py +++ b/testing/tests/server/test_server.py @@ -41,7 +41,7 @@ from test_soledad.util import ( BaseSoledadTest, ) -from leap.soledad.common import crypto +from leap.soledad.client import _crypto from leap.soledad.client import Soledad from leap.soledad.server.config import load_configuration from leap.soledad.server.config import CONFIG_DEFAULTS @@ -412,13 +412,9 @@ class EncryptedSyncTestCase( self.assertEqual(soldoc.doc_id, couchdoc.doc_id) self.assertEqual(soldoc.rev, couchdoc.rev) couch_content = couchdoc.content.keys() - self.assertEqual(6, len(couch_content)) - self.assertTrue(crypto.ENC_JSON_KEY in couch_content) - self.assertTrue(crypto.ENC_SCHEME_KEY in couch_content) - self.assertTrue(crypto.ENC_METHOD_KEY in couch_content) - self.assertTrue(crypto.ENC_IV_KEY in couch_content) - self.assertTrue(crypto.MAC_KEY in couch_content) - self.assertTrue(crypto.MAC_METHOD_KEY in couch_content) + self.assertEqual(['raw'], couch_content) + content = couchdoc.get_json() + self.assertTrue(_crypto.is_symmetrically_encrypted(content)) d = sol1.get_all_docs() d.addCallback(_db1AssertEmptyDocList) @@ -473,16 +469,6 @@ class EncryptedSyncTestCase( """ return self._test_encrypted_sym_sync(passphrase=u'ãáà äéà ëÃìïóòöõúùüñç') - def test_sync_very_large_files(self): - """ - Test if Soledad can sync very large files. - """ - self.skipTest( - "Work in progress. For reference, see: " - "https://leap.se/code/issues/7370") - length = 100 * (10 ** 6) # 100 MB - return self._test_encrypted_sym_sync(doc_size=length, number_of_docs=1) - def test_sync_many_small_files(self): """ Test if Soledad can sync many smallfiles. diff --git a/testing/tests/sync/test_encdecpool.py b/testing/tests/sync/test_encdecpool.py deleted file mode 100644 index 4a32885e..00000000 --- a/testing/tests/sync/test_encdecpool.py +++ /dev/null @@ -1,306 +0,0 @@ -# -*- coding: utf-8 -*- -# test_encdecpool.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for encryption and decryption pool. -""" -import json -from random import shuffle - -from mock import MagicMock -from twisted.internet.defer import inlineCallbacks - -from leap.soledad.client.encdecpool import SyncEncrypterPool -from leap.soledad.client.encdecpool import SyncDecrypterPool - -from leap.soledad.common.document import SoledadDocument -from test_soledad.util import BaseSoledadTest -from twisted.internet import defer - -DOC_ID = "mydoc" -DOC_REV = "rev" -DOC_CONTENT = {'simple': 'document'} - - -class TestSyncEncrypterPool(BaseSoledadTest): - - def setUp(self): - BaseSoledadTest.setUp(self) - crypto = self._soledad._crypto - sync_db = self._soledad._sync_db - self._pool = SyncEncrypterPool(crypto, sync_db) - self._pool.start() - - def tearDown(self): - self._pool.stop() - BaseSoledadTest.tearDown(self) - - @inlineCallbacks - def test_get_encrypted_doc_returns_none(self): - """ - Test that trying to get an encrypted doc from the pool returns None if - the document was never added for encryption. - """ - doc = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV) - self.assertIsNone(doc) - - @inlineCallbacks - def test_encrypt_doc_and_get_it_back(self): - """ - Test that the pool actually encrypts a document added to the queue. - """ - doc = SoledadDocument( - doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT)) - - yield self._pool.encrypt_doc(doc) - encrypted = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV) - - self.assertIsNotNone(encrypted) - - -class TestSyncDecrypterPool(BaseSoledadTest): - - def _insert_doc_cb(self, doc, gen, trans_id): - """ - Method used to mock the sync's return_doc_cb callback. - """ - self._inserted_docs.append((doc, gen, trans_id)) - - def _setup_pool(self, sync_db=None): - sync_db = sync_db or self._soledad._sync_db - return SyncDecrypterPool( - self._soledad._crypto, - sync_db, - source_replica_uid=self._soledad._dbpool.replica_uid, - insert_doc_cb=self._insert_doc_cb) - - def setUp(self): - BaseSoledadTest.setUp(self) - # setup the pool - self._pool = self._setup_pool() - # reset the inserted docs mock - self._inserted_docs = [] - - def tearDown(self): - if self._pool.running: - self._pool.stop() - BaseSoledadTest.tearDown(self) - - def test_insert_received_doc(self): - """ - Test that one document added to the pool is inserted using the - callback. - """ - self._pool.start(1) - self._pool.insert_received_doc( - DOC_ID, DOC_REV, "{}", 1, "trans_id", 1) - - def _assert_doc_was_inserted(_): - self.assertEqual( - self._inserted_docs, - [(SoledadDocument(DOC_ID, DOC_REV, "{}"), 1, u"trans_id")]) - - self._pool.deferred.addCallback(_assert_doc_was_inserted) - return self._pool.deferred - - def test_looping_control(self): - """ - Start and stop cleanly. - """ - self._pool.start(10) - self.assertTrue(self._pool.running) - self._pool.stop() - self.assertFalse(self._pool.running) - self.assertTrue(self._pool.deferred.called) - - def test_sync_id_col_is_created_if_non_existing_in_docs_recvd_table(self): - """ - Test that docs_received table is migrated, and has the sync_id column - """ - mock_run_query = MagicMock(return_value=defer.succeed(None)) - mock_sync_db = MagicMock() - mock_sync_db.runQuery = mock_run_query - pool = self._setup_pool(mock_sync_db) - d = pool.start(10) - pool.stop() - - def assert_trial_to_create_sync_id_column(_): - mock_run_query.assert_called_once_with( - "ALTER TABLE docs_received ADD COLUMN sync_id") - - d.addCallback(assert_trial_to_create_sync_id_column) - return d - - def test_insert_received_doc_many(self): - """ - Test that many documents added to the pool are inserted using the - callback. - """ - many = 100 - self._pool.start(many) - - # insert many docs in the pool - for i in xrange(many): - gen = idx = i + 1 - doc_id = "doc_id: %d" % idx - rev = "rev: %d" % idx - content = {'idx': idx} - trans_id = "trans_id: %d" % idx - self._pool.insert_received_doc( - doc_id, rev, content, gen, trans_id, idx) - - def _assert_doc_was_inserted(_): - self.assertEqual(many, len(self._inserted_docs)) - idx = 1 - for doc, gen, trans_id in self._inserted_docs: - expected_gen = idx - expected_doc_id = "doc_id: %d" % idx - expected_rev = "rev: %d" % idx - expected_content = json.dumps({'idx': idx}) - expected_trans_id = "trans_id: %d" % idx - - self.assertEqual(expected_doc_id, doc.doc_id) - self.assertEqual(expected_rev, doc.rev) - self.assertEqual(expected_content, json.dumps(doc.content)) - self.assertEqual(expected_gen, gen) - self.assertEqual(expected_trans_id, trans_id) - - idx += 1 - - self._pool.deferred.addCallback(_assert_doc_was_inserted) - return self._pool.deferred - - def test_insert_encrypted_received_doc(self): - """ - Test that one encrypted document added to the pool is decrypted and - inserted using the callback. - """ - crypto = self._soledad._crypto - doc = SoledadDocument( - doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT)) - encrypted_content = json.loads(crypto.encrypt_doc(doc)) - - # insert the encrypted document in the pool - self._pool.start(1) - self._pool.insert_encrypted_received_doc( - DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1) - - def _assert_doc_was_decrypted_and_inserted(_): - self.assertEqual(1, len(self._inserted_docs)) - self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")]) - - self._pool.deferred.addCallback( - _assert_doc_was_decrypted_and_inserted) - return self._pool.deferred - - @inlineCallbacks - def test_processing_order(self): - """ - This test ensures that processing of documents only occur if there is - a sequence in place. - """ - crypto = self._soledad._crypto - - docs = [] - for i in xrange(1, 10): - i = str(i) - doc = SoledadDocument( - doc_id=DOC_ID + i, rev=DOC_REV + i, - json=json.dumps(DOC_CONTENT)) - encrypted_content = json.loads(crypto.encrypt_doc(doc)) - docs.append((doc, encrypted_content)) - - # insert the encrypted document in the pool - yield self._pool.start(10) # pool is expecting to process 10 docs - self._pool._loop.stop() # we are processing manually - # first three arrives, forming a sequence - for i, (doc, encrypted_content) in enumerate(docs[:3]): - gen = idx = i + 1 - yield self._pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx) - - # last one arrives alone, so it can't be processed - doc, encrypted_content = docs[-1] - yield self._pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10) - - yield self._pool._decrypt_and_recurse() - - self.assertEqual(3, self._pool._processed_docs) - - def test_insert_encrypted_received_doc_many(self, many=100): - """ - Test that many encrypted documents added to the pool are decrypted and - inserted using the callback. - """ - crypto = self._soledad._crypto - self._pool.start(many) - docs = [] - - # insert many encrypted docs in the pool - for i in xrange(many): - gen = idx = i + 1 - doc_id = "doc_id: %d" % idx - rev = "rev: %d" % idx - content = {'idx': idx} - trans_id = "trans_id: %d" % idx - - doc = SoledadDocument( - doc_id=doc_id, rev=rev, json=json.dumps(content)) - - encrypted_content = json.loads(crypto.encrypt_doc(doc)) - docs.append((doc_id, rev, encrypted_content, gen, - trans_id, idx)) - shuffle(docs) - - for doc in docs: - self._pool.insert_encrypted_received_doc(*doc) - - def _assert_docs_were_decrypted_and_inserted(_): - self.assertEqual(many, len(self._inserted_docs)) - idx = 1 - for doc, gen, trans_id in self._inserted_docs: - expected_gen = idx - expected_doc_id = "doc_id: %d" % idx - expected_rev = "rev: %d" % idx - expected_content = json.dumps({'idx': idx}) - expected_trans_id = "trans_id: %d" % idx - - self.assertEqual(expected_doc_id, doc.doc_id) - self.assertEqual(expected_rev, doc.rev) - self.assertEqual(expected_content, json.dumps(doc.content)) - self.assertEqual(expected_gen, gen) - self.assertEqual(expected_trans_id, trans_id) - - idx += 1 - - self._pool.deferred.addCallback( - _assert_docs_were_decrypted_and_inserted) - return self._pool.deferred - - @inlineCallbacks - def test_pool_reuse(self): - """ - The pool is reused between syncs, this test verifies that - reusing is fine. - """ - for i in xrange(3): - yield self.test_insert_encrypted_received_doc_many(5) - self._inserted_docs = [] - decrypted_docs = yield self._pool._get_docs(encrypted=False) - # check that decrypted docs staging is clean - self.assertEquals([], decrypted_docs) - self._pool.stop() diff --git a/testing/tests/sync/test_sqlcipher_sync.py b/testing/tests/sync/test_sqlcipher_sync.py index 3cbefc8b..26f63a40 100644 --- a/testing/tests/sync/test_sqlcipher_sync.py +++ b/testing/tests/sync/test_sqlcipher_sync.py @@ -27,8 +27,6 @@ from leap.soledad.common.l2db import sync from leap.soledad.common.l2db import vectorclock from leap.soledad.common.l2db import errors -from leap.soledad.common.crypto import ENC_SCHEME_KEY -from leap.soledad.client.crypto import decrypt_doc_dict from leap.soledad.client.http_target import SoledadHTTPSyncTarget from test_soledad import u1db_tests as tests @@ -545,13 +543,7 @@ class SQLCipherDatabaseSyncTests( self.assertFalse(doc2.has_conflicts) self.sync(self.db2, db3) doc3 = db3.get_doc('the-doc') - if ENC_SCHEME_KEY in doc3.content: - _crypto = self._soledad._crypto - key = _crypto.doc_passphrase(doc3.doc_id) - secret = _crypto.secret - doc3.set_json(decrypt_doc_dict( - doc3.content, - doc3.doc_id, doc3.rev, key, secret)) + self.assertEqual(doc4.get_json(), doc3.get_json()) self.assertFalse(doc3.has_conflicts) self.db1.close() @@ -713,15 +705,12 @@ def make_local_db_and_soledad_target( test.startTwistedServer() replica_uid = os.path.basename(path) db = test.request_state._create_database(replica_uid) - sync_db = test._soledad._sync_db - sync_enc_pool = test._soledad._sync_enc_pool st = soledad_sync_target( test, db._dbname, - source_replica_uid=source_replica_uid, - sync_db=sync_db, - sync_enc_pool=sync_enc_pool) + source_replica_uid=source_replica_uid) return db, st + target_scenarios = [ ('leap', { 'create_db_and_target': make_local_db_and_soledad_target, diff --git a/testing/tests/sync/test_sync.py b/testing/tests/sync/test_sync.py index 5290003e..76757c5b 100644 --- a/testing/tests/sync/test_sync.py +++ b/testing/tests/sync/test_sync.py @@ -19,6 +19,7 @@ import threading import time from urlparse import urljoin +from mock import Mock from twisted.internet import defer from testscenarios import TestWithScenarios @@ -184,10 +185,9 @@ class TestSoledadDbSync( target = soledad_sync_target( self, self.db2._dbname, source_replica_uid=self._soledad._dbpool.replica_uid) - self.addCleanup(target.close) return sync.SoledadSynchronizer( self.db, - target).sync(defer_decryption=False) + target).sync() @defer.inlineCallbacks def test_db_sync(self): @@ -211,3 +211,21 @@ class TestSoledadDbSync( self.db, doc2.doc_id, doc2.rev, tests.nested_doc, False) # TODO: add u1db.tests.test_sync.TestRemoteSyncIntegration + + +class TestSoledadSynchronizer(BaseSoledadTest): + + def setUp(self): + BaseSoledadTest.setUp(self) + self.db = Mock() + self.target = Mock() + self.synchronizer = sync.SoledadSynchronizer( + self.db, + self.target) + + def test_docs_by_gen_includes_deleted(self): + changes = [('id', 'gen', 'trans')] + docs_by_gen = self.synchronizer._docs_by_gen_from_changes(changes) + f, args, kwargs = docs_by_gen[0][0] + self.assertIn('include_deleted', kwargs) + self.assertTrue(kwargs['include_deleted']) diff --git a/testing/tests/sync/test_sync_deferred.py b/testing/tests/sync/test_sync_deferred.py deleted file mode 100644 index 4948aaf8..00000000 --- a/testing/tests/sync/test_sync_deferred.py +++ /dev/null @@ -1,196 +0,0 @@ -# test_sync_deferred.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Test Leap backend bits: sync with deferred encryption/decryption. -""" -import time -import os -import random -import string -import shutil - -from urlparse import urljoin - -from twisted.internet import defer - -from leap.soledad.common import couch - -from leap.soledad.client import sync -from leap.soledad.client.sqlcipher import SQLCipherOptions -from leap.soledad.client.sqlcipher import SQLCipherDatabase - -from testscenarios import TestWithScenarios - -from test_soledad import u1db_tests as tests -from test_soledad.util import ADDRESS -from test_soledad.util import SoledadWithCouchServerMixin -from test_soledad.util import make_soledad_app -from test_soledad.util import soledad_sync_target - - -# Just to make clear how this test is different... :) -DEFER_DECRYPTION = True - -WAIT_STEP = 1 -MAX_WAIT = 10 -DBPASS = "pass" - - -class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin): - - """ - Another base class for testing the deferred encryption/decryption during - the syncs, using the intermediate database. - """ - defer_sync_encryption = True - - def setUp(self): - SoledadWithCouchServerMixin.setUp(self) - self.startTwistedServer() - # config info - self.db1_file = os.path.join(self.tempdir, "db1.u1db") - os.unlink(self.db1_file) - self.db_pass = DBPASS - self.email = ADDRESS - - # get a random prefix for each test, so we do not mess with - # concurrency during initialization and shutting down of - # each local db. - self.rand_prefix = ''.join( - map(lambda x: random.choice(string.ascii_letters), range(6))) - - # open test dbs: db1 will be the local sqlcipher db (which - # instantiates a syncdb). We use the self._soledad instance that was - # already created on some setUp method. - import binascii - tohex = binascii.b2a_hex - key = tohex(self._soledad.secrets.get_local_storage_key()) - sync_db_key = tohex(self._soledad.secrets.get_sync_db_key()) - dbpath = self._soledad._local_db_path - - self.opts = SQLCipherOptions( - dbpath, key, is_raw_key=True, create=False, - defer_encryption=True, sync_db_key=sync_db_key) - self.db1 = SQLCipherDatabase(self.opts) - - self.db2 = self.request_state._create_database('test') - - def tearDown(self): - # XXX should not access "private" attrs - shutil.rmtree(os.path.dirname(self._soledad._local_db_path)) - SoledadWithCouchServerMixin.tearDown(self) - - -class SyncTimeoutError(Exception): - - """ - Dummy exception to notify timeout during sync. - """ - pass - - -class TestSoledadDbSyncDeferredEncDecr( - TestWithScenarios, - BaseSoledadDeferredEncTest, - tests.TestCaseWithServer): - - """ - Test db.sync remote sync shortcut. - Case with deferred encryption and decryption: using the intermediate - syncdb. - """ - - scenarios = [ - ('http', { - 'make_app_with_state': make_soledad_app, - 'make_database_for_test': tests.make_memory_database_for_test, - }), - ] - - oauth = False - token = True - - def setUp(self): - """ - Need to explicitely invoke inicialization on all bases. - """ - BaseSoledadDeferredEncTest.setUp(self) - self.server = self.server_thread = None - self.syncer = None - - def tearDown(self): - """ - Need to explicitely invoke destruction on all bases. - """ - dbsyncer = getattr(self, 'dbsyncer', None) - if dbsyncer: - dbsyncer.close() - BaseSoledadDeferredEncTest.tearDown(self) - - def do_sync(self): - """ - Perform sync using SoledadSynchronizer, SoledadSyncTarget - and Token auth. - """ - replica_uid = self._soledad._dbpool.replica_uid - sync_db = self._soledad._sync_db - sync_enc_pool = self._soledad._sync_enc_pool - dbsyncer = self._soledad._dbsyncer # Soledad.sync uses the dbsyncer - - target = soledad_sync_target( - self, self.db2._dbname, - source_replica_uid=replica_uid, - sync_db=sync_db, - sync_enc_pool=sync_enc_pool) - self.addCleanup(target.close) - return sync.SoledadSynchronizer( - dbsyncer, - target).sync(defer_decryption=True) - - def wait_for_sync(self): - """ - Wait for sync to finish. - """ - wait = 0 - syncer = self.syncer - if syncer is not None: - while syncer.syncing: - time.sleep(WAIT_STEP) - wait += WAIT_STEP - if wait >= MAX_WAIT: - raise SyncTimeoutError - - @defer.inlineCallbacks - def test_db_sync(self): - """ - Test sync. - - Adapted to check for encrypted content. - """ - doc1 = self.db1.create_doc_from_json(tests.simple_doc) - doc2 = self.db2.create_doc_from_json(tests.nested_doc) - local_gen_before_sync = yield self.do_sync() - - gen, _, changes = self.db1.whats_changed(local_gen_before_sync) - self.assertEqual(1, len(changes)) - - self.assertEqual(doc2.doc_id, changes[0][0]) - self.assertEqual(1, gen - local_gen_before_sync) - - self.assertGetEncryptedDoc( - self.db2, doc1.doc_id, doc1.rev, tests.simple_doc, False) - self.assertGetEncryptedDoc( - self.db1, doc2.doc_id, doc2.rev, tests.nested_doc, False) diff --git a/testing/tests/sync/test_sync_mutex.py b/testing/tests/sync/test_sync_mutex.py index 2626ab2a..432a3cd2 100644 --- a/testing/tests/sync/test_sync_mutex.py +++ b/testing/tests/sync/test_sync_mutex.py @@ -47,7 +47,7 @@ from test_soledad.util import soledad_sync_target _old_sync = SoledadSynchronizer.sync -def _timed_sync(self, defer_decryption=True): +def _timed_sync(self): t = time.time() sync_id = uuid.uuid4() @@ -62,10 +62,11 @@ def _timed_sync(self, defer_decryption=True): self.source.sync_times[sync_id]['end'] = t return passthrough - d = _old_sync(self, defer_decryption=defer_decryption) + d = _old_sync(self) d.addBoth(_store_finish_time) return d + SoledadSynchronizer.sync = _timed_sync # -- end of monkey-patching diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py index 964468ce..6ce9a5c5 100644 --- a/testing/tests/sync/test_sync_target.py +++ b/testing/tests/sync/test_sync_target.py @@ -30,10 +30,11 @@ from testscenarios import TestWithScenarios from twisted.internet import defer from leap.soledad.client import http_target as target -from leap.soledad.client import crypto +from leap.soledad.client.http_target.fetch_protocol import DocStreamReceiver from leap.soledad.client.sqlcipher import SQLCipherU1DBSync from leap.soledad.client.sqlcipher import SQLCipherOptions from leap.soledad.client.sqlcipher import SQLCipherDatabase +from leap.soledad.client import _crypto from leap.soledad.common import l2db @@ -44,6 +45,7 @@ from test_soledad.util import make_soledad_app from test_soledad.util import make_token_soledad_app from test_soledad.util import make_soledad_document_for_test from test_soledad.util import soledad_sync_target +from twisted.trial import unittest from test_soledad.util import SoledadWithCouchServerMixin from test_soledad.util import ADDRESS from test_soledad.util import SQLCIPHER_SCENARIOS @@ -53,92 +55,69 @@ from test_soledad.util import SQLCIPHER_SCENARIOS # The following tests come from `u1db.tests.test_remote_sync_target`. # ----------------------------------------------------------------------------- -class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin): +class TestSoledadParseReceivedDocResponse(unittest.TestCase): """ Some tests had to be copied to this class so we can instantiate our own target. """ - def setUp(self): - SoledadWithCouchServerMixin.setUp(self) - creds = {'token': { - 'uuid': 'user-uuid', - 'token': 'auth-token', - }} - self.target = target.SoledadHTTPSyncTarget( - self.couch_url, - uuid4().hex, - creds, - self._soledad._crypto, - None) - - def tearDown(self): - self.target.close() - SoledadWithCouchServerMixin.tearDown(self) + def parse(self, stream): + parser = DocStreamReceiver(None, None, lambda *_: defer.succeed(42)) + parser.dataReceived(stream) + parser.finish() def test_extra_comma(self): - """ - Test adapted to use encrypted content. - """ doc = SoledadDocument('i', rev='r') - doc.content = {} - _crypto = self._soledad._crypto - key = _crypto.doc_passphrase(doc.doc_id) - secret = _crypto.secret + doc.content = {'a': 'b'} - enc_json = crypto.encrypt_docstr( - doc.get_json(), doc.doc_id, doc.rev, - key, secret) + encrypted_docstr = _crypto.SoledadCrypto('safe').encrypt_doc(doc) with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("[\r\n{},\r\n]") + self.parse("[\r\n{},\r\n]") with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response( + self.parse( ('[\r\n{},\r\n{"id": "i", "rev": "r", ' + - '"content": %s, "gen": 3, "trans_id": "T-sid"}' + - ',\r\n]') % json.dumps(enc_json)) + '"gen": 3, "trans_id": "T-sid"},\r\n' + + '%s,\r\n]') % encrypted_docstr) def test_wrong_start(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("{}\r\n]") - - with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("\r\n{}\r\n]") + self.parse("{}\r\n]") with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("") + self.parse("\r\n{}\r\n]") def test_wrong_end(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("[\r\n{}") + self.parse("[\r\n{}") with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("[\r\n") + self.parse("[\r\n") def test_missing_comma(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response( + self.parse( '[\r\n{}\r\n{"id": "i", "rev": "r", ' '"content": "c", "gen": 3}\r\n]') def test_no_entries(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("[\r\n]") + self.parse("[\r\n]") def test_error_in_stream(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response( + self.parse( '[\r\n{"new_generation": 0},' '\r\n{"error": "unavailable"}\r\n') with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response( + self.parse( '[\r\n{"error": "unavailable"}\r\n') with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response('[\r\n{"error": "?"}\r\n') + self.parse('[\r\n{"error": "?"}\r\n') # # functions for TestRemoteSyncTargets @@ -151,13 +130,9 @@ def make_local_db_and_soledad_target( test.startTwistedServer() replica_uid = os.path.basename(path) db = test.request_state._create_database(replica_uid) - sync_db = test._soledad._sync_db - sync_enc_pool = test._soledad._sync_enc_pool st = soledad_sync_target( test, db._dbname, - source_replica_uid=source_replica_uid, - sync_db=sync_db, - sync_enc_pool=sync_enc_pool) + source_replica_uid=source_replica_uid) return db, st @@ -188,16 +163,11 @@ class TestSoledadSyncTarget( def getSyncTarget(self, path=None, source_replica_uid=uuid4().hex): if self.port is None: self.startTwistedServer() - sync_db = self._soledad._sync_db - sync_enc_pool = self._soledad._sync_enc_pool if path is None: path = self.db2._dbname target = self.sync_target( self, path, - source_replica_uid=source_replica_uid, - sync_db=sync_db, - sync_enc_pool=sync_enc_pool) - self.addCleanup(target.close) + source_replica_uid=source_replica_uid) return target def setUp(self): @@ -229,10 +199,10 @@ class TestSoledadSyncTarget( other_docs.append((doc.doc_id, doc.rev, doc.get_json())) doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + get_doc = (lambda _: doc, (1,), {}) new_gen, trans_id = yield remote_target.sync_exchange( - [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=receive_doc, - defer_decryption=False) + [(get_doc, 10, 'T-sid')], 'replica', last_known_generation=0, + last_known_trans_id=None, insert_doc_cb=receive_doc) self.assertEqual(1, new_gen) self.assertGetEncryptedDoc( db, 'doc-here', 'replica:1', '{"value": "here"}', False) @@ -278,15 +248,16 @@ class TestSoledadSyncTarget( doc1 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') doc2 = self.make_document('doc-here2', 'replica:1', '{"value": "here2"}') + get_doc1 = (lambda _: doc1, (1,), {}) + get_doc2 = (lambda _: doc2, (2,), {}) with self.assertRaises(l2db.errors.U1DBError): yield remote_target.sync_exchange( - [(doc1, 10, 'T-sid'), (doc2, 11, 'T-sud')], + [(get_doc1, 10, 'T-sid'), (get_doc2, 11, 'T-sud')], 'replica', last_known_generation=0, last_known_trans_id=None, - insert_doc_cb=receive_doc, - defer_decryption=False) + insert_doc_cb=receive_doc) self.assertGetEncryptedDoc( db, 'doc-here', 'replica:1', '{"value": "here"}', @@ -297,9 +268,8 @@ class TestSoledadSyncTarget( # retry trigger_ids = [] new_gen, trans_id = yield remote_target.sync_exchange( - [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=receive_doc, - defer_decryption=False) + [(get_doc2, 11, 'T-sud')], 'replica', last_known_generation=0, + last_known_trans_id=None, insert_doc_cb=receive_doc) self.assertGetEncryptedDoc( db, 'doc-here2', 'replica:1', '{"value": "here2"}', False) @@ -328,10 +298,11 @@ class TestSoledadSyncTarget( replica_uid_box.append(replica_uid) doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + get_doc = (lambda _: doc, (1,), {}) new_gen, trans_id = yield remote_target.sync_exchange( - [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, + [(get_doc, 10, 'T-sid')], 'replica', last_known_generation=0, last_known_trans_id=None, insert_doc_cb=receive_doc, - ensure_callback=ensure_cb, defer_decryption=False) + ensure_callback=ensure_cb) self.assertEqual(1, new_gen) db = self.db2 self.assertEqual(1, len(replica_uid_box)) @@ -339,6 +310,37 @@ class TestSoledadSyncTarget( self.assertGetEncryptedDoc( db, 'doc-here', 'replica:1', '{"value": "here"}', False) + @defer.inlineCallbacks + def test_sync_exchange_send_events(self): + """ + Test for sync exchange's SOLEDAD_SYNC_SEND_STATUS event. + """ + remote_target = self.getSyncTarget() + uuid = remote_target.uuid + events = [] + + def mocked_events(*args): + events.append((args)) + self.patch( + target.send, '_emit_send_status', mocked_events) + + doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + doc2 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + doc3 = self.make_document('doc-here', 'replica:1', '{"value": "here"}') + get_doc = (lambda _: doc, (1,), {}) + get_doc2 = (lambda _: doc2, (1,), {}) + get_doc3 = (lambda _: doc3, (1,), {}) + docs = [(get_doc, 10, 'T-sid'), + (get_doc2, 11, 'T-sid2'), (get_doc3, 12, 'T-sid3')] + new_gen, trans_id = yield remote_target.sync_exchange( + docs, 'replica', last_known_generation=0, + last_known_trans_id=None, insert_doc_cb=lambda _: 1, + ensure_callback=lambda _: 1) + self.assertEqual(1, new_gen) + self.assertEqual(4, len(events)) + self.assertEquals([(uuid, 0, 3), (uuid, 1, 3), (uuid, 2, 3), + (uuid, 3, 3)], events) + def test_sync_exchange_in_stream_error(self): self.skipTest("bypass this test because our sync_exchange process " "does not return u1db error 503 \"unavailable\" for " @@ -421,7 +423,6 @@ class SoledadDatabaseSyncTargetTests( def tearDown(self): self.db.close() - self.st.close() tests.TestCaseWithServer.tearDown(self) SoledadWithCouchServerMixin.tearDown(self) @@ -442,12 +443,12 @@ class SoledadDatabaseSyncTargetTests( This test was adapted to decrypt remote content before assert. """ docs_by_gen = [ - (self.make_document('doc-id', 'replica:1', tests.simple_doc), 10, - 'T-sid')] + ((self.make_document, + ('doc-id', 'replica:1', tests.simple_doc,), {}), + 10, 'T-sid')] new_gen, trans_id = yield self.st.sync_exchange( docs_by_gen, 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc, - defer_decryption=False) + last_known_trans_id=None, insert_doc_cb=self.receive_doc) self.assertGetEncryptedDoc( self.db, 'doc-id', 'replica:1', tests.simple_doc, False) self.assertTransactionLog(['doc-id'], self.db) @@ -465,14 +466,13 @@ class SoledadDatabaseSyncTargetTests( This test was adapted to decrypt remote content before assert. """ docs_by_gen = [ - (self.make_document( - 'doc-id', 'replica:1', tests.simple_doc), 10, 'T-1'), - (self.make_document( - 'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')] + ((self.make_document, + ('doc-id', 'replica:1', tests.simple_doc), {}), 10, 'T-1'), + ((self.make_document, + ('doc-id2', 'replica:1', tests.nested_doc), {}), 11, 'T-2')] new_gen, trans_id = yield self.st.sync_exchange( docs_by_gen, 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc, - defer_decryption=False) + last_known_trans_id=None, insert_doc_cb=self.receive_doc) self.assertGetEncryptedDoc( self.db, 'doc-id', 'replica:1', tests.simple_doc, False) self.assertGetEncryptedDoc( @@ -498,8 +498,7 @@ class SoledadDatabaseSyncTargetTests( self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) new_gen, _ = yield self.st.sync_exchange( [], 'other-replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc, - defer_decryption=False) + last_known_trans_id=None, insert_doc_cb=self.receive_doc) self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) self.assertEqual(2, new_gen) self.assertEqual( @@ -552,7 +551,8 @@ class SoledadDatabaseSyncTargetTests( doc = self.db.create_doc_from_json('{}') edit_rev = 'replica:1|' + doc.rev docs_by_gen = [ - (self.make_document(doc.doc_id, edit_rev, None), 10, 'T-sid')] + ((self.make_document, (doc.doc_id, edit_rev, None), {}), + 10, 'T-sid')] new_gen, trans_id = yield self.st.sync_exchange( docs_by_gen, 'replica', last_known_generation=0, last_known_trans_id=None, insert_doc_cb=self.receive_doc) @@ -571,7 +571,7 @@ class SoledadDatabaseSyncTargetTests( self.assertTransactionLog([doc.doc_id], self.db) new_doc = '{"key": "altval"}' docs_by_gen = [ - (self.make_document(doc.doc_id, 'replica:1', new_doc), 10, + ((self.make_document, (doc.doc_id, 'replica:1', new_doc), {}), 10, 'T-sid')] new_gen, _ = yield self.st.sync_exchange( docs_by_gen, 'replica', last_known_generation=0, @@ -591,7 +591,7 @@ class SoledadDatabaseSyncTargetTests( self.assertTransactionLog([doc.doc_id], self.db) gen, txid = self.db._get_generation_info() docs_by_gen = [ - (self.make_document(doc.doc_id, doc.rev, tests.simple_doc), + ((self.make_document, (doc.doc_id, doc.rev, tests.simple_doc), {}), 10, 'T-sid')] new_gen, _ = yield self.st.sync_exchange( docs_by_gen, 'replica', last_known_generation=gen, @@ -624,9 +624,9 @@ class SoledadDatabaseSyncTargetTests( [], 'other-replica', last_known_generation=0, last_known_trans_id=None, insert_doc_cb=self.receive_doc) self.assertTransactionLog([doc.doc_id, doc.doc_id], self.db) + self.assertEqual(2, new_gen) self.assertEqual( (doc.doc_id, doc.rev, None, 2), self.other_changes[0][:-1]) - self.assertEqual(2, new_gen) if self.whitebox: self.assertEqual(self.db._last_exchange_log['return'], {'last_gen': 2, 'docs': [(doc.doc_id, doc.rev)]}) @@ -637,7 +637,7 @@ class SoledadDatabaseSyncTargetTests( self.assertTransactionLog([doc.doc_id], self.db) new_doc = '{"key": "altval"}' docs_by_gen = [ - (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, + ((self.make_document, (doc.doc_id, 'test:1|z:2', new_doc), {}), 10, 'T-sid')] new_gen, _ = yield self.st.sync_exchange( docs_by_gen, 'other-replica', last_known_generation=0, @@ -662,7 +662,7 @@ class SoledadDatabaseSyncTargetTests( self.assertTransactionLog([doc.doc_id], self.db) new_doc = '{"key": "altval"}' docs_by_gen = [ - (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, + ((self.make_document, (doc.doc_id, 'test:1|z:2', new_doc), {}), 10, 'T-sid')] new_gen, _ = yield self.st.sync_exchange( docs_by_gen, 'other-replica', last_known_generation=0, @@ -683,7 +683,7 @@ class SoledadDatabaseSyncTargetTests( self.assertTransactionLog([doc.doc_id], self.db) new_doc = '{"key": "altval"}' docs_by_gen = [ - (self.make_document(doc.doc_id, 'test:1|z:2', new_doc), 10, + ((self.make_document, (doc.doc_id, 'test:1|z:2', new_doc), {}), 10, 'T-sid')] new_gen, _ = yield self.st.sync_exchange( docs_by_gen, 'other-replica', last_known_generation=0, @@ -694,9 +694,9 @@ class SoledadDatabaseSyncTargetTests( def test_sync_exchange_converged_handling(self): doc = self.db.create_doc_from_json(tests.simple_doc) docs_by_gen = [ - (self.make_document('new', 'other:1', '{}'), 4, 'T-foo'), - (self.make_document(doc.doc_id, doc.rev, doc.get_json()), 5, - 'T-bar')] + ((self.make_document, ('new', 'other:1', '{}'), {}), 4, 'T-foo'), + ((self.make_document, (doc.doc_id, doc.rev, doc.get_json()), {}), + 5, 'T-bar')] new_gen, _ = yield self.st.sync_exchange( docs_by_gen, 'other-replica', last_known_generation=0, last_known_trans_id=None, insert_doc_cb=self.receive_doc) @@ -780,9 +780,6 @@ class SoledadDatabaseSyncTargetTests( self.assertEqual(expected, called) -# Just to make clear how this test is different... :) -DEFER_DECRYPTION = False - WAIT_STEP = 1 MAX_WAIT = 10 DBPASS = "pass" @@ -842,12 +839,10 @@ class TestSoledadDbSync( import binascii tohex = binascii.b2a_hex key = tohex(self._soledad.secrets.get_local_storage_key()) - sync_db_key = tohex(self._soledad.secrets.get_sync_db_key()) dbpath = self._soledad._local_db_path self.opts = SQLCipherOptions( - dbpath, key, is_raw_key=True, create=False, - defer_encryption=True, sync_db_key=sync_db_key) + dbpath, key, is_raw_key=True, create=False) self.db1 = SQLCipherDatabase(self.opts) self.db2 = self.request_state._create_database(replica_uid='test') @@ -886,12 +881,10 @@ class TestSoledadDbSync( self.opts, crypto, replica_uid, - None, - defer_encryption=True) + None) self.dbsyncer = dbsyncer return dbsyncer.sync(target_url, - creds=creds, - defer_decryption=DEFER_DECRYPTION) + creds=creds) else: return self._do_sync(self, target_name) diff --git a/testing/tox.ini b/testing/tox.ini index 31cb8a4f..c46c6af1 100644 --- a/testing/tox.ini +++ b/testing/tox.ini @@ -1,12 +1,15 @@ [tox] envlist = py27 +skipsdist=True [testenv] basepython = python2.7 -commands = py.test --cov-report=html \ +commands = py.test --ignore=tests/benchmarks \ + --cov-report=html \ --cov-report=term \ - --cov=leap.soledad \ - {posargs} + --cov=leap.soledad \ + {posargs} +usedevelop = True deps = coverage pytest @@ -18,6 +21,7 @@ deps = pdbpp couchdb requests + service_identity # install soledad local packages -e../common -e../client @@ -27,11 +31,11 @@ setenv = TERM=xterm install_command = pip install {opts} {packages} -[testenv:perf] +[testenv:benchmark] deps = {[testenv]deps} pytest-benchmark -commands = py.test tests/perf {posargs} +commands = py.test --benchmark-only {posargs} [testenv:code-check] changedir = .. @@ -39,12 +43,12 @@ deps = pep8 flake8 commands = - pep8 client server common - flake8 --ignore=F812,E731 client server common + pep8 + flake8 [testenv:parallel] deps = {[testenv]deps} pytest-xdist install_command = pip install {opts} {packages} -commands = py.test {posargs} -n 4 +commands = py.test --ignore=tests/benchmarks {posargs} -n 4 |