diff options
Diffstat (limited to 'client/src/leap/soledad')
23 files changed, 876 insertions, 1196 deletions
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 245a8971..3a114021 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -21,6 +21,7 @@ from leap.soledad.client.api import Soledad from leap.soledad.common import soledad_assert from ._version import get_versions + __version__ = get_versions()['version'] del get_versions diff --git a/client/src/leap/soledad/client/_crypto.py b/client/src/leap/soledad/client/_crypto.py new file mode 100644 index 00000000..4bbdd044 --- /dev/null +++ b/client/src/leap/soledad/client/_crypto.py @@ -0,0 +1,386 @@ +# -*- coding: utf-8 -*- +# _crypto.py +# Copyright (C) 2016 LEAP Encryption Access Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Cryptographic operations for the soledad client +""" + +import binascii +import base64 +import hashlib +import hmac +import os +import re +import struct +import time + +from io import BytesIO +from itertools import imap +from collections import namedtuple + +from twisted.internet import defer +from twisted.internet import interfaces +from twisted.web.client import FileBodyProducer + +from cryptography.exceptions import InvalidTag +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends.multibackend import MultiBackend +from cryptography.hazmat.backends.openssl.backend \ + import Backend as OpenSSLBackend + +from zope.interface import implements + + +SECRET_LENGTH = 64 + +CRYPTO_BACKEND = MultiBackend([OpenSSLBackend()]) + +PACMAN = struct.Struct('2sbbQ16s255p255p') +BLOB_SIGNATURE_MAGIC = '\x13\x37' + + +ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1) +ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2) +DocInfo = namedtuple('DocInfo', 'doc_id rev') + + +class EncryptionDecryptionError(Exception): + pass + + +class InvalidBlob(Exception): + pass + + +class SoledadCrypto(object): + """ + This class provides convenient methods for document encryption and + decryption using BlobEncryptor and BlobDecryptor classes. + """ + def __init__(self, secret): + """ + Initialize the crypto object. + + :param secret: The Soledad remote storage secret. + :type secret: str + """ + self.secret = secret + + def encrypt_doc(self, doc): + """ + Creates and configures a BlobEncryptor, asking it to start encryption + and wrapping the result as a simple JSON string with a "raw" key. + + :param doc: the document to be encrypted. + :type doc: SoledadDocument + :return: A deferred whose callback will be invoked with a JSON string + containing the ciphertext as the value of "raw" key. + :rtype: twisted.internet.defer.Deferred + """ + + def put_raw(blob): + raw = blob.getvalue() + return '{"raw": "' + raw + '"}' + + content = BytesIO(str(doc.get_json())) + info = DocInfo(doc.doc_id, doc.rev) + del doc + encryptor = BlobEncryptor(info, content, secret=self.secret) + d = encryptor.encrypt() + d.addCallback(put_raw) + return d + + def decrypt_doc(self, doc): + """ + Creates and configures a BlobDecryptor, asking it decrypt and returning + the decrypted cleartext content from the encrypted document. + + :param doc: the document to be decrypted. + :type doc: SoledadDocument + :return: The decrypted cleartext content of the document. + :rtype: str + """ + info = DocInfo(doc.doc_id, doc.rev) + ciphertext = BytesIO() + payload = doc.content['raw'] + del doc + ciphertext.write(str(payload)) + decryptor = BlobDecryptor(info, ciphertext, secret=self.secret) + return decryptor.decrypt() + + +def encrypt_sym(data, key, method=ENC_METHOD.aes_256_gcm): + """ + Encrypt data using AES-256 cipher in selected mode. + + :param data: The data to be encrypted. + :type data: str + :param key: The key used to encrypt data (must be 256 bits long). + :type key: str + + :return: A tuple with the initialization vector and the ciphertext, both + encoded as base64. + :rtype: (str, str) + """ + mode = _mode_by_method(method) + encryptor = AESWriter(key, mode=mode) + encryptor.write(data) + _, ciphertext = encryptor.end() + iv = base64.b64encode(encryptor.iv) + tag = encryptor.tag or '' + return iv, ciphertext + tag + + +def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm): + """ + Decrypt data using AES-256 cipher in selected mode. + + :param data: The data to be decrypted. + :type data: str + :param key: The symmetric key used to decrypt data (must be 256 bits + long). + :type key: str + :param iv: The base64 encoded initialization vector. + :type iv: str + + :return: The decrypted data. + :rtype: str + """ + _iv = base64.b64decode(str(iv)) + mode = _mode_by_method(method) + tag = None + if mode == modes.GCM: + data, tag = data[:-16], data[-16:] + decryptor = AESWriter(key, _iv, tag=tag, mode=mode) + decryptor.write(data) + _, plaintext = decryptor.end() + return plaintext + + +class BlobEncryptor(object): + """ + Produces encrypted data from the cleartext data associated with a given + SoledadDocument using AES-256 cipher in GCM mode. + The production happens using a Twisted's FileBodyProducer, which uses a + Cooperator to schedule calls and can be paused/resumed. Each call takes at + most 65536 bytes from the input. + Both the production input and output are file descriptors, so they can be + applied to a stream of data. + """ + def __init__(self, doc_info, content_fd, secret=None): + if not secret: + raise EncryptionDecryptionError('no secret given') + + self.doc_id = doc_info.doc_id + self.rev = doc_info.rev + self._content_fd = content_fd + self._producer = FileBodyProducer(content_fd, readSize=2**16) + + sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) + self._aes = AESWriter(sym_key) + self._aes.authenticate(self._make_preamble()) + + @property + def iv(self): + return self._aes.iv + + @property + def tag(self): + return self._aes.tag + + def encrypt(self): + """ + Starts producing encrypted data from the cleartext data. + + :return: A deferred which will be fired when encryption ends and whose + callback will be invoked with the resulting ciphertext. + :rtype: twisted.internet.defer.Deferred + """ + d = self._producer.startProducing(self._aes) + d.addCallback(lambda _: self._end_crypto_stream()) + return d + + def _make_preamble(self): + current_time = int(time.time()) + + return PACMAN.pack( + BLOB_SIGNATURE_MAGIC, + ENC_SCHEME.symkey, + ENC_METHOD.aes_256_gcm, + current_time, + self.iv, + str(self.doc_id), + str(self.rev)) + + def _end_crypto_stream(self): + preamble, encrypted = self._aes.end() + result = BytesIO() + result.write( + base64.urlsafe_b64encode(preamble)) + result.write(' ') + result.write( + base64.urlsafe_b64encode(encrypted + self.tag)) + return defer.succeed(result) + + +class BlobDecryptor(object): + """ + Decrypts an encrypted blob associated with a given Document. + + Will raise an exception if the blob doesn't have the expected structure, or + if the GCM tag doesn't verify. + """ + + def __init__(self, doc_info, ciphertext_fd, result=None, + secret=None): + if not secret: + raise EncryptionDecryptionError('no secret given') + + self.doc_id = doc_info.doc_id + self.rev = doc_info.rev + + ciphertext_fd, preamble, iv = self._consume_preamble(ciphertext_fd) + + self.result = result or BytesIO() + sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) + self._aes = AESWriter(sym_key, iv, self.result, tag=self.tag) + self._aes.authenticate(preamble) + + self._producer = FileBodyProducer(ciphertext_fd, readSize=2**16) + + def _consume_preamble(self, ciphertext_fd): + ciphertext_fd.seek(0) + try: + preamble, ciphertext = _split(ciphertext_fd.getvalue()) + self.tag, ciphertext = ciphertext[-16:], ciphertext[:-16] + except (TypeError, binascii.Error): + raise InvalidBlob + ciphertext_fd.close() + + if len(preamble) != PACMAN.size: + raise InvalidBlob + + try: + unpacked_data = PACMAN.unpack(preamble) + magic, sch, meth, ts, iv, doc_id, rev = unpacked_data + except struct.error: + raise InvalidBlob + + if magic != BLOB_SIGNATURE_MAGIC: + raise InvalidBlob + # TODO check timestamp + if sch != ENC_SCHEME.symkey: + raise InvalidBlob('invalid scheme') + if meth != ENC_METHOD.aes_256_gcm: + raise InvalidBlob('invalid encryption scheme') + if rev != self.rev: + raise InvalidBlob('invalid revision') + if doc_id != self.doc_id: + raise InvalidBlob('invalid revision') + return BytesIO(ciphertext), preamble, iv + + def _end_stream(self): + try: + return self._aes.end()[1] + except InvalidTag: + raise InvalidBlob('Invalid Tag. Blob authentication failed.') + + def decrypt(self): + """ + Starts producing encrypted data from the cleartext data. + + :return: A deferred which will be fired when encryption ends and whose + callback will be invoked with the resulting ciphertext. + :rtype: twisted.internet.defer.Deferred + """ + d = self._producer.startProducing(self._aes) + d.addCallback(lambda _: self._end_stream()) + return d + + +class AESWriter(object): + """ + A Twisted's Consumer implementation that takes an input file descriptor and + applies AES-256 cipher in GCM mode. + """ + implements(interfaces.IConsumer) + + def __init__(self, key, iv=None, _buffer=None, tag=None, mode=modes.GCM): + if len(key) != 32: + raise EncryptionDecryptionError('key is not 256 bits') + self.iv = iv or os.urandom(16) + self.buffer = _buffer or BytesIO() + cipher = _get_aes_cipher(key, self.iv, tag, mode) + cipher = cipher.decryptor() if tag else cipher.encryptor() + self.cipher, self.aead = cipher, '' + + def authenticate(self, data): + self.aead += data + self.cipher.authenticate_additional_data(data) + + @property + def tag(self): + return getattr(self.cipher, 'tag', None) + + def write(self, data): + self.buffer.write(self.cipher.update(data)) + + def end(self): + self.buffer.write(self.cipher.finalize()) + return self.aead, self.buffer.getvalue() + + +def is_symmetrically_encrypted(content): + """ + Returns True if the document was symmetrically encrypted. + 'EzcB' is the base64 encoding of \x13\x37 magic number and 1 (symmetrically + encrypted value for enc_scheme flag). + + :param doc: The document content as string + :type doc: str + + :rtype: bool + """ + return content and content[:13] == '{"raw": "EzcB' + + +# utils + + +def _hmac_sha256(key, data): + return hmac.new(key, data, hashlib.sha256).digest() + + +def _get_sym_key_for_doc(doc_id, secret): + key = secret[SECRET_LENGTH:] + return _hmac_sha256(key, doc_id) + + +def _get_aes_cipher(key, iv, tag, mode=modes.GCM): + mode = mode(iv, tag) if mode == modes.GCM else mode(iv) + return Cipher(algorithms.AES(key), mode, backend=CRYPTO_BACKEND) + + +def _split(base64_raw_payload): + return imap(base64.urlsafe_b64decode, re.split(' ', base64_raw_payload)) + + +def _mode_by_method(method): + if method == ENC_METHOD.aes_256_gcm: + return modes.GCM + else: + return modes.CTR diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index ce9bec05..a5328d2b 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -50,8 +50,7 @@ How many times a SQLCipher query should be retried in case of timeout. SQLCIPHER_MAX_RETRIES = 10 -def getConnectionPool(opts, openfun=None, driver="pysqlcipher", - sync_enc_pool=None): +def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): """ Return a connection pool. @@ -72,7 +71,7 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher", if openfun is None and driver == "pysqlcipher": openfun = partial(set_init_pragmas, opts=opts) return U1DBConnectionPool( - opts, sync_enc_pool, + opts, # the following params are relayed "as is" to twisted's # ConnectionPool. "%s.dbapi2" % driver, opts.path, timeout=SQLCIPHER_CONNECTION_TIMEOUT, @@ -89,7 +88,7 @@ class U1DBConnection(adbapi.Connection): The U1DB wrapper to use. """ - def __init__(self, pool, sync_enc_pool, init_u1db=False): + def __init__(self, pool, init_u1db=False): """ :param pool: The pool of connections to that owns this connection. :type pool: adbapi.ConnectionPool @@ -97,7 +96,6 @@ class U1DBConnection(adbapi.Connection): :type init_u1db: bool """ self.init_u1db = init_u1db - self._sync_enc_pool = sync_enc_pool try: adbapi.Connection.__init__(self, pool) except dbapi2.DatabaseError as e: @@ -116,8 +114,7 @@ class U1DBConnection(adbapi.Connection): if self.init_u1db: self._u1db = self.u1db_wrapper( self._connection, - self._pool.opts, - self._sync_enc_pool) + self._pool.opts) def __getattr__(self, name): """ @@ -162,12 +159,11 @@ class U1DBConnectionPool(adbapi.ConnectionPool): connectionFactory = U1DBConnection transactionFactory = U1DBTransaction - def __init__(self, opts, sync_enc_pool, *args, **kwargs): + def __init__(self, opts, *args, **kwargs): """ Initialize the connection pool. """ self.opts = opts - self._sync_enc_pool = sync_enc_pool try: adbapi.ConnectionPool.__init__(self, *args, **kwargs) except dbapi2.DatabaseError as e: @@ -182,7 +178,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool): try: conn = self.connectionFactory( - self, self._sync_enc_pool, init_u1db=True) + self, init_u1db=True) replica_uid = conn._u1db._real_replica_uid setProxiedObject(self.replica_uid, replica_uid) except DatabaseAccessError as e: @@ -257,7 +253,7 @@ class U1DBConnectionPool(adbapi.ConnectionPool): tid = self.threadID() u1db = self._u1dbconnections.get(tid) conn = self.connectionFactory( - self, self._sync_enc_pool, init_u1db=not bool(u1db)) + self, init_u1db=not bool(u1db)) if self.replica_uid is None: replica_uid = conn._u1db._real_replica_uid diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 6870d5ba..da6eec66 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -56,11 +56,10 @@ from leap.soledad.common.errors import DatabaseAccessError from leap.soledad.client import adbapi from leap.soledad.client import events as soledad_events from leap.soledad.client import interfaces as soledad_interfaces -from leap.soledad.client.crypto import SoledadCrypto +from leap.soledad.client import sqlcipher from leap.soledad.client.secrets import SoledadSecrets from leap.soledad.client.shared_db import SoledadSharedDatabase -from leap.soledad.client import sqlcipher -from leap.soledad.client import encdecpool +from leap.soledad.client._crypto import SoledadCrypto logger = getLogger(__name__) @@ -131,7 +130,7 @@ class Soledad(object): def __init__(self, uuid, passphrase, secrets_path, local_db_path, server_url, cert_file, shared_db=None, - auth_token=None, defer_encryption=False, syncable=True): + auth_token=None, syncable=True): """ Initialize configuration, cryptographic keys and dbs. @@ -168,11 +167,6 @@ class Soledad(object): Authorization token for accessing remote databases. :type auth_token: str - :param defer_encryption: - Whether to defer encryption/decryption of documents, or do it - inline while syncing. - :type defer_encryption: bool - :param syncable: If set to ``False``, this database will not attempt to synchronize with remote replicas (default is ``True``) @@ -188,9 +182,7 @@ class Soledad(object): self._passphrase = passphrase self._local_db_path = local_db_path self._server_url = server_url - self._defer_encryption = defer_encryption self._secrets_path = None - self._sync_enc_pool = None self._dbsyncer = None self.shared_db = shared_db @@ -226,7 +218,6 @@ class Soledad(object): # have to close any thread-related stuff we have already opened # here, otherwise there might be zombie threads that may clog the # reactor. - self._sync_db.close() if hasattr(self, '_dbpool'): self._dbpool.close() raise @@ -289,22 +280,12 @@ class Soledad(object): tohex = binascii.b2a_hex # sqlcipher only accepts the hex version key = tohex(self._secrets.get_local_storage_key()) - sync_db_key = tohex(self._secrets.get_sync_db_key()) opts = sqlcipher.SQLCipherOptions( self._local_db_path, key, - is_raw_key=True, create=True, - defer_encryption=self._defer_encryption, - sync_db_key=sync_db_key, - ) + is_raw_key=True, create=True) self._sqlcipher_opts = opts - - # the sync_db is used both for deferred encryption and decryption, so - # we want to initialize it anyway to allow for all combinations of - # deferred encryption and decryption configurations. - self._initialize_sync_db(opts) - self._dbpool = adbapi.getConnectionPool( - opts, sync_enc_pool=self._sync_enc_pool) + self._dbpool = adbapi.getConnectionPool(opts) def _init_u1db_syncer(self): """ @@ -313,10 +294,7 @@ class Soledad(object): replica_uid = self._dbpool.replica_uid self._dbsyncer = sqlcipher.SQLCipherU1DBSync( self._sqlcipher_opts, self._crypto, replica_uid, - SOLEDAD_CERT, - defer_encryption=self._defer_encryption, - sync_db=self._sync_db, - sync_enc_pool=self._sync_enc_pool) + SOLEDAD_CERT) def sync_stats(self): sync_phase = 0 @@ -341,12 +319,6 @@ class Soledad(object): self._dbpool.close() if getattr(self, '_dbsyncer', None): self._dbsyncer.close() - # close the sync database - if self._sync_db: - self._sync_db.close() - self._sync_db = None - if self._defer_encryption: - self._sync_enc_pool.stop() # # ILocalStorage @@ -385,7 +357,8 @@ class Soledad(object): also be updated. :rtype: twisted.internet.defer.Deferred """ - return self._defer("put_doc", doc) + d = self._defer("put_doc", doc) + return d def delete_doc(self, doc): """ @@ -479,7 +452,8 @@ class Soledad(object): # create_doc (and probably to put_doc too). There are cases (mail # payloads for example) in which we already have the encoding in the # headers, so we don't need to guess it. - return self._defer("create_doc", content, doc_id=doc_id) + d = self._defer("create_doc", content, doc_id=doc_id) + return d def create_doc_from_json(self, json, doc_id=None): """ @@ -700,37 +674,26 @@ class Soledad(object): if syncable and not self._dbsyncer: self._init_u1db_syncer() - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize documents with the server replica. This method uses a lock to prevent multiple concurrent sync processes over the same local db file. - :param defer_decryption: - Whether to defer decryption of documents, or do it inline while - syncing. - :type defer_decryption: bool - :return: A deferred lock that will run the actual sync process when the lock is acquired, and which will fire with with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred """ d = self.sync_lock.run( - self._sync, - defer_decryption) + self._sync) return d - def _sync(self, defer_decryption): + def _sync(self): """ Synchronize documents with the server replica. - :param defer_decryption: - Whether to defer decryption of documents, or do it inline while - syncing. - :type defer_decryption: bool - :return: A deferred whose callback will be invoked with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -740,8 +703,7 @@ class Soledad(object): return d = self._dbsyncer.sync( sync_url, - creds=self._creds, - defer_decryption=defer_decryption) + creds=self._creds) def _sync_callback(local_gen): self._last_received_docs = docs = self._dbsyncer.received_docs @@ -837,50 +799,6 @@ class Soledad(object): token = property(_get_token, _set_token, doc='The authentication Token.') - def _initialize_sync_db(self, opts): - """ - Initialize the Symmetrically-Encrypted document to be synced database, - and the queue to communicate with subprocess workers. - - :param opts: - :type opts: SQLCipherOptions - """ - soledad_assert(opts.sync_db_key is not None) - sync_db_path = None - if opts.path != ":memory:": - sync_db_path = "%s-sync" % opts.path - else: - sync_db_path = ":memory:" - - # we copy incoming options because the opts object might be used - # somewhere else - sync_opts = sqlcipher.SQLCipherOptions.copy( - opts, path=sync_db_path, create=True) - self._sync_db = sqlcipher.getConnectionPool( - sync_opts, extra_queries=self._sync_db_extra_init) - if self._defer_encryption: - # initialize syncing queue encryption pool - self._sync_enc_pool = encdecpool.SyncEncrypterPool( - self._crypto, self._sync_db) - self._sync_enc_pool.start() - - @property - def _sync_db_extra_init(self): - """ - Queries for creating tables for the local sync documents db if needed. - They are passed as extra initialization to initialize_sqlciphjer_db - - :rtype: tuple of strings - """ - maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" - encr = encdecpool.SyncEncrypterPool - decr = encdecpool.SyncDecrypterPool - sql_encr_table_query = (maybe_create % ( - encr.TABLE_NAME, encr.FIELD_NAMES)) - sql_decr_table_query = (maybe_create % ( - decr.TABLE_NAME, decr.FIELD_NAMES)) - return (sql_encr_table_query, sql_decr_table_query) - # # ISecretsStorage # @@ -1017,6 +935,7 @@ def create_path_if_not_exists(path): # Monkey patching u1db to be able to provide a custom SSL cert # ---------------------------------------------------------------------------- + # We need a more reasonable timeout (in seconds) SOLEDAD_TIMEOUT = 120 diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index d81c883b..09e90171 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -32,9 +32,12 @@ from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import crypto from leap.soledad.common.log import getLogger +import warnings logger = getLogger(__name__) +warnings.warn("'soledad.client.crypto' MODULE DEPRECATED", + DeprecationWarning, stacklevel=2) MAC_KEY_LENGTH = 64 diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py deleted file mode 100644 index 056b012f..00000000 --- a/client/src/leap/soledad/client/encdecpool.py +++ /dev/null @@ -1,657 +0,0 @@ -# -*- coding: utf-8 -*- -# encdecpool.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -A pool of encryption/decryption concurrent and parallel workers for using -during synchronization. -""" - - -import json -from uuid import uuid4 - -from twisted.internet.task import LoopingCall -from twisted.internet import threads -from twisted.internet import defer - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.common import soledad_assert -from leap.soledad.common.log import getLogger - -from leap.soledad.client.crypto import encrypt_docstr -from leap.soledad.client.crypto import decrypt_doc_dict - - -logger = getLogger(__name__) - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): - """ - Base class for encrypter/decrypter pools. - """ - - def __init__(self, crypto, sync_db): - """ - Initialize the pool of encryption-workers. - - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - - :param sync_db: A database connection handle - :type sync_db: pysqlcipher.dbapi2.Connection - """ - self._crypto = crypto - self._sync_db = sync_db - self._delayed_call = None - self._started = False - - def start(self): - self._started = True - - def stop(self): - self._started = False - # maybe cancel the next delayed call - if self._delayed_call \ - and not self._delayed_call.called: - self._delayed_call.cancel() - - @property - def running(self): - return self._started - - def _runOperation(self, query, *args): - """ - Run an operation on the sync db. - - :param query: The query to be executed. - :type query: str - :param args: A list of query arguments. - :type args: list - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - return self._sync_db.runOperation(query, *args) - - def _runQuery(self, query, *args): - """ - Run a query on the sync db. - - :param query: The query to be executed. - :type query: str - :param args: A list of query arguments. - :type args: list - - :return: A deferred that will fire with the results of the database - query. - :rtype: twisted.internet.defer.Deferred - """ - return self._sync_db.runQuery(query, *args) - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): - """ - Encrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - encrypted_content = encrypt_docstr( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric encryption - of documents to be synced. - """ - TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - - ENCRYPT_LOOP_PERIOD = 2 - - def __init__(self, *args, **kwargs): - """ - Initialize the sync encrypter pool. - """ - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - # TODO delete already synced files from database - - def start(self): - """ - Start the encrypter pool. - """ - SyncEncryptDecryptPool.start(self) - logger.debug("starting the encryption loop...") - - def stop(self): - """ - Stop the encrypter pool. - """ - - SyncEncryptDecryptPool.stop(self) - - def encrypt_doc(self, doc): - """ - Encrypt document asynchronously then insert it on - local staging database. - - :param doc: The document to be encrypted. - :type doc: SoledadDocument - """ - soledad_assert(self._crypto is not None, "need a crypto object") - docstr = doc.get_json() - key = self._crypto.doc_passphrase(doc.doc_id) - secret = self._crypto.secret - args = doc.doc_id, doc.rev, docstr, key, secret - # encrypt asynchronously - # TODO use dedicated threadpool / move to ampoule - d = threads.deferToThread( - encrypt_doc_task, *args) - d.addCallback(self._encrypt_doc_cb) - return d - - def _encrypt_doc_cb(self, result): - """ - Insert results of encryption routine into the local sync database. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, doc_rev, content = result - return self._insert_encrypted_local_doc(doc_id, doc_rev, content) - - def _insert_encrypted_local_doc(self, doc_id, doc_rev, content): - """ - Insert the contents of the encrypted doc into the local sync - database. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - """ - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ - % (self.TABLE_NAME,) - return self._runOperation(query, (doc_id, doc_rev, content)) - - @defer.inlineCallbacks - def get_encrypted_doc(self, doc_id, doc_rev): - """ - Get an encrypted document from the sync db. - - :param doc_id: The id of the document. - :type doc_id: str - :param doc_rev: The revision of the document. - :type doc_rev: str - - :return: A deferred that will fire with the encrypted content of the - document or None if the document was not found in the sync - db. - :rtype: twisted.internet.defer.Deferred - """ - query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \ - % self.TABLE_NAME - result = yield self._runQuery(query, (doc_id, doc_rev)) - if result: - logger.debug("found doc on sync db: %s" % doc_id) - val = result.pop() - defer.returnValue(val[0]) - logger.debug("did not find doc on sync db: %s" % doc_id) - defer.returnValue(None) - - def delete_encrypted_doc(self, doc_id, doc_rev): - """ - Delete an encrypted document from the sync db. - - :param doc_id: The id of the document. - :type doc_id: str - :param doc_rev: The revision of the document. - :type doc_rev: str - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ - % self.TABLE_NAME - self._runOperation(query, (doc_id, doc_rev)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, - idx): - """ - Decrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The encrypted content of the document as JSON dict. - :type content: dict - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification of - that document. - :type trans_id: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, decrypted_content, gen, trans_id, idx - - -class SyncDecrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric decryption - of documents that were received. - - The decryption of the received documents is done in two steps: - - 1. Encrypted documents are stored in the sync db by the actual soledad - sync loop. - 2. The soledad sync loop tells us how many documents we should expect - to process. - 3. We start a decrypt-and-process loop: - - a. Encrypted documents are fetched. - b. Encrypted documents are decrypted. - c. The longest possible list of decrypted documents are inserted - in the soledad db (this depends on which documents have already - arrived and which documents have already been decrypte, because - the order of insertion in the local soledad db matters). - d. Processed documents are deleted from the database. - - 4. When we have processed as many documents as we should, the loop - finishes. - """ - TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ - "trans_id, encrypted, idx, sync_id" - - """ - Period of recurrence of the periodic decrypting task, in seconds. - """ - DECRYPT_LOOP_PERIOD = 0.5 - - def __init__(self, *args, **kwargs): - """ - Initialize the decrypter pool, and setup a dict for putting the - results of the decrypted docs until they are picked by the insert - routine that gets them in order. - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - :param source_replica_uid: The source replica uid, used to find the - correct callback for inserting documents. - :type source_replica_uid: str - """ - self._insert_doc_cb = kwargs.pop("insert_doc_cb") - self.source_replica_uid = kwargs.pop("source_replica_uid") - - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - - self._docs_to_process = None - self._processed_docs = 0 - self._last_inserted_idx = 0 - - self._loop = LoopingCall(self._decrypt_and_recurse) - - def _start_pool(self, period): - self._loop.start(period) - - def start(self, docs_to_process): - """ - Set the number of documents we expect to process. - - This should be called by the during the sync exchange process as soon - as we know how many documents are arriving from the server. - - :param docs_to_process: The number of documents to process. - :type docs_to_process: int - """ - SyncEncryptDecryptPool.start(self) - self._decrypted_docs_indexes = set() - self._sync_id = uuid4().hex - self._docs_to_process = docs_to_process - self._deferred = defer.Deferred() - d = self._init_db() - d.addCallback(lambda _: self._start_pool(self.DECRYPT_LOOP_PERIOD)) - return d - - def stop(self): - if self._loop.running: - self._loop.stop() - self._finish() - SyncEncryptDecryptPool.stop(self) - - def _init_db(self): - """ - Ensure sync_id column is present then - Empty the received docs table of the sync database. - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" % - self.TABLE_NAME) - d = self._runQuery(ensure_sync_id_column) - - def empty_received_docs(_): - query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) - return self._runOperation(query, (self._sync_id,)) - - d.addCallbacks(empty_received_docs, empty_received_docs) - return d - - def _errback(self, failure): - logger.error(failure) - self._deferred.errback(failure) - self._processed_docs = 0 - self._last_inserted_idx = 0 - - @property - def deferred(self): - """ - Deferred that will be fired when the decryption loop has finished - processing all the documents. - """ - return self._deferred - - def insert_encrypted_received_doc( - self, doc_id, doc_rev, content, gen, trans_id, idx): - """ - Decrypt and insert a received document into local staging area to be - processed later on. - - :param doc_id: The document ID. - :type doc_id: str - :param doc_rev: The document Revision - :param doc_rev: str - :param content: The content of the document - :type content: dict - :param gen: The document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A deferred that will fire after the decrypted document has - been inserted in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - soledad_assert(self._crypto is not None, "need a crypto object") - - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx - # decrypt asynchronously - # TODO use dedicated threadpool / move to ampoule - d = threads.deferToThread( - decrypt_doc_task, *args) - # callback will insert it for later processing - d.addCallback(self._decrypt_doc_cb) - return d - - def insert_received_doc( - self, doc_id, doc_rev, content, gen, trans_id, idx): - """ - Insert a document that is not symmetrically encrypted. - We store it in the staging area (the decrypted_docs dictionary) to be - picked up in order as the preceding documents are decrypted. - - :param doc_id: The document id - :type doc_id: str - :param doc_rev: The document revision - :param doc_rev: str or dict - :param content: The content of the document - :type content: dict - :param gen: The document generation - :type gen: int - :param trans_id: The transaction id - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - if not isinstance(content, str): - content = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \ - % self.TABLE_NAME - d = self._runOperation( - query, (doc_id, doc_rev, content, gen, trans_id, 0, - idx, self._sync_id)) - d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) - return d - - def _delete_received_docs(self, doc_ids): - """ - Delete a list of received docs after get them inserted into the db. - - :param doc_id: Document ID list. - :type doc_id: list - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - placeholders = ', '.join('?' for _ in doc_ids) - query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ - % (self.TABLE_NAME, placeholders) - return self._runOperation(query, (doc_ids)) - - def _decrypt_doc_cb(self, result): - """ - Store the decryption result in the sync db from where it will later be - picked by _process_decrypted_docs. - - :param result: A tuple containing the document's id, revision, - content, generation, transaction id and sync index. - :type result: tuple(str, str, str, int, str, int) - - :return: A deferred that will fire after the document has been - inserted in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - doc_id, rev, content, gen, trans_id, idx = result - logger.debug("sync decrypter pool: decrypted doc %s: %s %s %s" - % (doc_id, rev, gen, trans_id)) - return self.insert_received_doc( - doc_id, rev, content, gen, trans_id, idx) - - def _get_docs(self, encrypted=None, sequence=None): - """ - Get documents from the received docs table in the sync db. - - :param encrypted: If not None, only return documents with encrypted - field equal to given parameter. - :type encrypted: bool or None - :param order_by: The name of the field to order results. - - :return: A deferred that will fire with the results of the database - query. - :rtype: twisted.internet.defer.Deferred - """ - query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ - "idx FROM %s" % self.TABLE_NAME - parameters = [] - if encrypted or sequence: - query += " WHERE sync_id = ? and" - parameters += [self._sync_id] - if encrypted: - query += " encrypted = ?" - parameters += [int(encrypted)] - if sequence: - query += " idx in (" + ', '.join('?' * len(sequence)) + ")" - parameters += [int(i) for i in sequence] - query += " ORDER BY idx ASC" - return self._runQuery(query, parameters) - - @defer.inlineCallbacks - def _get_insertable_docs(self): - """ - Return a list of non-encrypted documents ready to be inserted. - - :return: A deferred that will fire with the list of insertable - documents. - :rtype: twisted.internet.defer.Deferred - """ - # Here, check in memory what are the insertable indexes that can - # form a sequence starting from the last inserted index - sequence = [] - insertable_docs = [] - next_index = self._last_inserted_idx + 1 - while next_index in self._decrypted_docs_indexes: - sequence.append(str(next_index)) - next_index += 1 - if len(sequence) > 900: - # 999 is the default value of SQLITE_MAX_VARIABLE_NUMBER - # if we try to query more, SQLite will refuse - # we need to find a way to improve this - # being researched in #7669 - break - # Then fetch all the ones ready for insertion. - if sequence: - insertable_docs = yield self._get_docs(encrypted=False, - sequence=sequence) - defer.returnValue(insertable_docs) - - @defer.inlineCallbacks - def _process_decrypted_docs(self): - """ - Fetch as many decrypted documents as can be taken from the expected - order and insert them in the local replica. - - :return: A deferred that will fire with the list of inserted - documents. - :rtype: twisted.internet.defer.Deferred - """ - insertable = yield self._get_insertable_docs() - processed_docs_ids = [] - for doc_fields in insertable: - method = self._insert_decrypted_local_doc - # FIXME: This is used only because SQLCipherU1DBSync is synchronous - # When adbapi is used there is no need for an external thread - # Without this the reactor can freeze and fail docs download - yield threads.deferToThread(method, *doc_fields) - processed_docs_ids.append(doc_fields[0]) - yield self._delete_received_docs(processed_docs_ids) - - def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id, encrypted, idx): - """ - Insert the decrypted document into the local replica. - - Make use of the passed callback `insert_doc_cb` passed to the caller - by u1db sync. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - """ - # could pass source_replica in params for callback chain - logger.debug("sync decrypter pool: inserting doc in local db: " - "%s:%s %s" % (doc_id, doc_rev, gen)) - - # convert deleted documents to avoid error on document creation - if content == 'null': - content = None - doc = SoledadDocument(doc_id, doc_rev, content) - gen = int(gen) - self._insert_doc_cb(doc, gen, trans_id) - - # store info about processed docs - self._last_inserted_idx = idx - self._processed_docs += 1 - - @defer.inlineCallbacks - def _decrypt_and_recurse(self): - """ - Decrypt the documents received from remote replica and insert them - into the local one. - - This method implicitelly returns a defferred (see the decorator - above). It should only be called by _launch_decrypt_and_process(). - because this way any exceptions raised here will be stored by the - errback attached to the deferred returned. - - :return: A deferred which will fire after all decrypt, process and - delete operations have been executed. - :rtype: twisted.internet.defer.Deferred - """ - if not self.running: - defer.returnValue(None) - processed = self._processed_docs - pending = self._docs_to_process - - if processed < pending: - yield self._process_decrypted_docs() - else: - self._finish() - - def _finish(self): - self._processed_docs = 0 - self._last_inserted_idx = 0 - self._decrypted_docs_indexes = set() - if not self._deferred.called: - self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py index 4fc91d9d..92bc85d6 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py @@ -58,6 +58,7 @@ def debug(*args): if not silent: print(*args) + debug("[+] db path:", tmpdb) debug("[+] num docs", numdocs) @@ -74,6 +75,7 @@ dbpool = adbapi.getConnectionPool(opts) def createDoc(doc): return dbpool.runU1DBQuery("create_doc", doc) + db_indexes = { 'by-chash': ['chash'], 'by-number': ['number']} @@ -168,6 +170,7 @@ def insert_docs(_): deferreds.append(d) return defer.gatherResults(deferreds, consumeErrors=True) + d = create_indexes(None) d.addCallback(insert_docs) d.addCallback(get_from_index) diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py index 38ea18a3..429566c7 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py @@ -58,6 +58,7 @@ def debug(*args): if not silent: print(*args) + debug("[+] db path:", tmpdb) debug("[+] num docs", numdocs) @@ -74,6 +75,7 @@ dbpool = adbapi.getConnectionPool(opts) def createDoc(doc, doc_id): return dbpool.runU1DBQuery("create_doc", doc, doc_id=doc_id) + db_indexes = { 'by-chash': ['chash'], 'by-number': ['number']} @@ -168,6 +170,7 @@ def insert_docs(_): deferreds.append(d) return defer.gatherResults(deferreds, consumeErrors=True) + d = create_indexes(None) d.addCallback(insert_docs) d.addCallback(get_from_index) diff --git a/client/src/leap/soledad/client/examples/run_benchmark.py b/client/src/leap/soledad/client/examples/run_benchmark.py index 61621e89..ddedf433 100644 --- a/client/src/leap/soledad/client/examples/run_benchmark.py +++ b/client/src/leap/soledad/client/examples/run_benchmark.py @@ -14,6 +14,7 @@ cmd = "SILENT=1 TIMES={times} TMPDIR={tmpdir} python ./use_{version}api.py" def parse_time(r): return r.split('\n')[-1] + with open(CSVFILE, 'w') as log: for times in range(0, 10000, 500): diff --git a/client/src/leap/soledad/client/examples/soledad_sync.py b/client/src/leap/soledad/client/examples/soledad_sync.py index 63077ee3..3aed10eb 100644 --- a/client/src/leap/soledad/client/examples/soledad_sync.py +++ b/client/src/leap/soledad/client/examples/soledad_sync.py @@ -40,7 +40,7 @@ def init_soledad(_): global soledad soledad = Soledad(uuid, _pass, secrets_path, local_db_path, server_url, cert_file, - auth_token=token, defer_encryption=False) + auth_token=token) def getall(_): d = soledad.get_all_docs() diff --git a/client/src/leap/soledad/client/examples/use_adbapi.py b/client/src/leap/soledad/client/examples/use_adbapi.py index a2683836..39301b41 100644 --- a/client/src/leap/soledad/client/examples/use_adbapi.py +++ b/client/src/leap/soledad/client/examples/use_adbapi.py @@ -39,6 +39,7 @@ def debug(*args): if not silent: print(*args) + debug("[+] db path:", tmpdb) debug("[+] times", times) @@ -87,6 +88,7 @@ def allDone(_): print((end_time - start_time).total_seconds()) reactor.stop() + deferreds = [] payload = open('manifest.phk').read() diff --git a/client/src/leap/soledad/client/examples/use_api.py b/client/src/leap/soledad/client/examples/use_api.py index e2501c98..db77c4b3 100644 --- a/client/src/leap/soledad/client/examples/use_api.py +++ b/client/src/leap/soledad/client/examples/use_api.py @@ -36,6 +36,7 @@ def debug(*args): if not silent: print(*args) + debug("[+] db path:", tmpdb) debug("[+] times", times) @@ -52,6 +53,7 @@ db = sqlcipher.SQLCipherDatabase(opts) def allDone(): debug("ALL DONE!") + payload = open('manifest.phk').read() for i in range(times): diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index 62e8bcf0..0e250bf1 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -25,10 +25,13 @@ after receiving. import os from leap.soledad.common.log import getLogger -from leap.common.http import HTTPClient +from leap.common.certs import get_compatible_ssl_context_factory +from twisted.web.client import Agent +from twisted.internet import reactor from leap.soledad.client.http_target.send import HTTPDocSender from leap.soledad.client.http_target.api import SyncTargetAPI from leap.soledad.client.http_target.fetch import HTTPDocFetcher +from leap.soledad.client import crypto as old_crypto logger = getLogger(__name__) @@ -51,8 +54,7 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): the parsed documents that the remote send us, before being decrypted and written to the main database. """ - def __init__(self, url, source_replica_uid, creds, crypto, cert_file, - sync_db=None, sync_enc_pool=None): + def __init__(self, url, source_replica_uid, creds, crypto, cert_file): """ Initialize the sync target. @@ -65,21 +67,11 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): :type creds: creds :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt document contents when syncing. - :type crypto: soledad.crypto.SoledadCrypto + :type crypto: soledad._crypto.SoledadCrypto :param cert_file: Path to the certificate of the ca used to validate the SSL certificate used by the remote soledad server. :type cert_file: str - :param sync_db: Optional. handler for the db with the symmetric - encryption of the syncing documents. If - None, encryption will be done in-place, - instead of retreiving it from the dedicated - database. - :type sync_db: Sqlite handler - :param sync_enc_pool: The encryption pool to use to defer encryption. - If None is passed the encryption will not be - deferred. - :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool """ if url.endswith("/"): url = url[:-1] @@ -89,17 +81,13 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): self._uuid = None self.set_creds(creds) self._crypto = crypto - self._sync_db = sync_db - self._sync_enc_pool = sync_enc_pool + # TODO: DEPRECATED CRYPTO + self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret) self._insert_doc_cb = None - # asynchronous encryption/decryption attributes - self._decryption_callback = None - self._sync_decr_pool = None - - # XXX Increasing timeout of simple requests to avoid chances of hitting - # the duplicated syncing bug. This could be reduced to the 30s default - # after implementing Cancellable Sync. See #7382 - self._http = HTTPClient(cert_file, timeout=90) + + # Twisted default Agent with our own ssl context factory + self._http = Agent(reactor, + get_compatible_ssl_context_factory(cert_file)) if DO_STATS: self.sync_exchange_phase = [0] diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 3c8e3764..1b086a00 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -18,10 +18,13 @@ import os import json import base64 +from StringIO import StringIO from uuid import uuid4 from twisted.web.error import Error from twisted.internet import defer +from twisted.web.http_headers import Headers +from twisted.web.client import FileBodyProducer from leap.soledad.client.http_target.support import readBody from leap.soledad.common.errors import InvalidAuthTokenError @@ -39,14 +42,6 @@ class SyncTargetAPI(SyncTarget): Declares public methods and implements u1db.SyncTarget. """ - @defer.inlineCallbacks - def close(self): - if self._sync_enc_pool: - self._sync_enc_pool.stop() - if self._sync_decr_pool: - self._sync_decr_pool.stop() - yield self._http.close() - @property def uuid(self): return self._uuid @@ -69,16 +64,20 @@ class SyncTargetAPI(SyncTarget): def _base_header(self): return self._auth_header.copy() if self._auth_header else {} - @property - def _defer_encryption(self): - return self._sync_enc_pool is not None - def _http_request(self, url, method='GET', body=None, headers=None, - content_type=None): + content_type=None, body_reader=readBody, + body_producer=None): headers = headers or self._base_header if content_type: headers.update({'content-type': [content_type]}) - d = self._http.request(url, method, body, headers, readBody) + if not body_producer and body: + body = FileBodyProducer(StringIO(body)) + elif body_producer: + # Upload case, check send.py + body = body_producer(body) + d = self._http.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallback(body_reader) d.addErrback(_unauth_to_invalid_token_error) return d @@ -153,7 +152,7 @@ class SyncTargetAPI(SyncTarget): def sync_exchange(self, docs_by_generation, source_replica_uid, last_known_generation, last_known_trans_id, insert_doc_cb, ensure_callback=None, - defer_decryption=True, sync_id=None): + sync_id=None): """ Find out which documents the remote database does not know about, encrypt and send them. After that, receive documents from the remote @@ -185,11 +184,6 @@ class SyncTargetAPI(SyncTarget): created. :type ensure_callback: function - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - :return: A deferred which fires with the new generation and transaction id of the target replica. :rtype: twisted.internet.defer.Deferred @@ -221,8 +215,7 @@ class SyncTargetAPI(SyncTarget): cur_target_gen, cur_target_trans_id = yield self._receive_docs( last_known_generation, last_known_trans_id, - ensure_callback, sync_id, - defer_decryption=defer_decryption) + ensure_callback, sync_id) # update gen and trans id info in case we just sent and did not # receive docs. diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 184c5883..8676ceed 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -15,18 +15,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import json - from twisted.internet import defer +from twisted.internet import threads from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.http_target.support import RequestBody from leap.soledad.common.log import getLogger +from leap.soledad.client._crypto import is_symmetrically_encrypted from leap.soledad.common.document import SoledadDocument from leap.soledad.common.l2db import errors -from leap.soledad.common.l2db.remote import utils +from leap.soledad.client import crypto as old_crypto + +from . import fetch_protocol logger = getLogger(__name__) @@ -50,208 +51,105 @@ class HTTPDocFetcher(object): @defer.inlineCallbacks def _receive_docs(self, last_known_generation, last_known_trans_id, - ensure_callback, sync_id, defer_decryption): - - self._queue_for_decrypt = defer_decryption \ - and self._sync_db is not None - + ensure_callback, sync_id): new_generation = last_known_generation new_transaction_id = last_known_trans_id - - if self._queue_for_decrypt: - logger.debug( - "Soledad sync: will queue received docs for decrypting.") - - if defer_decryption: - self._setup_sync_decr_pool() - - # --------------------------------------------------------------------- - # maybe receive the first document - # --------------------------------------------------------------------- - - # we fetch the first document before fetching the rest because we need - # to know the total number of documents to be received, and this - # information comes as metadata to each request. - - doc = yield self._receive_one_doc( + # Acts as a queue, ensuring line order on async processing + # as `self._insert_doc_cb` cant be run concurrently or out of order. + # DeferredSemaphore solves the concurrency and its implementation uses + # a queue, solving the ordering. + # FIXME: Find a proper solution to avoid surprises on Twisted changes + self.semaphore = defer.DeferredSemaphore(1) + + metadata = yield self._fetch_all( last_known_generation, last_known_trans_id, - sync_id, 0) - self._received_docs = 0 - number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + sync_id) + number_of_changes, ngen, ntrans = self._parse_metadata(metadata) + + # wait for pending inserts + yield self.semaphore.acquire() if ngen: new_generation = ngen new_transaction_id = ntrans - # --------------------------------------------------------------------- - # maybe receive the rest of the documents - # --------------------------------------------------------------------- - - # launch many asynchronous fetches and inserts of received documents - # in the temporary sync db. Will wait for all results before - # continuing. - - received = 1 - deferreds = [] - while received < number_of_changes: - d = self._receive_one_doc( - last_known_generation, - last_known_trans_id, sync_id, received) - d.addCallback( - self._insert_received_doc, - received + 1, # the index of the current received doc - number_of_changes) - deferreds.append(d) - received += 1 - results = yield defer.gatherResults(deferreds) - - # get generation and transaction id of target after insertions - if deferreds: - _, new_generation, new_transaction_id = results.pop() - - # --------------------------------------------------------------------- - # wait for async decryption to finish - # --------------------------------------------------------------------- - - if defer_decryption: - yield self._sync_decr_pool.deferred - self._sync_decr_pool.stop() - defer.returnValue([new_generation, new_transaction_id]) - def _receive_one_doc(self, last_known_generation, - last_known_trans_id, sync_id, received): + def _fetch_all(self, last_known_generation, + last_known_trans_id, sync_id): # add remote replica metadata to the request body = RequestBody( last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) - # inform server of how many documents have already been received - body.insert_info(received=received) - # send headers + self._received_docs = 0 + # build a stream reader with _doc_parser as a callback + body_reader = fetch_protocol.build_body_reader(self._doc_parser) + # start download stream return self._http_request( self._url, method='POST', body=str(body), - content_type='application/x-soledad-sync-get') + content_type='application/x-soledad-sync-get', + body_reader=body_reader) - def _insert_received_doc(self, response, idx, total): + @defer.inlineCallbacks + def _doc_parser(self, doc_info, content, total): """ - Insert a received document into the local replica. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - :param idx: The index count of the current operation. - :type idx: int + Insert a received document into the local replica, decrypting + if necessary. The case where it's not decrypted is when a doc gets + inserted from Server side with a GPG encrypted content. + + :param doc_info: Dictionary representing Document information. + :type doc_info: dict + :param content: The Document's content. + :type idx: str :param total: The total number of operations. :type total: int """ - new_generation, new_transaction_id, number_of_changes, doc_id, \ - rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) + yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, + total) - if self._sync_decr_pool and not self._sync_decr_pool.running: - self._sync_decr_pool.start(number_of_changes) - - if doc_id is not None: - # decrypt incoming document and insert into local database - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # If arriving content was symmetrically encrypted, we decrypt it. - # We do it inline if defer_decryption flag is False or no sync_db - # was defined, otherwise we defer it writing it to the received - # docs table. - doc = SoledadDocument(doc_id, rev, content) - if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(self._crypto.decrypt_doc(doc)) - self._insert_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. - if self._queue_for_decrypt: - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - self._insert_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- + @defer.inlineCallbacks + def __atomic_doc_parse(self, doc_info, content, total): + doc = SoledadDocument(doc_info['id'], doc_info['rev'], content) + if is_symmetrically_encrypted(content): + content = yield self._crypto.decrypt_doc(doc) + elif old_crypto.is_symmetrically_encrypted(doc): + content = self._deprecated_crypto.decrypt_doc(doc) + doc.set_json(content) + + # TODO insert blobs here on the blob backend + # FIXME: This is wrong. Using the very same SQLite connection object + # from multiple threads is dangerous. We should bring the dbpool here + # or find an alternative. Deferring to a thread only helps releasing + # the reactor for other tasks as this is an IO intensive call. + yield threads.deferToThread(self._insert_doc_cb, + doc, doc_info['gen'], doc_info['trans_id']) self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} - _emit_receive_status(user_data, self._received_docs, total) - return number_of_changes, new_generation, new_transaction_id + _emit_receive_status(user_data, self._received_docs, total=total) - def _parse_received_doc_response(self, response): + def _parse_metadata(self, metadata): """ - Parse the response from the server containing the received document. + Parse the response from the server containing the sync metadata. - :param response: The body and headers of the response. - :type response: tuple(str, dict) + :param response: Metadata as string + :type response: str - :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, - content, gen, trans_id) + :return: (number_of_changes, new_gen, new_trans_id) :rtype: tuple """ - # decode incoming stream - parts = response.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise errors.BrokenSyncStream - data = parts[1:-1] - # decode metadata - try: - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - except (IndexError): - raise errors.BrokenSyncStream try: - metadata = json.loads(line) - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - number_of_changes = metadata['number_of_changes'] + metadata = json.loads(metadata) + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + return (metadata['number_of_changes'], metadata['new_generation'], + metadata['new_transaction_id']) except (ValueError, KeyError): - raise errors.BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if self._ensure_callback and 'replica_uid' in metadata: - self._ensure_callback(metadata['replica_uid']) - # parse incoming document info - doc_id = None - rev = None - content = None - gen = None - trans_id = None - if number_of_changes > 0: - try: - entry = json.loads(data[1]) - doc_id = entry['id'] - rev = entry['rev'] - content = entry['content'] - gen = entry['gen'] - trans_id = entry['trans_id'] - except (IndexError, KeyError): - raise errors.BrokenSyncStream - return new_generation, new_transaction_id, number_of_changes, \ - doc_id, rev, content, gen, trans_id - - def _setup_sync_decr_pool(self): - """ - Set up the SyncDecrypterPool for deferred decryption. - """ - if self._sync_decr_pool is None and self._sync_db is not None: - # initialize syncing queue decryption pool - self._sync_decr_pool = SyncDecrypterPool( - self._crypto, - self._sync_db, - insert_doc_cb=self._insert_doc_cb, - source_replica_uid=self.source_replica_uid) + raise errors.BrokenSyncStream('Metadata parsing failed') def _emit_receive_status(user_data, received_docs, total): diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py new file mode 100644 index 00000000..fa6b1969 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -0,0 +1,159 @@ +# -*- coding: utf-8 -*- +# fetch_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +from functools import partial +from cStringIO import StringIO +from twisted.web._newclient import ResponseDone +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger +from .support import ReadBodyProtocol +from .support import readBody + +logger = getLogger(__name__) + + +class DocStreamReceiver(ReadBodyProtocol): + """ + A protocol implementation that can parse incoming data from server based + on a line format specified on u1db implementation. Except that we split doc + attributes from content to ease parsing and increment throughput for larger + documents. + [\r\n + {metadata},\r\n + {doc_info},\r\n + {content},\r\n + ... + {doc_info},\r\n + {content},\r\n + ] + """ + + def __init__(self, response, deferred, doc_reader): + self.deferred = deferred + self.status = response.code if response else None + self.message = response.phrase if response else None + self.headers = response.headers if response else {} + self.delimiter = '\r\n' + self.metadata = '' + self._doc_reader = doc_reader + self.reset() + + def reset(self): + self._line = 0 + self._buffer = StringIO() + self._properly_finished = False + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + try: + if reason.check(ResponseDone): + self.dataBuffer = self.metadata + else: + self.dataBuffer = self.finish() + except errors.BrokenSyncStream, e: + return self.deferred.errback(e) + return ReadBodyProtocol.connectionLost(self, reason) + + def consumeBufferLines(self): + """ + Consumes lines from buffer and rewind it, writing remaining data + that didn't formed a line back into buffer. + """ + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.seek(0) + lines = content.split(self.delimiter) + self._buffer.write(lines.pop(-1)) + return lines + + def dataReceived(self, data): + """ + Buffer incoming data until a line breaks comes in. We check only + the incoming data for efficiency. + """ + self._buffer.write(data) + if '\n' not in data: + return + lines = self.consumeBufferLines() + while lines: + line, _ = utils.check_and_strip_comma(lines.pop(0)) + self.lineReceived(line) + self._line += 1 + + def lineReceived(self, line): + """ + Protocol implementation. + 0: [\r\n + 1: {metadata},\r\n + (even): {doc_info},\r\n + (odd): {data},\r\n + (last): ] + """ + if self._properly_finished: + raise errors.BrokenSyncStream("Reading a finished stream") + if ']' == line: + self._properly_finished = True + elif self._line == 0: + if line is not '[': + raise errors.BrokenSyncStream("Invalid start") + elif self._line == 1: + self.metadata = line + if 'error' in self.metadata: + raise errors.BrokenSyncStream("Error from server: %s" % line) + self.total = json.loads(line).get('number_of_changes', -1) + elif (self._line % 2) == 0: + self.current_doc = json.loads(line) + if 'error' in self.current_doc: + raise errors.BrokenSyncStream("Error from server: %s" % line) + else: + d = self._doc_reader( + self.current_doc, line.strip() or None, self.total) + d.addErrback(self._error) + + def _error(self, reason): + logger.error(reason) + self.transport.loseConnection() + + def finish(self): + """ + Checks that ']' came and stream was properly closed. + """ + if not self._properly_finished: + raise errors.BrokenSyncStream('Stream not properly closed') + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.close() + return content + + +def build_body_reader(doc_reader): + """ + Get the documents from a sync stream and call doc_reader on each + doc received. + + @param doc_reader: Function to be called for processing an incoming doc. + Will be called with doc metadata (dict parsed from 1st line) and doc + content (string) + @type doc_reader: function + + @return: A function that can be called by the http Agent to create and + configure the proper protocol. + """ + protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader) + return partial(readBody, protocolClass=protocolClass) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index c7bd057e..2b286ec5 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -22,6 +22,7 @@ from leap.soledad.common.log import getLogger from leap.soledad.client.events import emit_async from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.http_target.support import RequestBody +from .send_protocol import DocStreamProducer logger = getLogger(__name__) @@ -32,8 +33,6 @@ class HTTPDocSender(object): They need to be encrypted and metadata prepared before sending. """ - MAX_BATCH_SIZE = 0 # disabled by now, this is being tested yet - # The uuid of the local replica. # Any class inheriting from this one should provide a meaningful attribute # if the sync status event is meant to be used somewhere else. @@ -54,73 +53,50 @@ class HTTPDocSender(object): last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) - total = len(docs_by_generation) - while body.consumed < total: - result = yield self._send_batch(total, body, docs_by_generation) + result = yield self._send_batch(body, docs_by_generation) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] defer.returnValue([gen_after_send, trans_id_after_send]) - def _delete_sent(self, docs): - for doc, gen, trans_id in docs: - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) - @defer.inlineCallbacks - def _send_batch(self, total, body, docs): - sent = [] - missing = total - body.consumed - for i in xrange(1, missing + 1): - if body.pending_size > self.MAX_BATCH_SIZE: - break - idx = body.consumed + i - entry = docs[idx - 1] - sent.append(entry) - yield self._prepare_one_doc(entry, body, idx, total) - result = yield self._send_request(body.pop()) - if self._defer_encryption: - self._delete_sent(sent) - + def _send_batch(self, body, docs): + total, calls = len(docs), [] + for i, entry in enumerate(docs): + calls.append((self._prepare_one_doc, + entry, body, i + 1, total)) + result = yield self._send_request(body, calls) _emit_send_status(self.uuid, body.consumed, total) + defer.returnValue(result) - def _send_request(self, body): + def _send_request(self, body, calls): return self._http_request( self._url, method='POST', - body=body, - content_type='application/x-soledad-sync-put') + body=(body, calls), + content_type='application/x-soledad-sync-put', + body_producer=DocStreamProducer) @defer.inlineCallbacks def _prepare_one_doc(self, entry, body, idx, total): - doc, gen, trans_id = entry - content = yield self._encrypt_doc(doc) + get_doc_call, gen, trans_id = entry + doc, content = yield self._encrypt_doc(get_doc_call) body.insert_info( id=doc.doc_id, rev=doc.rev, content=content, gen=gen, trans_id=trans_id, number_of_docs=total, doc_idx=idx) + _emit_send_status(self.uuid, body.consumed, total) - def _encrypt_doc(self, doc): - d = None + @defer.inlineCallbacks + def _encrypt_doc(self, get_doc_call): + f, args, kwargs = get_doc_call + doc = yield f(*args, **kwargs) if doc.is_tombstone(): - d = defer.succeed(None) - elif not self._defer_encryption: - # fallback case, for tests - d = defer.succeed(self._crypto.encrypt_doc(doc)) + defer.returnValue((doc, None)) else: - - def _maybe_encrypt_doc_inline(doc_json): - if doc_json is None: - # the document is not marked as tombstone, but we got - # nothing from the sync db. As it is not encrypted - # yet, we force inline encryption. - return self._crypto.encrypt_doc(doc) - return doc_json - - d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) - d.addCallback(_maybe_encrypt_doc_inline) - return d + content = yield self._crypto.encrypt_doc(doc) + defer.returnValue((doc, content)) def _emit_send_status(user_data, idx, total): diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py new file mode 100644 index 00000000..0cb6d039 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/send_protocol.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# send_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +from zope.interface import implements +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web.iweb import IBodyProducer +from twisted.web.iweb import UNKNOWN_LENGTH + + +class DocStreamProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, producer): + """ + Initialize the string produer. + + :param producer: A RequestBody instance and a list of producer calls + :type producer: (.support.RequestBody, [(function, *args)]) + """ + self.body, self.producer = producer + self.length = UNKNOWN_LENGTH + self.pause = False + self.stop = False + + @defer.inlineCallbacks + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A Deferred that fires when production ends. + :rtype: twisted.internet.defer.Deferred + """ + while self.producer and not self.stop: + if self.pause: + yield self.sleep(0.001) + continue + call = self.producer.pop(0) + fun, args = call[0], call[1:] + yield fun(*args) + consumer.write(self.body.pop(1, leave_open=True)) + consumer.write(self.body.pop(0)) # close stream + + def sleep(self, secs): + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + def pauseProducing(self): + self.pause = True + + def stopProducing(self): + self.stop = True + + def resumeProducing(self): + self.pause = False diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 6ec98ed4..d8d8e420 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -53,6 +53,9 @@ class ReadBodyProtocol(_ReadBodyProtocol): if exc_cls is not None: message = respdic.get("message") self.deferred.errback(exc_cls(message)) + else: + self.deferred.errback( + errors.HTTPError(self.status, respdic, self.headers)) # ---8<--- end of snippet from u1db.remote.http_client def connectionLost(self, reason): @@ -91,7 +94,7 @@ class ReadBodyProtocol(_ReadBodyProtocol): self.deferred.errback(reason) -def readBody(response): +def readBody(response, protocolClass=ReadBodyProtocol): """ Get the body of an L{IResponse} and return it as a byte string. @@ -116,7 +119,7 @@ def readBody(response): abort() d = defer.Deferred(cancel) - protocol = ReadBodyProtocol(response, d) + protocol = protocolClass(response, d) def getAbort(): return getattr(protocol.transport, 'abortConnection', None) @@ -155,46 +158,49 @@ class RequestBody(object): self.headers = header_dict self.entries = [] self.consumed = 0 - self.pending_size = 0 def insert_info(self, **entry_dict): """ Dumps an entry into JSON format and add it to entries list. + Adds 'content' key on a new line if it's present. :param entry_dict: Entry as a dictionary :type entry_dict: dict - - :return: length of the entry after JSON dumps - :rtype: int """ - entry = json.dumps(entry_dict) + content = '' + if 'content' in entry_dict: + content = ',\r\n' + (entry_dict['content'] or '') + entry = json.dumps(entry_dict) + content self.entries.append(entry) - self.pending_size += len(entry) - def pop(self): + def pop(self, amount=10, leave_open=False): """ - Removes all entries and returns it formatted and ready + Removes entries and returns it formatted and ready to be sent. - :param number: number of entries to pop and format - :type number: int + :param amount: number of entries to pop and format + :type amount: int + + :param leave_open: flag to skip stream closing + :type amount: bool :return: formatted body ready to be sent :rtype: str """ - entries = self.entries[:] - self.entries = [] - self.pending_size = 0 - self.consumed += len(entries) - return self.entries_to_str(entries) + start = self.consumed == 0 + amount = min([len(self.entries), amount]) + entries = [self.entries.pop(0) for i in xrange(amount)] + self.consumed += amount + end = len(self.entries) == 0 if not leave_open else False + return self.entries_to_str(entries, start, end) def __str__(self): - return self.entries_to_str(self.entries) + return self.pop(len(self.entries)) def __len__(self): return len(self.entries) - def entries_to_str(self, entries=None): + def entries_to_str(self, entries=None, start=True, end=True): """ Format a list of entries into the body format expected by the server. @@ -205,6 +211,10 @@ class RequestBody(object): :return: formatted body ready to be sent :rtype: str """ - data = '[\r\n' + json.dumps(self.headers) + data = '' + if start: + data = '[\r\n' + json.dumps(self.headers) data += ''.join(',\r\n' + entry for entry in entries) - return data + '\r\n]' + if end: + data += '\r\n]' + return data diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py index 14b34d24..82927ff4 100644 --- a/client/src/leap/soledad/client/interfaces.py +++ b/client/src/leap/soledad/client/interfaces.py @@ -321,7 +321,7 @@ class ISyncableStorage(Interface): "Property, True if the syncer is syncing.") token = Attribute("The authentication Token.") - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize the local encrypted replica with a remote replica. @@ -331,11 +331,6 @@ class ISyncableStorage(Interface): :param url: the url of the target replica to sync with :type url: str - :param defer_decryption: - Whether to defer the decryption process using the intermediate - database. If False, decryption will be done inline. - :type defer_decryption: bool - :return: A deferred that will fire with the local generation before the synchronisation was performed. diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index 1eb6f31d..3fe98c64 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -34,7 +34,7 @@ from leap.soledad.common import soledad_assert_type from leap.soledad.common import document from leap.soledad.common.log import getLogger from leap.soledad.client import events -from leap.soledad.client.crypto import encrypt_sym, decrypt_sym +from leap.soledad.client import _crypto logger = getLogger(__name__) @@ -126,7 +126,7 @@ class SoledadSecrets(object): instantiates Soledad. """ - IV_SEPARATOR = ":" + SEPARATOR = ":" """ A separator used for storing the encryption initial value prepended to the ciphertext. @@ -142,7 +142,8 @@ class SoledadSecrets(object): KDF_SALT_KEY = 'kdf_salt' KDF_LENGTH_KEY = 'kdf_length' KDF_SCRYPT = 'scrypt' - CIPHER_AES256 = 'aes256' + CIPHER_AES256 = 'aes256' # deprecated, AES-CTR + CIPHER_AES256_GCM = _crypto.ENC_METHOD.aes_256_gcm RECOVERY_DOC_VERSION_KEY = 'version' RECOVERY_DOC_VERSION = 1 """ @@ -343,7 +344,7 @@ class SoledadSecrets(object): '%s%s' % (self._passphrase_as_string(), self._uuid)).hexdigest() - def _export_recovery_document(self): + def _export_recovery_document(self, cipher=None): """ Export the storage secrets. @@ -364,6 +365,9 @@ class SoledadSecrets(object): Note that multiple storage secrets might be stored in one recovery document. + :param cipher: (Optional) The ciper to use. Defaults to AES256 + :type cipher: str + :return: The recovery document. :rtype: dict """ @@ -371,7 +375,7 @@ class SoledadSecrets(object): encrypted_secrets = {} for secret_id in self._secrets: encrypted_secrets[secret_id] = self._encrypt_storage_secret( - self._secrets[secret_id]) + self._secrets[secret_id], doc_cipher=cipher) # create the recovery document data = { self.STORAGE_SECRETS_KEY: encrypted_secrets, @@ -447,6 +451,7 @@ class SoledadSecrets(object): except SecretsException as e: logger.error("failed to decrypt storage secret: %s" % str(e)) + raise e return secret_count, active_secret def _get_secrets_from_shared_db(self): @@ -537,18 +542,25 @@ class SoledadSecrets(object): ) if encrypted_secret_dict[self.KDF_LENGTH_KEY] != len(key): raise SecretsException("Wrong length of decryption key.") - if encrypted_secret_dict[self.CIPHER_KEY] != self.CIPHER_AES256: + supported_ciphers = [self.CIPHER_AES256, self.CIPHER_AES256_GCM] + doc_cipher = encrypted_secret_dict[self.CIPHER_KEY] + if doc_cipher not in supported_ciphers: raise SecretsException("Unknown cipher in stored secret.") # recover the initial value and ciphertext iv, ciphertext = encrypted_secret_dict[self.SECRET_KEY].split( - self.IV_SEPARATOR, 1) + self.SEPARATOR, 1) ciphertext = binascii.a2b_base64(ciphertext) - decrypted_secret = decrypt_sym(ciphertext, key, iv) + try: + decrypted_secret = _crypto.decrypt_sym( + ciphertext, key, iv, doc_cipher) + except Exception as e: + logger.error(e) + raise SecretsException("Unable to decrypt secret.") if encrypted_secret_dict[self.LENGTH_KEY] != len(decrypted_secret): raise SecretsException("Wrong length of decrypted secret.") return decrypted_secret - def _encrypt_storage_secret(self, decrypted_secret): + def _encrypt_storage_secret(self, decrypted_secret, doc_cipher=None): """ Encrypt the storage secret. @@ -567,6 +579,8 @@ class SoledadSecrets(object): :param decrypted_secret: The decrypted storage secret. :type decrypted_secret: str + :param cipher: (Optional) The ciper to use. Defaults to AES256 + :type cipher: str :return: The encrypted storage secret. :rtype: dict @@ -575,17 +589,18 @@ class SoledadSecrets(object): salt = os.urandom(self.SALT_LENGTH) # get a 256-bit key key = scrypt.hash(self._passphrase_as_string(), salt, buflen=32) - iv, ciphertext = encrypt_sym(decrypted_secret, key) + doc_cipher = doc_cipher or self.CIPHER_AES256_GCM + iv, ciphertext = _crypto.encrypt_sym(decrypted_secret, key, doc_cipher) + ciphertext = binascii.b2a_base64(ciphertext) encrypted_secret_dict = { # leap.soledad.crypto submodule uses AES256 for symmetric # encryption. self.KDF_KEY: self.KDF_SCRYPT, self.KDF_SALT_KEY: binascii.b2a_base64(salt), self.KDF_LENGTH_KEY: len(key), - self.CIPHER_KEY: self.CIPHER_AES256, + self.CIPHER_KEY: doc_cipher, self.LENGTH_KEY: len(decrypted_secret), - self.SECRET_KEY: '%s%s%s' % ( - str(iv), self.IV_SEPARATOR, binascii.b2a_base64(ciphertext)), + self.SECRET_KEY: self.SEPARATOR.join([str(iv), ciphertext]) } return encrypted_secret_dict diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 3921c323..c9a9444e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -42,9 +42,7 @@ SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases handled by Soledad should be created by SQLCipher >= 2.0. """ import os -import json -from hashlib import sha256 from functools import partial from pysqlcipher import dbapi2 as sqlcipher_dbapi2 @@ -117,7 +115,7 @@ class SQLCipherOptions(object): @classmethod def copy(cls, source, path=None, key=None, create=None, is_raw_key=None, cipher=None, kdf_iter=None, - cipher_page_size=None, defer_encryption=None, sync_db_key=None): + cipher_page_size=None, sync_db_key=None): """ Return a copy of C{source} with parameters different than None replaced by new values. @@ -134,7 +132,7 @@ class SQLCipherOptions(object): args.append(getattr(source, name)) for name in ["create", "is_raw_key", "cipher", "kdf_iter", - "cipher_page_size", "defer_encryption", "sync_db_key"]: + "cipher_page_size", "sync_db_key"]: val = local_vars[name] if val is not None: kwargs[name] = val @@ -145,7 +143,7 @@ class SQLCipherOptions(object): def __init__(self, path, key, create=True, is_raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, - defer_encryption=False, sync_db_key=None): + sync_db_key=None): """ :param path: The filesystem path for the database to open. :type path: str @@ -163,10 +161,6 @@ class SQLCipherOptions(object): :type kdf_iter: int :param cipher_page_size: The page size. :type cipher_page_size: int - :param defer_encryption: - Whether to defer encryption/decryption of documents, or do it - inline while syncing. - :type defer_encryption: bool """ self.path = path self.key = key @@ -175,7 +169,6 @@ class SQLCipherOptions(object): self.cipher = cipher self.kdf_iter = kdf_iter self.cipher_page_size = cipher_page_size - self.defer_encryption = defer_encryption self.sync_db_key = sync_db_key def __str__(self): @@ -201,7 +194,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ A U1DB implementation that uses SQLCipher as its persistence layer. """ - defer_encryption = False # The attribute _index_storage_value will be used as the lookup key for the # implementation of the SQLCipher storage backend. @@ -225,10 +217,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ # ensure the db is encrypted if the file already exists if os.path.isfile(opts.path): - _assert_db_is_encrypted(opts) - - # connect to the sqlcipher database - self._db_handle = initialize_sqlcipher_db(opts) + self._db_handle = _assert_db_is_encrypted(opts) + else: + # connect to the sqlcipher database + self._db_handle = initialize_sqlcipher_db(opts) # TODO --------------------------------------------------- # Everything else in this initialization has to be factored @@ -267,27 +259,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') # - # Document operations - # - - def put_doc(self, doc): - """ - Overwrite the put_doc method, to enqueue the modified document for - encryption before sync. - - :param doc: The document to be put. - :type doc: u1db.Document - - :return: The new document revision. - :rtype: str - """ - doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) - if self.defer_encryption: - # TODO move to api? - self._sync_enc_pool.encrypt_doc(doc) - return doc_rev - - # # SQLCipher API methods # @@ -425,25 +396,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ ENCRYPT_LOOP_PERIOD = 1 - def __init__(self, opts, soledad_crypto, replica_uid, cert_file, - defer_encryption=False, sync_db=None, sync_enc_pool=None): + def __init__(self, opts, soledad_crypto, replica_uid, cert_file): self._opts = opts self._path = opts.path self._crypto = soledad_crypto self.__replica_uid = replica_uid self._cert_file = cert_file - self._sync_enc_pool = sync_enc_pool - - self._sync_db = sync_db - # we store syncers in a dictionary indexed by the target URL. We also - # store a hash of the auth info in case auth info expires and we need - # to rebuild the syncer for that target. The final self._syncers - # format is the following: - # - # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} - self._syncers = {} # storage for the documents received during a sync self.received_docs = [] @@ -458,6 +418,9 @@ class SQLCipherU1DBSync(SQLCipherDatabase): if DO_STATS: self.sync_phase = None + def commit(self): + self._db_handle.commit() + @property def _replica_uid(self): return str(self.__replica_uid) @@ -477,7 +440,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): raise DatabaseAccessError(str(e)) @defer.inlineCallbacks - def sync(self, url, creds=None, defer_decryption=True): + def sync(self, url, creds=None): """ Synchronize documents with remote replica exposed at url. @@ -492,10 +455,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :param creds: optional dictionary giving credentials to authorize the operation with the server. :type creds: dict - :param defer_decryption: - Whether to defer the decryption process using the intermediate - database. If False, decryption will be done inline. - :type defer_decryption: bool :return: A Deferred, that will fire with the local generation (type `int`) @@ -507,8 +466,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self.sync_phase = syncer.sync_phase self.syncer = syncer self.sync_exchange_phase = syncer.sync_exchange_phase - local_gen_before_sync = yield syncer.sync( - defer_decryption=defer_decryption) + local_gen_before_sync = yield syncer.sync() self.received_docs = syncer.received_docs defer.returnValue(local_gen_before_sync) @@ -525,29 +483,15 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :return: A synchronizer. :rtype: Synchronizer """ - # we want to store at most one syncer for each url, so we also store a - # hash of the connection credentials and replace the stored syncer for - # a certain url if credentials have changed. - h = sha256(json.dumps([url, creds])).hexdigest() - cur_h, syncer = self._syncers.get(url, (None, None)) - if syncer is None or h != cur_h: - syncer = SoledadSynchronizer( - self, - SoledadHTTPSyncTarget( - url, - # XXX is the replica_uid ready? - self._replica_uid, - creds=creds, - crypto=self._crypto, - cert_file=self._cert_file, - sync_db=self._sync_db, - sync_enc_pool=self._sync_enc_pool)) - self._syncers[url] = (h, syncer) - # in order to reuse the same synchronizer multiple times we have to - # reset its state (i.e. the number of documents received from target - # and inserted in the local replica). - syncer.num_inserted = 0 - return syncer + return SoledadSynchronizer( + self, + SoledadHTTPSyncTarget( + url, + # XXX is the replica_uid ready? + self._replica_uid, + creds=creds, + crypto=self._crypto, + cert_file=self._cert_file)) # # Symmetric encryption of syncing docs @@ -558,18 +502,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # XXX this SHOULD BE a callback return self._get_generation() - def close(self): - """ - Close the syncer and syncdb orderly - """ - super(SQLCipherU1DBSync, self).close() - # close all open syncers - for url in self._syncers.keys(): - _, syncer = self._syncers[url] - syncer.close() - del self._syncers[url] - self.running = False - class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): """ @@ -599,14 +531,12 @@ class SoledadSQLCipherWrapper(SQLCipherDatabase): It can be used from adbapi to initialize a soledad database after getting a regular connection to a sqlcipher database. """ - def __init__(self, conn, opts, sync_enc_pool): + def __init__(self, conn, opts): self._db_handle = conn self._real_replica_uid = None self._ensure_schema() self.set_document_factory(soledad_doc_factory) self._prime_replica_uid() - self.defer_encryption = opts.defer_encryption - self._sync_enc_pool = sync_enc_pool def _assert_db_is_encrypted(opts): @@ -635,7 +565,7 @@ def _assert_db_is_encrypted(opts): # assert that we can access it using SQLCipher with the given # key dummy_query = ('SELECT count(*) FROM sqlite_master',) - initialize_sqlcipher_db(opts, on_init=dummy_query) + return initialize_sqlcipher_db(opts, on_init=dummy_query) else: raise DatabaseIsNotEncrypted() @@ -660,6 +590,7 @@ def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False, return SoledadDocument(doc_id=doc_id, rev=rev, json=json, has_conflicts=has_conflicts, syncable=syncable) + sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 7ed5f693..70c841d6 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -56,23 +56,10 @@ class SoledadSynchronizer(Synchronizer): self.sync_exchange_phase = None @defer.inlineCallbacks - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize documents between source and target. - Differently from u1db `Synchronizer.sync` method, this one allows to - pass a `defer_decryption` flag that will postpone the last - step in the synchronization dance, namely, the setting of the last - known generation and transaction id for a given remote replica. - - This is done to allow the ongoing parallel decryption of the incoming - docs to proceed without `InvalidGeneration` conflicts. - - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - :return: A deferred which will fire after the sync has finished with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -154,28 +141,19 @@ class SoledadSynchronizer(Synchronizer): self.sync_phase[0] += 1 # -------------------------------------------------------------------- - # prepare to send all the changed docs - changed_doc_ids = [doc_id for doc_id, _, _ in changes] - docs_to_send = self.source.get_docs( - changed_doc_ids, check_for_conflicts=False, include_deleted=True) - ids_sent = [] - docs_by_generation = [] - idx = 0 - for doc in docs_to_send: - _, gen, trans = changes[idx] - docs_by_generation.append((doc, gen, trans)) - idx += 1 - ids_sent.append(doc.doc_id) + docs_by_generation = self._docs_by_gen_from_changes(changes) # exchange documents and try to insert the returned ones with # the target, return target synced-up-to gen. new_gen, new_trans_id = yield sync_target.sync_exchange( docs_by_generation, self.source._replica_uid, target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - defer_decryption=defer_decryption) + self._insert_doc_from_target, ensure_callback=ensure_callback) + ids_sent = [doc_id for doc_id, _, _ in changes] logger.debug("target gen after sync: %d" % new_gen) logger.debug("target trans_id after sync: %s" % new_trans_id) + if hasattr(self.source, 'commit'): # sqlcipher backend speed up + self.source.commit() # insert it all in a single transaction info = { "target_replica_uid": self.target_replica_uid, "new_gen": new_gen, @@ -204,6 +182,14 @@ class SoledadSynchronizer(Synchronizer): defer.returnValue(my_gen) + def _docs_by_gen_from_changes(self, changes): + docs_by_generation = [] + kwargs = {'include_deleted': True} + for doc_id, gen, trans in changes: + get_doc = (self.source.get_doc, (doc_id,), kwargs) + docs_by_generation.append((get_doc, gen, trans)) + return docs_by_generation + def complete_sync(self): """ Last stage of the synchronization: @@ -225,12 +211,6 @@ class SoledadSynchronizer(Synchronizer): # if gapless record current reached generation with target return self._record_sync_info_with_the_target(info["my_gen"]) - def close(self): - """ - Close the synchronizer. - """ - self.sync_target.close() - def _record_sync_info_with_the_target(self, start_generation): """ Store local replica metadata in server. |