Diffstat (limited to 'src/leap/soledad/client')
43 files changed, 8262 insertions, 0 deletions
diff --git a/src/leap/soledad/client/__init__.py b/src/leap/soledad/client/__init__.py new file mode 100644 index 00000000..bcad78db --- /dev/null +++ b/src/leap/soledad/client/__init__.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Soledad - Synchronization Of Locally Encrypted Data Among Devices. +""" +from leap.soledad.common import soledad_assert + +from .api import Soledad +from ._document import Document, AttachmentStates +from ._version import get_versions + +__version__ = get_versions()['version'] +del get_versions + +__all__ = ['soledad_assert', 'Soledad', 'Document', 'AttachmentStates', + '__version__'] diff --git a/src/leap/soledad/client/_crypto.py b/src/leap/soledad/client/_crypto.py new file mode 100644 index 00000000..8cedf52e --- /dev/null +++ b/src/leap/soledad/client/_crypto.py @@ -0,0 +1,557 @@ +# -*- coding: utf-8 -*- +# _crypto.py +# Copyright (C) 2016 LEAP Encryption Access Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Cryptographic operations for the soledad client. + +This module implements streaming crypto operations. +It replaces the old client.crypto module, that will be deprecated in soledad +0.12. + +The algorithm for encrypting and decrypting is as follow: + +The KEY is a 32 bytes value. +The IV is a random 16 bytes value. +The PREAMBLE is a packed_structure with encryption metadata, such as IV. +The SEPARATOR is a space. + +Encryption +---------- + +IV = os.urandom(16) +PREAMBLE = BLOB_SIGNATURE_MAGIC, ENC_SCHEME, ENC_METHOD, time, IV, doc_id, rev, +and size. + +PREAMBLE = base64_encoded(PREAMBLE) +CIPHERTEXT = base64_encoded(AES_GCM(KEY, cleartext) + resulting_tag) if armor + +CIPHERTEXT = AES_GCM(KEY, cleartext) + resulting_tag if not armor +# "resulting_tag" came from AES-GCM encryption. It will be the last 16 bytes of +# our ciphertext. + +encrypted_payload = PREAMBLE + SEPARATOR + CIPHERTEXT + +Decryption +---------- + +Ciphertext and Tag CAN come encoded in base64 (with armor=True) or raw (with +armor=False). Preamble will always come encoded in base64. 
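For reference, the round trip described in this docstring can be sketched as follows. This is a schematic, self-contained Python 3 example against the cryptography library's AESGCM helper; the function names and the simplified field handling are illustrative only and are not part of this changeset.

import base64
import os
import struct
import time
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

PREAMBLE_FMT = '2sbbQ16s255p255pQ'  # magic, scheme, method, time, IV, doc_id, rev, size

def toy_encrypt(key, doc_id, rev, cleartext, armor=True):
    iv = os.urandom(16)
    preamble = struct.pack(PREAMBLE_FMT, b'\x13\x37', 1, 2, int(time.time()),
                           iv, doc_id, rev, len(cleartext))
    # AESGCM appends the 16-byte tag to the ciphertext and authenticates
    # the preamble as associated data
    ciphertext = AESGCM(key).encrypt(iv, cleartext, preamble)
    if armor:
        ciphertext = base64.urlsafe_b64encode(ciphertext)
    return base64.urlsafe_b64encode(preamble) + b' ' + ciphertext

def toy_decrypt(key, payload, armor=True):
    preamble_b64, ciphertext = payload.split(b' ', 1)
    preamble = base64.urlsafe_b64decode(preamble_b64)
    iv = struct.unpack(PREAMBLE_FMT, preamble)[4]
    if armor:
        ciphertext = base64.urlsafe_b64decode(ciphertext)
    # raises InvalidTag if either the ciphertext or the preamble was tampered with
    return AESGCM(key).decrypt(iv, ciphertext, preamble)

key = os.urandom(32)
blob = toy_encrypt(key, b'doc-1', b'rev-1', b'{"content": true}')
assert toy_decrypt(key, blob) == b'{"content": true}'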
+ +PREAMBLE, CIPHERTEXT = PAYLOAD.SPLIT(' ', 1) + +PREAMBLE = base64_decode(PREAMBLE) +CIPHERTEXT = base64_decode(CIPHERTEXT) if armor else CIPHERTEXT + +CIPHERTEXT, TAG = CIPHERTEXT[:-16], CIPHERTEXT[-16:] +CLEARTEXT = aes_gcm_decrypt(KEY, IV, CIPHERTEXT, TAG, associated_data=PREAMBLE) + +AES-GCM will check preamble authenticity as well, since we are using +Authenticated Encryption with Associated Data (AEAD). Ciphertext and associated +data (PREAMBLE) authenticity will both be checked together during decryption. +PREAMBLE consistency (if it matches the desired document, for instance) is +checked during PREAMBLE reading. +""" + + +import base64 +import hashlib +import warnings +import hmac +import os +import struct +import time + +from io import BytesIO +from collections import namedtuple + +from twisted.internet import defer +from twisted.internet import interfaces +from twisted.web.client import FileBodyProducer + +from leap.soledad.common import soledad_assert +from cryptography.exceptions import InvalidTag +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend + +from zope.interface import implementer + + +SECRET_LENGTH = 64 +SEPARATOR = ' ' # Anything that doesn't belong to base64 encoding + +CRYPTO_BACKEND = default_backend() + +PACMAN = struct.Struct('2sbbQ16s255p255pQ') +LEGACY_PACMAN = struct.Struct('2sbbQ16s255p255p') +BLOB_SIGNATURE_MAGIC = '\x13\x37' + + +ENC_SCHEME = namedtuple('SCHEME', 'symkey')(1) +ENC_METHOD = namedtuple('METHOD', 'aes_256_ctr aes_256_gcm')(1, 2) +DocInfo = namedtuple('DocInfo', 'doc_id rev') + + +class EncryptionDecryptionError(Exception): + pass + + +class InvalidBlob(Exception): + pass + + +class SoledadCrypto(object): + """ + This class provides convenient methods for document encryption and + decryption using BlobEncryptor and BlobDecryptor classes. + """ + def __init__(self, secret): + """ + Initialize the crypto object. + + :param secret: The Soledad remote storage secret. + :type secret: str + """ + self.secret = secret + + def encrypt_doc(self, doc): + """ + Creates and configures a BlobEncryptor, asking it to start encryption + and wrapping the result as a simple JSON string with a "raw" key. + + :param doc: the document to be encrypted. + :type doc: Document + :return: A deferred whose callback will be invoked with a JSON string + containing the ciphertext as the value of "raw" key. + :rtype: twisted.internet.defer.Deferred + """ + + def put_raw(blob): + raw = blob.getvalue() + return '{"raw": "' + raw + '"}' + + content = BytesIO(str(doc.get_json())) + info = DocInfo(doc.doc_id, doc.rev) + del doc + encryptor = BlobEncryptor(info, content, secret=self.secret) + d = encryptor.encrypt() + d.addCallback(put_raw) + return d + + def decrypt_doc(self, doc): + """ + Creates and configures a BlobDecryptor, asking it decrypt and returning + the decrypted cleartext content from the encrypted document. + + :param doc: the document to be decrypted. + :type doc: Document + :return: The decrypted cleartext content of the document. + :rtype: str + """ + info = DocInfo(doc.doc_id, doc.rev) + ciphertext = BytesIO() + payload = doc.content['raw'] + del doc + ciphertext.write(str(payload)) + decryptor = BlobDecryptor(info, ciphertext, secret=self.secret) + return decryptor.decrypt() + + +def encrypt_sym(data, key, method=ENC_METHOD.aes_256_gcm): + """ + Encrypt data using AES-256 cipher in selected mode. + + :param data: The data to be encrypted. 
+ :type data: str + :param key: The key used to encrypt data (must be 256 bits long). + :type key: str + + :return: A tuple with the initialization vector and the ciphertext, both + encoded as base64. + :rtype: (str, str) + """ + mode = _mode_by_method(method) + encryptor = AESWriter(key, mode=mode) + encryptor.write(data) + _, ciphertext = encryptor.end() + iv = base64.b64encode(encryptor.iv) + tag = encryptor.tag or '' + return iv, ciphertext + tag + + +def decrypt_sym(data, key, iv, method=ENC_METHOD.aes_256_gcm): + """ + Decrypt data using AES-256 cipher in selected mode. + + :param data: The data to be decrypted. + :type data: str + :param key: The symmetric key used to decrypt data (must be 256 bits + long). + :type key: str + :param iv: The base64 encoded initialization vector. + :type iv: str + + :return: The decrypted data. + :rtype: str + """ + _iv = base64.b64decode(str(iv)) + mode = _mode_by_method(method) + tag = None + if mode == modes.GCM: + data, tag = data[:-16], data[-16:] + decryptor = AESWriter(key, _iv, tag=tag, mode=mode) + decryptor.write(data) + _, plaintext = decryptor.end() + return plaintext + + +# TODO maybe rename this to Encryptor, since it will be used by blobs an non +# blobs in soledad. +class BlobEncryptor(object): + """ + Produces encrypted data from the cleartext data associated with a given + Document using AES-256 cipher in GCM mode. + + The production happens using a Twisted's FileBodyProducer, which uses a + Cooperator to schedule calls and can be paused/resumed. Each call takes at + most 65536 bytes from the input. + + Both the production input and output are file descriptors, so they can be + applied to a stream of data. + """ + # TODO + # This class needs further work to allow for proper streaming. + # Right now we HAVE TO WAIT until the end of the stream before encoding the + # result. It should be possible to do that just encoding the chunks and + # passing them to a sink, but for that we have to encode the chunks at + # proper alignment (3 bytes?) with b64 if armor is defined. + + def __init__(self, doc_info, content_fd, secret=None, armor=True, + sink=None): + if not secret: + raise EncryptionDecryptionError('no secret given') + + self.doc_id = doc_info.doc_id + self.rev = doc_info.rev + self.armor = armor + + self._content_fd = content_fd + self._content_size = self._get_rounded_size(content_fd) + self._producer = FileBodyProducer(content_fd, readSize=2**16) + + self.sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) + self._aes = AESWriter(self.sym_key, _buffer=sink) + self._aes.authenticate(self._encode_preamble()) + + def _get_rounded_size(self, fd): + """ + Returns a rounded value in order to minimize information leaks due to + the original size being exposed. + """ + fd.seek(0, os.SEEK_END) + size = _ceiling(fd.tell()) + fd.seek(0) + return size + + @property + def iv(self): + return self._aes.iv + + @property + def tag(self): + return self._aes.tag + + def encrypt(self): + """ + Starts producing encrypted data from the cleartext data. + + :return: A deferred which will be fired when encryption ends and whose + callback will be invoked with the resulting ciphertext. + :rtype: twisted.internet.defer.Deferred + """ + # XXX pass a sink to aes? 
+ d = self._producer.startProducing(self._aes) + d.addCallback(lambda _: self._end_crypto_stream_and_encode_result()) + return d + + def _encode_preamble(self): + current_time = int(time.time()) + + preamble = PACMAN.pack( + BLOB_SIGNATURE_MAGIC, + ENC_SCHEME.symkey, + ENC_METHOD.aes_256_gcm, + current_time, + self.iv, + str(self.doc_id), + str(self.rev), + self._content_size) + return preamble + + def _end_crypto_stream_and_encode_result(self): + + # TODO ---- this needs to be refactored to allow PROPER streaming + # We should write the preamble as soon as possible, + # Is it possible to write the AES stream as soon as it is encrypted by + # chunks? + # FIXME also, it needs to be able to encode chunks with base64 if armor + + preamble, encrypted = self._aes.end() + result = BytesIO() + result.write( + base64.urlsafe_b64encode(preamble)) + result.write(SEPARATOR) + + if self.armor: + result.write( + base64.urlsafe_b64encode(encrypted + self.tag)) + else: + result.write(encrypted + self.tag) + + result.seek(0) + return defer.succeed(result) + + +# TODO maybe rename this to just Decryptor, since it will be used by blobs +# and non blobs in soledad. +class BlobDecryptor(object): + """ + Decrypts an encrypted blob associated with a given Document. + + Will raise an exception if the blob doesn't have the expected structure, or + if the GCM tag doesn't verify. + """ + def __init__(self, doc_info, ciphertext_fd, result=None, + secret=None, armor=True, start_stream=True, tag=None): + if not secret: + raise EncryptionDecryptionError('no secret given') + + self.doc_id = doc_info.doc_id + self.rev = doc_info.rev + self.fd = ciphertext_fd + self.armor = armor + self._producer = None + self.result = result or BytesIO() + sym_key = _get_sym_key_for_doc(doc_info.doc_id, secret) + self.size = None + self.tag = None + + preamble, iv = self._consume_preamble() + soledad_assert(preamble) + soledad_assert(iv) + + self._aes = AESWriter(sym_key, iv, self.result, tag=tag or self.tag) + self._aes.authenticate(preamble) + if start_stream: + self._start_stream() + + @property + def decrypted_content_size(self): + return self._aes.written + + def _start_stream(self): + self._producer = FileBodyProducer(self.fd, readSize=2**16) + + def _consume_preamble(self): + """ + Consume the preamble and write remaining bytes as ciphertext. This + function is called during a stream and can be holding both, so we need + to consume only preamble and store the remaining. + """ + self.fd.seek(0) + try: + parts = self.fd.getvalue().split(SEPARATOR, 1) + preamble = base64.urlsafe_b64decode(parts[0]) + if len(parts) == 2: + ciphertext = parts[1] + if self.armor: + ciphertext = base64.urlsafe_b64decode(ciphertext) + self.tag, ciphertext = ciphertext[-16:], ciphertext[:-16] + self.fd.seek(0) + self.fd.write(ciphertext) + self.fd.seek(len(ciphertext)) + self.fd.truncate() + self.fd.seek(0) + + except (TypeError, ValueError): + raise InvalidBlob + + try: + if len(preamble) == LEGACY_PACMAN.size: + warnings.warn("Decrypting a legacy document without size. " + + "This will be deprecated in 0.12. 
Doc was: " + + "doc_id: %s rev: %s" % (self.doc_id, self.rev), + Warning) + unpacked_data = LEGACY_PACMAN.unpack(preamble) + magic, sch, meth, ts, iv, doc_id, rev = unpacked_data + elif len(preamble) == PACMAN.size: + unpacked_data = PACMAN.unpack(preamble) + magic, sch, meth, ts, iv, doc_id, rev, doc_size = unpacked_data + self.size = doc_size + else: + raise InvalidBlob("Unexpected preamble size %d", len(preamble)) + except struct.error as e: + raise InvalidBlob(e) + + if magic != BLOB_SIGNATURE_MAGIC: + raise InvalidBlob + # TODO check timestamp. Just as a sanity check, but for instance + # we can refuse to process something that is in the future or + # too far in the past (1984 would be nice, hehe) + if sch != ENC_SCHEME.symkey: + raise InvalidBlob('Invalid scheme: %s' % sch) + if meth != ENC_METHOD.aes_256_gcm: + raise InvalidBlob('Invalid encryption scheme: %s' % meth) + if rev != self.rev: + msg = 'Invalid revision. Expected: %s, was: %s' % (self.rev, rev) + raise InvalidBlob(msg) + if doc_id != self.doc_id: + msg = 'Invalid doc_id. ' + + 'Expected: %s, was: %s' % (self.doc_id, doc_id) + raise InvalidBlob(msg) + + return preamble, iv + + def _end_stream(self): + try: + self._aes.end() + except InvalidTag: + raise InvalidBlob('Invalid Tag. Blob authentication failed.') + fd = self.result + fd.seek(0) + return self.result + + def decrypt(self): + """ + Starts producing encrypted data from the cleartext data. + + :return: A deferred which will be fired when encryption ends and whose + callback will be invoked with the resulting ciphertext. + :rtype: twisted.internet.defer.Deferred + """ + d = self.startProducing() + d.addCallback(lambda _: self._end_stream()) + return d + + def startProducing(self): + if not self._producer: + self._start_stream() + return self._producer.startProducing(self._aes) + + def endStream(self): + self._end_stream() + + def write(self, data): + self._aes.write(data) + + def close(self): + result = self._aes.end() + return result + + +@implementer(interfaces.IConsumer) +class AESWriter(object): + """ + A Twisted's Consumer implementation that takes an input file descriptor and + applies AES-256 cipher in GCM mode. + + It is used both for encryption and decryption of a stream, depending of the + value of the tag parameter. If you pass a tag, it will operate in + decryption mode, verifying the authenticity of the preamble and ciphertext. + If no tag is passed, encryption mode is assumed, which will generate a tag. + """ + + def __init__(self, key, iv=None, _buffer=None, tag=None, mode=modes.GCM): + if len(key) != 32: + raise EncryptionDecryptionError('key is not 256 bits') + + if tag is not None: + # if tag, we're decrypting + assert iv is not None + + self.iv = iv or os.urandom(16) + self.buffer = _buffer or BytesIO() + cipher = _get_aes_cipher(key, self.iv, tag, mode) + cipher = cipher.decryptor() if tag else cipher.encryptor() + self.cipher, self.aead = cipher, '' + self.written = 0 + + def authenticate(self, data): + self.aead += data + self.cipher.authenticate_additional_data(data) + + @property + def tag(self): + return getattr(self.cipher, 'tag', None) + + def write(self, data): + self.written += len(data) + self.buffer.write(self.cipher.update(data)) + + def end(self): + self.buffer.write(self.cipher.finalize()) + return self.aead, self.buffer.getvalue() + + +def is_symmetrically_encrypted(content): + """ + Returns True if the document was symmetrically encrypted. 
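An illustrative check (not part of this module) of where that magic prefix comes from:

import base64
# The preamble starts with the \x13\x37 magic bytes followed by
# ENC_SCHEME.symkey (1), so the base64-armored payload always begins
# with these four characters.
assert base64.urlsafe_b64encode(b'\x13\x37\x01') == b'EzcB'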
+ 'EzcB' is the base64 encoding of \x13\x37 magic number and 1 (symmetrically + encrypted value for enc_scheme flag). + + :param doc: The document content as string + :type doc: str + + :rtype: bool + """ + sym_signature = '{"raw": "EzcB' + return content and content.startswith(sym_signature) + + +# utils + + +def _hmac_sha256(key, data): + return hmac.new(key, data, hashlib.sha256).digest() + + +def _get_sym_key_for_doc(doc_id, secret): + key = secret[SECRET_LENGTH:] + return _hmac_sha256(key, doc_id) + + +def _get_aes_cipher(key, iv, tag, mode=modes.GCM): + mode = mode(iv, tag) if mode == modes.GCM else mode(iv) + return Cipher(algorithms.AES(key), mode, backend=CRYPTO_BACKEND) + + +def _mode_by_method(method): + if method == ENC_METHOD.aes_256_gcm: + return modes.GCM + else: + return modes.CTR + + +def _ceiling(size): + """ + Some simplistic ceiling scheme that uses powers of 2. + We report everything below 4096 bytes as that minimum threshold. + See #8759 for research pending for less simplistic/aggresive strategies. + """ + for i in xrange(12, 31): + step = 2 ** i + if size < step: + return step diff --git a/src/leap/soledad/client/_db/__init__.py b/src/leap/soledad/client/_db/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/leap/soledad/client/_db/__init__.py diff --git a/src/leap/soledad/client/_db/adbapi.py b/src/leap/soledad/client/_db/adbapi.py new file mode 100644 index 00000000..5c28d108 --- /dev/null +++ b/src/leap/soledad/client/_db/adbapi.py @@ -0,0 +1,298 @@ +# -*- coding: utf-8 -*- +# adbapi.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +An asyncrhonous interface to soledad using sqlcipher backend. +It uses twisted.enterprise.adbapi. +""" +import re +import sys + +from functools import partial + +from twisted.enterprise import adbapi +from twisted.internet.defer import DeferredSemaphore +from twisted.python import compat +from zope.proxy import ProxyBase, setProxiedObject + +from leap.soledad.common.log import getLogger +from leap.soledad.common.errors import DatabaseAccessError + +from . import sqlcipher +from . import pragmas + +if sys.version_info[0] < 3: + from pysqlcipher import dbapi2 +else: + from pysqlcipher3 import dbapi2 + + +logger = getLogger(__name__) + + +""" +How long the SQLCipher connection should wait for the lock to go away until +raising an exception. +""" +SQLCIPHER_CONNECTION_TIMEOUT = 10 + +""" +How many times a SQLCipher query should be retried in case of timeout. +""" +SQLCIPHER_MAX_RETRIES = 20 + + +def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): + """ + Return a connection pool. + + :param opts: + Options for the SQLCipher connection. + :type opts: SQLCipherOptions + :param openfun: + Callback invoked after every connect() on the underlying DB-API + object. + :type openfun: callable + :param driver: + The connection driver. 
+ :type driver: str + + :return: A U1DB connection pool. + :rtype: U1DBConnectionPool + """ + if openfun is None and driver == "pysqlcipher": + openfun = partial(pragmas.set_init_pragmas, opts=opts) + return U1DBConnectionPool( + opts, + # the following params are relayed "as is" to twisted's + # ConnectionPool. + "%s.dbapi2" % driver, opts.path, timeout=SQLCIPHER_CONNECTION_TIMEOUT, + check_same_thread=False, cp_openfun=openfun) + + +class U1DBConnection(adbapi.Connection): + """ + A wrapper for a U1DB connection instance. + """ + + u1db_wrapper = sqlcipher.SoledadSQLCipherWrapper + """ + The U1DB wrapper to use. + """ + + def __init__(self, pool, init_u1db=False): + """ + :param pool: The pool of connections to that owns this connection. + :type pool: adbapi.ConnectionPool + :param init_u1db: Wether the u1db database should be initialized. + :type init_u1db: bool + """ + self.init_u1db = init_u1db + try: + adbapi.Connection.__init__(self, pool) + except dbapi2.DatabaseError as e: + raise DatabaseAccessError( + 'Error initializing connection to sqlcipher database: %s' + % str(e)) + + def reconnect(self): + """ + Reconnect to the U1DB database. + """ + if self._connection is not None: + self._pool.disconnect(self._connection) + self._connection = self._pool.connect() + + if self.init_u1db: + self._u1db = self.u1db_wrapper( + self._connection, + self._pool.opts) + + def __getattr__(self, name): + """ + Route the requested attribute either to the U1DB wrapper or to the + connection. + + :param name: The name of the attribute. + :type name: str + """ + if name.startswith('u1db_'): + attr = re.sub('^u1db_', '', name) + return getattr(self._u1db, attr) + else: + return getattr(self._connection, name) + + +class U1DBTransaction(adbapi.Transaction): + """ + A wrapper for a U1DB 'cursor' object. + """ + + def __getattr__(self, name): + """ + Route the requested attribute either to the U1DB wrapper of the + connection or to the actual connection cursor. + + :param name: The name of the attribute. + :type name: str + """ + if name.startswith('u1db_'): + attr = re.sub('^u1db_', '', name) + return getattr(self._connection._u1db, attr) + else: + return getattr(self._cursor, name) + + +class U1DBConnectionPool(adbapi.ConnectionPool): + """ + Represent a pool of connections to an U1DB database. + """ + + connectionFactory = U1DBConnection + transactionFactory = U1DBTransaction + + def __init__(self, opts, *args, **kwargs): + """ + Initialize the connection pool. + """ + self.opts = opts + try: + adbapi.ConnectionPool.__init__(self, *args, **kwargs) + except dbapi2.DatabaseError as e: + raise DatabaseAccessError( + 'Error initializing u1db connection pool: %s' % str(e)) + + # all u1db connections, hashed by thread-id + self._u1dbconnections = {} + + # The replica uid, primed by the connections on init. + self.replica_uid = ProxyBase(None) + + try: + conn = self.connectionFactory( + self, init_u1db=True) + replica_uid = conn._u1db._real_replica_uid + setProxiedObject(self.replica_uid, replica_uid) + except DatabaseAccessError as e: + self.threadpool.stop() + raise DatabaseAccessError( + "Error initializing connection factory: %s" % str(e)) + + def runU1DBQuery(self, meth, *args, **kw): + """ + Execute a U1DB query in a thread, using a pooled connection. + + Concurrent threads trying to update the same database may timeout + because of other threads holding the database lock. Because of this, + we will retry SQLCIPHER_MAX_RETRIES times and fail after that. + + :param meth: The U1DB wrapper method name. 
+ :type meth: str + + :return: a Deferred which will fire the return value of + 'self._runU1DBQuery(Transaction(...), *args, **kw)', or a Failure. + :rtype: twisted.internet.defer.Deferred + """ + meth = "u1db_%s" % meth + semaphore = DeferredSemaphore(SQLCIPHER_MAX_RETRIES) + + def _run_interaction(): + return self.runInteraction( + self._runU1DBQuery, meth, *args, **kw) + + def _errback(failure): + failure.trap(dbapi2.OperationalError) + if failure.getErrorMessage() == "database is locked": + logger.warn("database operation timed out") + should_retry = semaphore.acquire() + if should_retry: + logger.warn("trying again...") + return _run_interaction() + logger.warn("giving up!") + return failure + + d = _run_interaction() + d.addErrback(_errback) + return d + + def _runU1DBQuery(self, trans, meth, *args, **kw): + """ + Execute a U1DB query. + + :param trans: An U1DB transaction. + :type trans: adbapi.Transaction + :param meth: the U1DB wrapper method name. + :type meth: str + """ + meth = getattr(trans, meth) + return meth(*args, **kw) + # XXX should return a fetchall? + + # XXX add _runOperation too + + def _runInteraction(self, interaction, *args, **kw): + """ + Interact with the database and return the result. + + :param interaction: + A callable object whose first argument is an + L{adbapi.Transaction}. + :type interaction: callable + :return: a Deferred which will fire the return value of + 'interaction(Transaction(...), *args, **kw)', or a Failure. + :rtype: twisted.internet.defer.Deferred + """ + tid = self.threadID() + u1db = self._u1dbconnections.get(tid) + conn = self.connectionFactory( + self, init_u1db=not bool(u1db)) + + if self.replica_uid is None: + replica_uid = conn._u1db._real_replica_uid + setProxiedObject(self.replica_uid, replica_uid) + + if u1db is None: + self._u1dbconnections[tid] = conn._u1db + else: + conn._u1db = u1db + + trans = self.transactionFactory(self, conn) + try: + result = interaction(trans, *args, **kw) + trans.close() + conn.commit() + return result + except: + excType, excValue, excTraceback = sys.exc_info() + try: + conn.rollback() + except: + logger.error(None, "Rollback failed") + compat.reraise(excValue, excTraceback) + + def finalClose(self): + """ + A final close, only called by the shutdown trigger. + """ + self.shutdownID = None + if self.threadpool.started: + self.threadpool.stop() + self.running = False + for conn in self.connections.values(): + self._close(conn) + for u1db in self._u1dbconnections.values(): + self._close(u1db) + self.connections.clear() diff --git a/src/leap/soledad/client/_db/blobs.py b/src/leap/soledad/client/_db/blobs.py new file mode 100644 index 00000000..10b90c71 --- /dev/null +++ b/src/leap/soledad/client/_db/blobs.py @@ -0,0 +1,554 @@ +# -*- coding: utf-8 -*- +# _blobs.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Clientside BlobBackend Storage. 
+""" + +from urlparse import urljoin + +import binascii +import os +import base64 + +from io import BytesIO +from functools import partial + +from twisted.logger import Logger +from twisted.enterprise import adbapi +from twisted.internet import defer +from twisted.web.client import FileBodyProducer + +import treq + +from leap.soledad.common.errors import SoledadError +from leap.common.files import mkdir_p + +from .._document import BlobDoc +from .._crypto import DocInfo +from .._crypto import BlobEncryptor +from .._crypto import BlobDecryptor +from .._http import HTTPClient +from .._pipes import TruncatedTailPipe +from .._pipes import PreamblePipe + +from . import pragmas +from . import sqlcipher + + +logger = Logger() +FIXED_REV = 'ImmutableRevision' # Blob content is immutable + + +class BlobAlreadyExistsError(SoledadError): + pass + + +class ConnectionPool(adbapi.ConnectionPool): + + def insertAndGetLastRowid(self, *args, **kwargs): + """ + Execute an SQL query and return the last rowid. + + See: https://sqlite.org/c3ref/last_insert_rowid.html + """ + return self.runInteraction( + self._insertAndGetLastRowid, *args, **kwargs) + + def _insertAndGetLastRowid(self, trans, *args, **kw): + trans.execute(*args, **kw) + return trans.lastrowid + + def blob(self, table, column, irow, flags): + """ + Open a BLOB for incremental I/O. + + Return a handle to the BLOB that would be selected by: + + SELECT column FROM table WHERE rowid = irow; + + See: https://sqlite.org/c3ref/blob_open.html + + :param table: The table in which to lookup the blob. + :type table: str + :param column: The column where the BLOB is located. + :type column: str + :param rowid: The rowid of the BLOB. + :type rowid: int + :param flags: If zero, BLOB is opened for read-only. If non-zero, + BLOB is opened for RW. + :type flags: int + + :return: A BLOB handle. + :rtype: pysqlcipher.dbapi.Blob + """ + return self.runInteraction(self._blob, table, column, irow, flags) + + def _blob(self, trans, table, column, irow, flags): + # TODO: should not use transaction private variable here + handle = trans._connection.blob(table, column, irow, flags) + return handle + + +def check_http_status(code): + if code == 409: + raise BlobAlreadyExistsError() + elif code != 200: + raise SoledadError("Server Error") + + +class DecrypterBuffer(object): + + def __init__(self, blob_id, secret, tag): + self.doc_info = DocInfo(blob_id, FIXED_REV) + self.secret = secret + self.tag = tag + self.preamble_pipe = PreamblePipe(self._make_decryptor) + + def _make_decryptor(self, preamble): + self.decrypter = BlobDecryptor( + self.doc_info, preamble, + secret=self.secret, + armor=False, + start_stream=False, + tag=self.tag) + return TruncatedTailPipe(self.decrypter, tail_size=len(self.tag)) + + def write(self, data): + self.preamble_pipe.write(data) + + def close(self): + real_size = self.decrypter.decrypted_content_size + return self.decrypter._end_stream(), real_size + + +class BlobManager(object): + """ + Ideally, the decrypting flow goes like this: + + - GET a blob from remote server. + - Decrypt the preamble + - Allocate a zeroblob in the sqlcipher sink + - Mark the blob as unusable (ie, not verified) + - Decrypt the payload incrementally, and write chunks to sqlcipher + ** Is it possible to use a small buffer for the aes writer w/o + ** allocating all the memory in openssl? 
+ - Finalize the AES decryption + - If preamble + payload verifies correctly, mark the blob as usable + + """ + + def __init__( + self, local_path, remote, key, secret, user, token=None, + cert_file=None): + if local_path: + mkdir_p(os.path.dirname(local_path)) + self.local = SQLiteBlobBackend(local_path, key) + self.remote = remote + self.secret = secret + self.user = user + self._client = HTTPClient(user, token, cert_file) + + def close(self): + if hasattr(self, 'local') and self.local: + return self.local.close() + + @defer.inlineCallbacks + def remote_list(self): + uri = urljoin(self.remote, self.user + '/') + data = yield self._client.get(uri) + defer.returnValue((yield data.json())) + + def local_list(self): + return self.local.list() + + @defer.inlineCallbacks + def send_missing(self): + our_blobs = yield self.local_list() + server_blobs = yield self.remote_list() + missing = [b_id for b_id in our_blobs if b_id not in server_blobs] + logger.info("Amount of documents missing on server: %s" % len(missing)) + # TODO: Send concurrently when we are able to stream directly from db + for blob_id in missing: + fd = yield self.local.get(blob_id) + logger.info("Upload local blob: %s" % blob_id) + yield self._encrypt_and_upload(blob_id, fd) + + @defer.inlineCallbacks + def fetch_missing(self): + # TODO: Use something to prioritize user requests over general new docs + our_blobs = yield self.local_list() + server_blobs = yield self.remote_list() + docs_we_want = [b_id for b_id in server_blobs if b_id not in our_blobs] + logger.info("Fetching new docs from server: %s" % len(docs_we_want)) + # TODO: Fetch concurrently when we are able to stream directly into db + for blob_id in docs_we_want: + logger.info("Fetching new doc: %s" % blob_id) + yield self.get(blob_id) + + @defer.inlineCallbacks + def put(self, doc, size): + if (yield self.local.exists(doc.blob_id)): + error_message = "Blob already exists: %s" % doc.blob_id + raise BlobAlreadyExistsError(error_message) + fd = doc.blob_fd + # TODO this is a tee really, but ok... could do db and upload + # concurrently. not sure if we'd gain something. + yield self.local.put(doc.blob_id, fd, size=size) + # In fact, some kind of pipe is needed here, where each write on db + # handle gets forwarded into a write on the connection handle + fd = yield self.local.get(doc.blob_id) + yield self._encrypt_and_upload(doc.blob_id, fd) + + @defer.inlineCallbacks + def get(self, blob_id): + local_blob = yield self.local.get(blob_id) + if local_blob: + logger.info("Found blob in local database: %s" % blob_id) + defer.returnValue(local_blob) + + result = yield self._download_and_decrypt(blob_id) + + if not result: + defer.returnValue(None) + blob, size = result + + if blob: + logger.info("Got decrypted blob of type: %s" % type(blob)) + blob.seek(0) + yield self.local.put(blob_id, blob, size=size) + defer.returnValue((yield self.local.get(blob_id))) + else: + # XXX we shouldn't get here, but we will... + # lots of ugly error handling possible: + # 1. retry, might be network error + # 2. try later, maybe didn't finished streaming + # 3.. resignation, might be error while verifying + logger.error('sorry, dunno what happened') + + @defer.inlineCallbacks + def _encrypt_and_upload(self, blob_id, fd): + # TODO ------------------------------------------ + # this is wrong, is doing 2 stages. + # the crypto producer can be passed to + # the uploader and react as data is written. 
+ # try to rewrite as a tube: pass the fd to aes and let aes writer + # produce data to the treq request fd. + # ------------------------------------------------ + logger.info("Staring upload of blob: %s" % blob_id) + doc_info = DocInfo(blob_id, FIXED_REV) + uri = urljoin(self.remote, self.user + "/" + blob_id) + crypter = BlobEncryptor(doc_info, fd, secret=self.secret, + armor=False) + fd = yield crypter.encrypt() + response = yield self._client.put(uri, data=fd) + check_http_status(response.code) + logger.info("Finished upload: %s" % (blob_id,)) + + @defer.inlineCallbacks + def _download_and_decrypt(self, blob_id): + logger.info("Staring download of blob: %s" % blob_id) + # TODO this needs to be connected in a tube + uri = urljoin(self.remote, self.user + '/' + blob_id) + data = yield self._client.get(uri) + + if data.code == 404: + logger.warn("Blob not found in server: %s" % blob_id) + defer.returnValue(None) + elif not data.headers.hasHeader('Tag'): + logger.error("Server didn't send a tag header for: %s" % blob_id) + defer.returnValue(None) + tag = data.headers.getRawHeaders('Tag')[0] + tag = base64.urlsafe_b64decode(tag) + buf = DecrypterBuffer(blob_id, self.secret, tag) + + # incrementally collect the body of the response + yield treq.collect(data, buf.write) + fd, size = buf.close() + logger.info("Finished download: (%s, %d)" % (blob_id, size)) + defer.returnValue((fd, size)) + + @defer.inlineCallbacks + def delete(self, blob_id): + logger.info("Staring deletion of blob: %s" % blob_id) + yield self._delete_from_remote(blob_id) + if (yield self.local.exists(blob_id)): + yield self.local.delete(blob_id) + + def _delete_from_remote(self, blob_id): + # TODO this needs to be connected in a tube + uri = urljoin(self.remote, self.user + '/' + blob_id) + return self._client.delete(uri) + + +class SQLiteBlobBackend(object): + + def __init__(self, path, key=None): + self.path = os.path.abspath( + os.path.join(path, 'soledad_blob.db')) + mkdir_p(os.path.dirname(self.path)) + if not key: + raise ValueError('key cannot be None') + backend = 'pysqlcipher.dbapi2' + opts = sqlcipher.SQLCipherOptions( + '/tmp/ignored', binascii.b2a_hex(key), + is_raw_key=True, create=True) + pragmafun = partial(pragmas.set_init_pragmas, opts=opts) + openfun = _sqlcipherInitFactory(pragmafun) + + self.dbpool = ConnectionPool( + backend, self.path, check_same_thread=False, timeout=5, + cp_openfun=openfun, cp_min=1, cp_max=2, cp_name='blob_pool') + + def close(self): + from twisted._threads import AlreadyQuit + try: + self.dbpool.close() + except AlreadyQuit: + pass + + @defer.inlineCallbacks + def put(self, blob_id, blob_fd, size=None): + logger.info("Saving blob in local database...") + insert = 'INSERT INTO blobs (blob_id, payload) VALUES (?, zeroblob(?))' + irow = yield self.dbpool.insertAndGetLastRowid(insert, (blob_id, size)) + handle = yield self.dbpool.blob('blobs', 'payload', irow, 1) + blob_fd.seek(0) + # XXX I have to copy the buffer here so that I'm able to + # return a non-closed file to the caller (blobmanager.get) + # FIXME should remove this duplication! 
+ # have a look at how treq does cope with closing the handle + # for uploading a file + producer = FileBodyProducer(blob_fd) + done = yield producer.startProducing(handle) + logger.info("Finished saving blob in local database.") + defer.returnValue(done) + + @defer.inlineCallbacks + def get(self, blob_id): + # TODO we can also stream the blob value using sqlite + # incremental interface for blobs - and just return the raw fd instead + select = 'SELECT payload FROM blobs WHERE blob_id = ?' + result = yield self.dbpool.runQuery(select, (blob_id,)) + if result: + defer.returnValue(BytesIO(str(result[0][0]))) + + @defer.inlineCallbacks + def list(self): + query = 'select blob_id from blobs' + result = yield self.dbpool.runQuery(query) + if result: + defer.returnValue([b_id[0] for b_id in result]) + else: + defer.returnValue([]) + + @defer.inlineCallbacks + def exists(self, blob_id): + query = 'SELECT blob_id from blobs WHERE blob_id = ?' + result = yield self.dbpool.runQuery(query, (blob_id,)) + defer.returnValue(bool(len(result))) + + def delete(self, blob_id): + query = 'DELETE FROM blobs WHERE blob_id = ?' + return self.dbpool.runQuery(query, (blob_id,)) + + +def _init_blob_table(conn): + maybe_create = ( + "CREATE TABLE IF NOT EXISTS " + "blobs (" + "blob_id PRIMARY KEY, " + "payload BLOB)") + conn.execute(maybe_create) + + +def _sqlcipherInitFactory(fun): + def _initialize(conn): + fun(conn) + _init_blob_table(conn) + return _initialize + + +# +# testing facilities +# + +@defer.inlineCallbacks +def testit(reactor): + # configure logging to stdout + from twisted.python import log + import sys + log.startLogging(sys.stdout) + + # parse command line arguments + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('--url', default='http://localhost:9000/') + parser.add_argument('--path', default='/tmp/blobs') + parser.add_argument('--secret', default='secret') + parser.add_argument('--uuid', default='user') + parser.add_argument('--token', default=None) + parser.add_argument('--cert-file', default='') + + subparsers = parser.add_subparsers(help='sub-command help', dest='action') + + # parse upload command + parser_upload = subparsers.add_parser( + 'upload', help='upload blob and bypass local db') + parser_upload.add_argument('payload') + parser_upload.add_argument('blob_id') + + # parse download command + parser_download = subparsers.add_parser( + 'download', help='download blob and bypass local db') + parser_download.add_argument('blob_id') + parser_download.add_argument('--output-file', default='/tmp/incoming-file') + + # parse put command + parser_put = subparsers.add_parser( + 'put', help='put blob in local db and upload') + parser_put.add_argument('payload') + parser_put.add_argument('blob_id') + + # parse get command + parser_get = subparsers.add_parser( + 'get', help='get blob from local db, get if needed') + parser_get.add_argument('blob_id') + + # parse delete command + parser_get = subparsers.add_parser( + 'delete', help='delete blob from local and remote db') + parser_get.add_argument('blob_id') + + # parse list command + parser_get = subparsers.add_parser( + 'list', help='list local and remote blob ids') + + # parse send_missing command + parser_get = subparsers.add_parser( + 'send_missing', help='send all pending upload blobs') + + # parse send_missing command + parser_get = subparsers.add_parser( + 'fetch_missing', help='fetch all new server blobs') + + # parse arguments + args = parser.parse_args() + + # TODO convert these into proper unittests + + def 
_manager(): + mkdir_p(os.path.dirname(args.path)) + manager = BlobManager( + args.path, args.url, + 'A' * 32, args.secret, + args.uuid, args.token, args.cert_file) + return manager + + @defer.inlineCallbacks + def _upload(blob_id, payload): + logger.info(":: Starting upload only: %s" % str((blob_id, payload))) + manager = _manager() + with open(payload, 'r') as fd: + yield manager._encrypt_and_upload(blob_id, fd) + logger.info(":: Finished upload only: %s" % str((blob_id, payload))) + + @defer.inlineCallbacks + def _download(blob_id): + logger.info(":: Starting download only: %s" % blob_id) + manager = _manager() + result = yield manager._download_and_decrypt(blob_id) + logger.info(":: Result of download: %s" % str(result)) + if result: + fd, _ = result + with open(args.output_file, 'w') as f: + logger.info(":: Writing data to %s" % args.output_file) + f.write(fd.read()) + logger.info(":: Finished download only: %s" % blob_id) + + @defer.inlineCallbacks + def _put(blob_id, payload): + logger.info(":: Starting full put: %s" % blob_id) + manager = _manager() + size = os.path.getsize(payload) + with open(payload) as fd: + doc = BlobDoc(fd, blob_id) + result = yield manager.put(doc, size=size) + logger.info(":: Result of put: %s" % str(result)) + logger.info(":: Finished full put: %s" % blob_id) + + @defer.inlineCallbacks + def _get(blob_id): + logger.info(":: Starting full get: %s" % blob_id) + manager = _manager() + fd = yield manager.get(blob_id) + if fd: + logger.info(":: Result of get: " + fd.getvalue()) + logger.info(":: Finished full get: %s" % blob_id) + + @defer.inlineCallbacks + def _delete(blob_id): + logger.info(":: Starting deletion of: %s" % blob_id) + manager = _manager() + yield manager.delete(blob_id) + logger.info(":: Finished deletion of: %s" % blob_id) + + @defer.inlineCallbacks + def _list(): + logger.info(":: Listing local blobs") + manager = _manager() + local_list = yield manager.local_list() + logger.info(":: Local list: %s" % local_list) + logger.info(":: Listing remote blobs") + remote_list = yield manager.remote_list() + logger.info(":: Remote list: %s" % remote_list) + + @defer.inlineCallbacks + def _send_missing(): + logger.info(":: Sending local pending upload docs") + manager = _manager() + yield manager.send_missing() + logger.info(":: Finished sending missing docs") + + @defer.inlineCallbacks + def _fetch_missing(): + logger.info(":: Fetching remote new docs") + manager = _manager() + yield manager.fetch_missing() + logger.info(":: Finished fetching new docs") + + if args.action == 'upload': + yield _upload(args.blob_id, args.payload) + elif args.action == 'download': + yield _download(args.blob_id) + elif args.action == 'put': + yield _put(args.blob_id, args.payload) + elif args.action == 'get': + yield _get(args.blob_id) + elif args.action == 'delete': + yield _delete(args.blob_id) + elif args.action == 'list': + yield _list() + elif args.action == 'send_missing': + yield _send_missing() + elif args.action == 'fetch_missing': + yield _fetch_missing() + + +if __name__ == '__main__': + from twisted.internet.task import react + react(testit) diff --git a/src/leap/soledad/client/_db/dbschema.sql b/src/leap/soledad/client/_db/dbschema.sql new file mode 100644 index 00000000..ae027fc5 --- /dev/null +++ b/src/leap/soledad/client/_db/dbschema.sql @@ -0,0 +1,42 @@ +-- Database schema +CREATE TABLE transaction_log ( + generation INTEGER PRIMARY KEY AUTOINCREMENT, + doc_id TEXT NOT NULL, + transaction_id TEXT NOT NULL +); +CREATE TABLE document ( + doc_id TEXT PRIMARY 
KEY, + doc_rev TEXT NOT NULL, + content TEXT +); +CREATE TABLE document_fields ( + doc_id TEXT NOT NULL, + field_name TEXT NOT NULL, + value TEXT +); +CREATE INDEX document_fields_field_value_doc_idx + ON document_fields(field_name, value, doc_id); + +CREATE TABLE sync_log ( + replica_uid TEXT PRIMARY KEY, + known_generation INTEGER, + known_transaction_id TEXT +); +CREATE TABLE conflicts ( + doc_id TEXT, + doc_rev TEXT, + content TEXT, + CONSTRAINT conflicts_pkey PRIMARY KEY (doc_id, doc_rev) +); +CREATE TABLE index_definitions ( + name TEXT, + offset INT, + field TEXT, + CONSTRAINT index_definitions_pkey PRIMARY KEY (name, offset) +); +create index index_definitions_field on index_definitions(field); +CREATE TABLE u1db_config ( + name TEXT PRIMARY KEY, + value TEXT +); +INSERT INTO u1db_config VALUES ('sql_schema', '0'); diff --git a/src/leap/soledad/client/_db/pragmas.py b/src/leap/soledad/client/_db/pragmas.py new file mode 100644 index 00000000..870ed63e --- /dev/null +++ b/src/leap/soledad/client/_db/pragmas.py @@ -0,0 +1,379 @@ +# -*- coding: utf-8 -*- +# pragmas.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Different pragmas used in the initialization of the SQLCipher database. +""" +import string +import threading +import os + +from leap.soledad.common import soledad_assert +from leap.soledad.common.log import getLogger + + +logger = getLogger(__name__) + + +_db_init_lock = threading.Lock() + + +def set_init_pragmas(conn, opts=None, extra_queries=None): + """ + Set the initialization pragmas. + + This includes the crypto pragmas, and any other options that must + be passed early to sqlcipher db. + """ + soledad_assert(opts is not None) + extra_queries = [] if extra_queries is None else extra_queries + with _db_init_lock: + # only one execution path should initialize the db + _set_init_pragmas(conn, opts, extra_queries) + + +def _set_init_pragmas(conn, opts, extra_queries): + + sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') + memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') + nowal = os.environ.get('LEAP_SQLITE_NOWAL') + + set_crypto_pragmas(conn, opts) + + if not nowal: + set_write_ahead_logging(conn) + if sync_off: + set_synchronous_off(conn) + else: + set_synchronous_normal(conn) + if memstore: + set_mem_temp_store(conn) + + for query in extra_queries: + conn.cursor().execute(query) + + +def set_crypto_pragmas(db_handle, sqlcipher_opts): + """ + Set cryptographic params (key, cipher, KDF number of iterations and + cipher page size). 
+ + :param db_handle: + :type db_handle: + :param sqlcipher_opts: options for the SQLCipherDatabase + :type sqlcipher_opts: SQLCipherOpts instance + """ + # XXX assert CryptoOptions + opts = sqlcipher_opts + _set_key(db_handle, opts.key, opts.is_raw_key) + _set_cipher(db_handle, opts.cipher) + _set_kdf_iter(db_handle, opts.kdf_iter) + _set_cipher_page_size(db_handle, opts.cipher_page_size) + + +def _set_key(db_handle, key, is_raw_key): + """ + Set the ``key`` for use with the database. + + The process of creating a new, encrypted database is called 'keying' + the database. SQLCipher uses just-in-time key derivation at the point + it is first needed for an operation. This means that the key (and any + options) must be set before the first operation on the database. As + soon as the database is touched (e.g. SELECT, CREATE TABLE, UPDATE, + etc.) and pages need to be read or written, the key is prepared for + use. + + Implementation Notes: + + * PRAGMA key should generally be called as the first operation on a + database. + + :param key: The key for use with the database. + :type key: str + :param is_raw_key: + Whether C{key} is a raw 64-char hex string or a passphrase that should + be hashed to obtain the encyrption key. + :type is_raw_key: bool + """ + if is_raw_key: + _set_key_raw(db_handle, key) + else: + _set_key_passphrase(db_handle, key) + + +def _set_key_passphrase(db_handle, passphrase): + """ + Set a passphrase for encryption key derivation. + + The key itself can be a passphrase, which is converted to a key using + PBKDF2 key derivation. The result is used as the encryption key for + the database. By using this method, there is no way to alter the KDF; + if you want to do so you should use a raw key instead and derive the + key using your own KDF. + + :param db_handle: A handle to the SQLCipher database. + :type db_handle: pysqlcipher.Connection + :param passphrase: The passphrase used to derive the encryption key. + :type passphrase: str + """ + db_handle.cursor().execute("PRAGMA key = '%s'" % passphrase) + + +def _set_key_raw(db_handle, key): + """ + Set a raw hexadecimal encryption key. + + It is possible to specify an exact byte sequence using a blob literal. + With this method, it is the calling application's responsibility to + ensure that the data provided is a 64 character hex string, which will + be converted directly to 32 bytes (256 bits) of key data. + + :param db_handle: A handle to the SQLCipher database. + :type db_handle: pysqlcipher.Connection + :param key: A 64 character hex string. + :type key: str + """ + if not all(c in string.hexdigits for c in key): + raise NotAnHexString(key) + db_handle.cursor().execute('PRAGMA key = "x\'%s"' % key) + + +def _set_cipher(db_handle, cipher='aes-256-cbc'): + """ + Set the cipher and mode to use for symmetric encryption. + + SQLCipher uses aes-256-cbc as the default cipher and mode of + operation. It is possible to change this, though not generally + recommended, using PRAGMA cipher. + + SQLCipher makes direct use of libssl, so all cipher options available + to libssl are also available for use with SQLCipher. See `man enc` for + OpenSSL's supported ciphers. + + Implementation Notes: + + * PRAGMA cipher must be called after PRAGMA key and before the first + actual database operation or it will have no effect. + + * If a non-default value is used PRAGMA cipher to create a database, + it must also be called every time that database is opened. + + * SQLCipher does not implement its own encryption. 
Instead it uses the + widely available and peer-reviewed OpenSSL libcrypto for all + cryptographic functions. + + :param db_handle: A handle to the SQLCipher database. + :type db_handle: pysqlcipher.Connection + :param cipher: The cipher and mode to use. + :type cipher: str + """ + db_handle.cursor().execute("PRAGMA cipher = '%s'" % cipher) + + +def _set_kdf_iter(db_handle, kdf_iter=4000): + """ + Set the number of iterations for the key derivation function. + + SQLCipher uses PBKDF2 key derivation to strengthen the key and make it + resistent to brute force and dictionary attacks. The default + configuration uses 4000 PBKDF2 iterations (effectively 16,000 SHA1 + operations). PRAGMA kdf_iter can be used to increase or decrease the + number of iterations used. + + Implementation Notes: + + * PRAGMA kdf_iter must be called after PRAGMA key and before the first + actual database operation or it will have no effect. + + * If a non-default value is used PRAGMA kdf_iter to create a database, + it must also be called every time that database is opened. + + * It is not recommended to reduce the number of iterations if a + passphrase is in use. + + :param db_handle: A handle to the SQLCipher database. + :type db_handle: pysqlcipher.Connection + :param kdf_iter: The number of iterations to use. + :type kdf_iter: int + """ + db_handle.cursor().execute("PRAGMA kdf_iter = '%d'" % kdf_iter) + + +def _set_cipher_page_size(db_handle, cipher_page_size=1024): + """ + Set the page size of the encrypted database. + + SQLCipher 2 introduced the new PRAGMA cipher_page_size that can be + used to adjust the page size for the encrypted database. The default + page size is 1024 bytes, but it can be desirable for some applications + to use a larger page size for increased performance. For instance, + some recent testing shows that increasing the page size can noticeably + improve performance (5-30%) for certain queries that manipulate a + large number of pages (e.g. selects without an index, large inserts in + a transaction, big deletes). + + To adjust the page size, call the pragma immediately after setting the + key for the first time and each subsequent time that you open the + database. + + Implementation Notes: + + * PRAGMA cipher_page_size must be called after PRAGMA key and before + the first actual database operation or it will have no effect. + + * If a non-default value is used PRAGMA cipher_page_size to create a + database, it must also be called every time that database is opened. + + :param db_handle: A handle to the SQLCipher database. + :type db_handle: pysqlcipher.Connection + :param cipher_page_size: The page size. + :type cipher_page_size: int + """ + db_handle.cursor().execute( + "PRAGMA cipher_page_size = '%d'" % cipher_page_size) + + +# XXX UNUSED ? +def set_rekey(db_handle, new_key, is_raw_key): + """ + Change the key of an existing encrypted database. + + To change the key on an existing encrypted database, it must first be + unlocked with the current encryption key. Once the database is + readable and writeable, PRAGMA rekey can be used to re-encrypt every + page in the database with a new key. + + * PRAGMA rekey must be called after PRAGMA key. It can be called at any + time once the database is readable. + + * PRAGMA rekey can not be used to encrypted a standard SQLite + database! It is only useful for changing the key on an existing + database. + + * Previous versions of SQLCipher provided a PRAGMA rekey_cipher and + code>PRAGMA rekey_kdf_iter. 
These are deprecated and should not be + used. Instead, use sqlcipher_export(). + + :param db_handle: A handle to the SQLCipher database. + :type db_handle: pysqlcipher.Connection + :param new_key: The new key. + :type new_key: str + :param is_raw_key: Whether C{password} is a raw 64-char hex string or a + passphrase that should be hashed to obtain the encyrption + key. + :type is_raw_key: bool + """ + if is_raw_key: + _set_rekey_raw(db_handle, new_key) + else: + _set_rekey_passphrase(db_handle, new_key) + + +def _set_rekey_passphrase(db_handle, passphrase): + """ + Change the passphrase for encryption key derivation. + + The key itself can be a passphrase, which is converted to a key using + PBKDF2 key derivation. The result is used as the encryption key for + the database. + + :param db_handle: A handle to the SQLCipher database. + :type db_handle: pysqlcipher.Connection + :param passphrase: The passphrase used to derive the encryption key. + :type passphrase: str + """ + db_handle.cursor().execute("PRAGMA rekey = '%s'" % passphrase) + + +def _set_rekey_raw(db_handle, key): + """ + Change the raw hexadecimal encryption key. + + It is possible to specify an exact byte sequence using a blob literal. + With this method, it is the calling application's responsibility to + ensure that the data provided is a 64 character hex string, which will + be converted directly to 32 bytes (256 bits) of key data. + + :param db_handle: A handle to the SQLCipher database. + :type db_handle: pysqlcipher.Connection + :param key: A 64 character hex string. + :type key: str + """ + if not all(c in string.hexdigits for c in key): + raise NotAnHexString(key) + db_handle.cursor().execute('PRAGMA rekey = "x\'%s"' % key) + + +def set_synchronous_off(db_handle): + """ + Change the setting of the "synchronous" flag to OFF. + """ + logger.debug("sqlcipher: setting synchronous off") + db_handle.cursor().execute('PRAGMA synchronous=OFF') + + +def set_synchronous_normal(db_handle): + """ + Change the setting of the "synchronous" flag to NORMAL. + """ + logger.debug("sqlcipher: setting synchronous normal") + db_handle.cursor().execute('PRAGMA synchronous=NORMAL') + + +def set_mem_temp_store(db_handle): + """ + Use a in-memory store for temporary tables. + """ + logger.debug("sqlcipher: setting temp_store memory") + db_handle.cursor().execute('PRAGMA temp_store=MEMORY') + + +def set_write_ahead_logging(db_handle): + """ + Enable write-ahead logging, and set the autocheckpoint to 50 pages. + + Setting the autocheckpoint to a small value, we make the reads not + suffer too much performance degradation. + + From the sqlite docs: + + "There is a tradeoff between average read performance and average write + performance. To maximize the read performance, one wants to keep the + WAL as small as possible and hence run checkpoints frequently, perhaps + as often as every COMMIT. To maximize write performance, one wants to + amortize the cost of each checkpoint over as many writes as possible, + meaning that one wants to run checkpoints infrequently and let the WAL + grow as large as possible before each checkpoint. The decision of how + often to run checkpoints may therefore vary from one application to + another depending on the relative read and write performance + requirements of the application. 
The default strategy is to run a + checkpoint once the WAL reaches 1000 pages" + """ + logger.debug("sqlcipher: setting write-ahead logging") + db_handle.cursor().execute('PRAGMA journal_mode=WAL') + + # The optimum value can still use a little bit of tuning, but we favor + # small sizes of the WAL file to get fast reads, since we assume that + # the writes will be quick enough to not block too much. + + db_handle.cursor().execute('PRAGMA wal_autocheckpoint=50') + + +class NotAnHexString(Exception): + """ + Raised when trying to (raw) key the database with a non-hex string. + """ + pass diff --git a/src/leap/soledad/client/_db/sqlcipher.py b/src/leap/soledad/client/_db/sqlcipher.py new file mode 100644 index 00000000..d22017bd --- /dev/null +++ b/src/leap/soledad/client/_db/sqlcipher.py @@ -0,0 +1,633 @@ +# -*- coding: utf-8 -*- +# sqlcipher.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A U1DB backend that uses SQLCipher as its persistence layer. + +The SQLCipher API (http://sqlcipher.net/sqlcipher-api/) is fully implemented, +with the exception of the following statements: + + * PRAGMA cipher_use_hmac + * PRAGMA cipher_default_use_mac + +SQLCipher 2.0 introduced a per-page HMAC to validate that the page data has +not be tampered with. By default, when creating or opening a database using +SQLCipher 2, SQLCipher will attempt to use an HMAC check. This change in +database format means that SQLCipher 2 can't operate on version 1.1.x +databases by default. Thus, in order to provide backward compatibility with +SQLCipher 1.1.x, PRAGMA cipher_use_hmac can be used to disable the HMAC +functionality on specific databases. + +In some very specific cases, it is not possible to call PRAGMA cipher_use_hmac +as one of the first operations on a database. An example of this is when +trying to ATTACH a 1.1.x database to the main database. In these cases PRAGMA +cipher_default_use_hmac can be used to globally alter the default use of HMAC +when opening a database. + +So, as the statements above were introduced for backwards compatibility with +SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases +handled by Soledad should be created by SQLCipher >= 2.0. +""" +import os +import sys + +from functools import partial + +from twisted.internet import reactor +from twisted.internet import defer +from twisted.enterprise import adbapi + +from leap.soledad.common.log import getLogger +from leap.soledad.common.l2db import errors as u1db_errors +from leap.soledad.common.errors import DatabaseAccessError + +from leap.soledad.client.http_target import SoledadHTTPSyncTarget +from leap.soledad.client.sync import SoledadSynchronizer + +from .._document import Document +from . import sqlite +from . 
import pragmas + +if sys.version_info[0] < 3: + from pysqlcipher import dbapi2 as sqlcipher_dbapi2 +else: + from pysqlcipher3 import dbapi2 as sqlcipher_dbapi2 + +logger = getLogger(__name__) + + +# Monkey-patch u1db.backends.sqlite with pysqlcipher.dbapi2 +sqlite.dbapi2 = sqlcipher_dbapi2 + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True): + """ + Initialize a SQLCipher database. + + :param opts: + :type opts: SQLCipherOptions + :param on_init: a tuple of queries to be executed on initialization + :type on_init: tuple + :return: pysqlcipher.dbapi2.Connection + """ + # Note: There seemed to be a bug in sqlite 3.5.9 (with python2.6) + # where without re-opening the database on Windows, it + # doesn't see the transaction that was just committed + # Removing from here now, look at the pysqlite implementation if the + # bug shows up in windows. + + if not os.path.isfile(opts.path) and not opts.create: + raise u1db_errors.DatabaseDoesNotExist() + + conn = sqlcipher_dbapi2.connect( + opts.path, check_same_thread=check_same_thread) + pragmas.set_init_pragmas(conn, opts, extra_queries=on_init) + return conn + + +def initialize_sqlcipher_adbapi_db(opts, extra_queries=None): + from leap.soledad.client import sqlcipher_adbapi + return sqlcipher_adbapi.getConnectionPool( + opts, extra_queries=extra_queries) + + +class SQLCipherOptions(object): + """ + A container with options for the initialization of an SQLCipher database. + """ + + @classmethod + def copy(cls, source, path=None, key=None, create=None, + is_raw_key=None, cipher=None, kdf_iter=None, + cipher_page_size=None, sync_db_key=None): + """ + Return a copy of C{source} with parameters different than None + replaced by new values. + """ + local_vars = locals() + args = [] + kwargs = {} + + for name in ["path", "key"]: + val = local_vars[name] + if val is not None: + args.append(val) + else: + args.append(getattr(source, name)) + + for name in ["create", "is_raw_key", "cipher", "kdf_iter", + "cipher_page_size", "sync_db_key"]: + val = local_vars[name] + if val is not None: + kwargs[name] = val + else: + kwargs[name] = getattr(source, name) + + return SQLCipherOptions(*args, **kwargs) + + def __init__(self, path, key, create=True, is_raw_key=False, + cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, + sync_db_key=None): + """ + :param path: The filesystem path for the database to open. + :type path: str + :param create: + True/False, should the database be created if it doesn't + already exist? + :param create: bool + :param is_raw_key: + Whether ``password`` is a raw 64-char hex string or a passphrase + that should be hashed to obtain the encyrption key. + :type raw_key: bool + :param cipher: The cipher and mode to use. + :type cipher: str + :param kdf_iter: The number of iterations to use. + :type kdf_iter: int + :param cipher_page_size: The page size. + :type cipher_page_size: int + """ + self.path = path + self.key = key + self.is_raw_key = is_raw_key + self.create = create + self.cipher = cipher + self.kdf_iter = kdf_iter + self.cipher_page_size = cipher_page_size + self.sync_db_key = sync_db_key + + def __str__(self): + """ + Return string representation of options, for easy debugging. + + :return: String representation of options. 
+ :rtype: str + """ + attr_names = filter(lambda a: not a.startswith('_'), dir(self)) + attr_str = [] + for a in attr_names: + attr_str.append(a + "=" + str(getattr(self, a))) + name = self.__class__.__name__ + return "%s(%s)" % (name, ', '.join(attr_str)) + + +# +# The SQLCipher database +# + +class SQLCipherDatabase(sqlite.SQLitePartialExpandDatabase): + """ + A U1DB implementation that uses SQLCipher as its persistence layer. + """ + + # The attribute _index_storage_value will be used as the lookup key for the + # implementation of the SQLCipher storage backend. + _index_storage_value = 'expand referenced encrypted' + + def __init__(self, opts): + """ + Connect to an existing SQLCipher database, creating a new sqlcipher + database file if needed. + + *** IMPORTANT *** + + Don't forget to close the database after use by calling the close() + method otherwise some resources might not be freed and you may + experience several kinds of leakages. + + *** IMPORTANT *** + + :param opts: options for initialization of the SQLCipher database. + :type opts: SQLCipherOptions + """ + # ensure the db is encrypted if the file already exists + if os.path.isfile(opts.path): + self._db_handle = _assert_db_is_encrypted(opts) + else: + # connect to the sqlcipher database + self._db_handle = initialize_sqlcipher_db(opts) + + # TODO --------------------------------------------------- + # Everything else in this initialization has to be factored + # out, so it can be used from SoledadSQLCipherWrapper.__init__ + # too. + # --------------------------------------------------------- + + self._ensure_schema() + self.set_document_factory(doc_factory) + self._prime_replica_uid() + + def _prime_replica_uid(self): + """ + In the u1db implementation, _replica_uid is a property + that returns the value in _real_replica_uid, and does + a db query if no value found. + Here we prime the replica uid during initialization so + that we don't have to wait for the query afterwards. + """ + self._real_replica_uid = None + self._get_replica_uid() + + def _extra_schema_init(self, c): + """ + Add any extra fields, etc to the basic table definitions. + + This method is called by u1db.backends.sqlite_backend._initialize() + method, which is executed when the database schema is created. Here, + we use it to include the "syncable" property for LeapDocuments. + + :param c: The cursor for querying the database. + :type c: dbapi2.cursor + """ + c.execute( + 'ALTER TABLE document ' + 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') + + # + # SQLCipher API methods + # + + # Extra query methods: extensions to the base u1db sqlite implmentation. + + def get_count_from_index(self, index_name, *key_values): + """ + Return the count for a given combination of index_name + and key values. + + Extension method made from similar methods in u1db version 13.09 + + :param index_name: The index to query + :type index_name: str + :param key_values: values to match. eg, if you have + an index with 3 fields then you would have: + get_from_index(index_name, val1, val2, val3) + :type key_values: tuple + :return: count. + :rtype: int + """ + c = self._db_handle.cursor() + definition = self._get_index_definition(index_name) + + if len(key_values) != len(definition): + raise u1db_errors.InvalidValueForIndex() + tables = ["document_fields d%d" % i for i in range(len(definition))] + novalue_where = ["d.doc_id = d%d.doc_id" + " AND d%d.field_name = ?" + % (i, i) for i in range(len(definition))] + exact_where = [novalue_where[i] + (" AND d%d.value = ?" 
% (i,)) + for i in range(len(definition))] + args = [] + where = [] + for idx, (field, value) in enumerate(zip(definition, key_values)): + args.append(field) + where.append(exact_where[idx]) + args.append(value) + + tables = ["document_fields d%d" % i for i in range(len(definition))] + statement = ( + "SELECT COUNT(*) FROM document d, %s WHERE %s " % ( + ', '.join(tables), + ' AND '.join(where), + )) + try: + c.execute(statement, tuple(args)) + except sqlcipher_dbapi2.OperationalError as e: + raise sqlcipher_dbapi2.OperationalError( + str(e) + '\nstatement: %s\nargs: %s\n' % (statement, args)) + res = c.fetchall() + return res[0][0] + + def close(self): + """ + Close db connections. + """ + # TODO should be handled by adbapi instead + # TODO syncdb should be stopped first + + if logger is not None: # logger might be none if called from __del__ + logger.debug("SQLCipher backend: closing") + + # close the actual database + if getattr(self, '_db_handle', False): + self._db_handle.close() + self._db_handle = None + + # indexes + + def _put_and_update_indexes(self, old_doc, doc): + """ + Update a document and all indexes related to it. + + :param old_doc: The old version of the document. + :type old_doc: u1db.Document + :param doc: The new version of the document. + :type doc: u1db.Document + """ + sqlite.SQLitePartialExpandDatabase._put_and_update_indexes( + self, old_doc, doc) + c = self._db_handle.cursor() + c.execute('UPDATE document SET syncable=? WHERE doc_id=?', + (doc.syncable, doc.doc_id)) + + def _get_doc(self, doc_id, check_for_conflicts=False): + """ + Get just the document content, without fancy handling. + + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise asking for a deleted + document will return None. + :type include_deleted: bool + + :return: a Document object. + :type: u1db.Document + """ + doc = sqlite.SQLitePartialExpandDatabase._get_doc( + self, doc_id, check_for_conflicts) + if doc: + c = self._db_handle.cursor() + c.execute('SELECT syncable FROM document WHERE doc_id=?', + (doc.doc_id,)) + result = c.fetchone() + doc.syncable = bool(result[0]) + return doc + + def __del__(self): + """ + Free resources when deleting or garbage collecting the database. + + This is only here to minimze problems if someone ever forgets to call + the close() method after using the database; you should not rely on + garbage collecting to free up the database resources. + """ + self.close() + + +class SQLCipherU1DBSync(SQLCipherDatabase): + """ + Soledad syncer implementation. + """ + + """ + The name of the local symmetrically encrypted documents to + sync database file. + """ + LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' + + """ + Period or recurrence of the Looping Call that will do the encryption to the + syncdb (in seconds). 
+ """ + ENCRYPT_LOOP_PERIOD = 1 + + def __init__(self, opts, soledad_crypto, replica_uid, cert_file): + self._opts = opts + self._path = opts.path + self._crypto = soledad_crypto + self.__replica_uid = replica_uid + self._cert_file = cert_file + + # storage for the documents received during a sync + self.received_docs = [] + + self.running = False + self._db_handle = None + + # initialize the main db before scheduling a start + self._initialize_main_db() + self._reactor = reactor + self._reactor.callWhenRunning(self._start) + + if DO_STATS: + self.sync_phase = None + + def commit(self): + self._db_handle.commit() + + @property + def _replica_uid(self): + return str(self.__replica_uid) + + def _start(self): + if not self.running: + self.running = True + + def _initialize_main_db(self): + try: + self._db_handle = initialize_sqlcipher_db( + self._opts, check_same_thread=False) + self._real_replica_uid = None + self._ensure_schema() + self.set_document_factory(doc_factory) + except sqlcipher_dbapi2.DatabaseError as e: + raise DatabaseAccessError(str(e)) + + @defer.inlineCallbacks + def sync(self, url, creds=None): + """ + Synchronize documents with remote replica exposed at url. + + It is not safe to initiate more than one sync process and let them run + concurrently. It is responsibility of the caller to ensure that there + are no concurrent sync processes running. This is currently controlled + by the main Soledad object because it may also run post-sync hooks, + which should be run while the lock is locked. + + :param url: The url of the target replica to sync with. + :type url: str + :param creds: optional dictionary giving credentials to authorize the + operation with the server. + :type creds: dict + + :return: + A Deferred, that will fire with the local generation (type `int`) + before the synchronisation was performed. + :rtype: Deferred + """ + syncer = self._get_syncer(url, creds=creds) + if DO_STATS: + self.sync_phase = syncer.sync_phase + self.syncer = syncer + self.sync_exchange_phase = syncer.sync_exchange_phase + local_gen_before_sync = yield syncer.sync() + self.received_docs = syncer.received_docs + defer.returnValue(local_gen_before_sync) + + def _get_syncer(self, url, creds=None): + """ + Get a synchronizer for ``url`` using ``creds``. + + :param url: The url of the target replica to sync with. + :type url: str + :param creds: optional dictionary giving credentials. + to authorize the operation with the server. + :type creds: dict + + :return: A synchronizer. + :rtype: Synchronizer + """ + return SoledadSynchronizer( + self, + SoledadHTTPSyncTarget( + url, + # XXX is the replica_uid ready? + self._replica_uid, + creds=creds, + crypto=self._crypto, + cert_file=self._cert_file)) + + # + # Symmetric encryption of syncing docs + # + + def get_generation(self): + # FIXME + # XXX this SHOULD BE a callback + return self._get_generation() + + +class U1DBSQLiteBackend(sqlite.SQLitePartialExpandDatabase): + """ + A very simple wrapper for u1db around sqlcipher backend. + + Instead of initializing the database on the fly, it just uses an existing + connection that is passed to it in the initializer. + + It can be used in tests and debug runs to initialize the adbapi with plain + sqlite connections, decoupled from the sqlcipher layer. 
+ """ + + def __init__(self, conn): + self._db_handle = conn + self._real_replica_uid = None + self._ensure_schema() + self._factory = Document + + +class SoledadSQLCipherWrapper(SQLCipherDatabase): + """ + A wrapper for u1db that uses the Soledad-extended sqlcipher backend. + + Instead of initializing the database on the fly, it just uses an existing + connection that is passed to it in the initializer. + + It can be used from adbapi to initialize a soledad database after + getting a regular connection to a sqlcipher database. + """ + def __init__(self, conn, opts): + self._db_handle = conn + self._real_replica_uid = None + self._ensure_schema() + self.set_document_factory(doc_factory) + self._prime_replica_uid() + + +def _assert_db_is_encrypted(opts): + """ + Assert that the sqlcipher file contains an encrypted database. + + When opening an existing database, PRAGMA key will not immediately + throw an error if the key provided is incorrect. To test that the + database can be successfully opened with the provided key, it is + necessary to perform some operation on the database (i.e. read from + it) and confirm it is success. + + The easiest way to do this is select off the sqlite_master table, + which will attempt to read the first page of the database and will + parse the schema. + + :param opts: + """ + # We try to open an encrypted database with the regular u1db + # backend should raise a DatabaseError exception. + # If the regular backend succeeds, then we need to stop because + # the database was not properly initialized. + try: + sqlite.SQLitePartialExpandDatabase(opts.path) + except sqlcipher_dbapi2.DatabaseError: + # assert that we can access it using SQLCipher with the given + # key + dummy_query = ('SELECT count(*) FROM sqlite_master',) + return initialize_sqlcipher_db(opts, on_init=dummy_query) + else: + raise DatabaseIsNotEncrypted() + +# +# Exceptions +# + + +class DatabaseIsNotEncrypted(Exception): + """ + Exception raised when trying to open non-encrypted databases. + """ + pass + + +def doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True): + """ + Return a default Soledad Document. + Used in the initialization for SQLCipherDatabase + """ + return Document(doc_id=doc_id, rev=rev, json=json, + has_conflicts=has_conflicts, syncable=syncable) + + +sqlite.SQLiteDatabase.register_implementation(SQLCipherDatabase) + + +# +# twisted.enterprise.adbapi SQLCipher implementation +# + +SQLCIPHER_CONNECTION_TIMEOUT = 10 + + +def getConnectionPool(opts, extra_queries=None): + openfun = partial( + pragmas.set_init_pragmas, + opts=opts, + extra_queries=extra_queries) + return SQLCipherConnectionPool( + database=opts.path, + check_same_thread=False, + cp_openfun=openfun, + timeout=SQLCIPHER_CONNECTION_TIMEOUT) + + +class SQLCipherConnection(adbapi.Connection): + pass + + +class SQLCipherTransaction(adbapi.Transaction): + pass + + +class SQLCipherConnectionPool(adbapi.ConnectionPool): + + connectionFactory = SQLCipherConnection + transactionFactory = SQLCipherTransaction + + def __init__(self, *args, **kwargs): + adbapi.ConnectionPool.__init__( + self, "pysqlcipher.dbapi2", *args, **kwargs) diff --git a/src/leap/soledad/client/_db/sqlite.py b/src/leap/soledad/client/_db/sqlite.py new file mode 100644 index 00000000..4f7b1259 --- /dev/null +++ b/src/leap/soledad/client/_db/sqlite.py @@ -0,0 +1,930 @@ +# Copyright 2011 Canonical Ltd. +# Copyright 2016 LEAP Encryption Access Project +# +# This file is part of u1db. 
+# +# u1db is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# as published by the Free Software Foundation. +# +# u1db is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with u1db. If not, see <http://www.gnu.org/licenses/>. + +""" +A L2DB implementation that uses SQLite as its persistence layer. +""" + +import errno +import os +import json +import sys +import time +import uuid +import pkg_resources + +from sqlite3 import dbapi2 + +from leap.soledad.common.l2db.backends import CommonBackend, CommonSyncTarget +from leap.soledad.common.l2db import ( + Document, errors, + query_parser, vectorclock) + + +class SQLiteDatabase(CommonBackend): + """A U1DB implementation that uses SQLite as its persistence layer.""" + + _sqlite_registry = {} + + def __init__(self, sqlite_file, document_factory=None): + """Create a new sqlite file.""" + self._db_handle = dbapi2.connect(sqlite_file) + self._real_replica_uid = None + self._ensure_schema() + self._factory = document_factory or Document + + def set_document_factory(self, factory): + self._factory = factory + + def get_sync_target(self): + return SQLiteSyncTarget(self) + + @classmethod + def _which_index_storage(cls, c): + try: + c.execute("SELECT value FROM u1db_config" + " WHERE name = 'index_storage'") + except dbapi2.OperationalError as e: + # The table does not exist yet + return None, e + else: + return c.fetchone()[0], None + + WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL = 0.5 + + @classmethod + def _open_database(cls, sqlite_file, document_factory=None): + if not os.path.isfile(sqlite_file): + raise errors.DatabaseDoesNotExist() + tries = 2 + while True: + # Note: There seems to be a bug in sqlite 3.5.9 (with python2.6) + # where without re-opening the database on Windows, it + # doesn't see the transaction that was just committed + db_handle = dbapi2.connect(sqlite_file) + c = db_handle.cursor() + v, err = cls._which_index_storage(c) + db_handle.close() + if v is not None: + break + # possibly another process is initializing it, wait for it to be + # done + if tries == 0: + raise err # go for the richest error? + tries -= 1 + time.sleep(cls.WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL) + return SQLiteDatabase._sqlite_registry[v]( + sqlite_file, document_factory=document_factory) + + @classmethod + def open_database(cls, sqlite_file, create, backend_cls=None, + document_factory=None): + try: + return cls._open_database( + sqlite_file, document_factory=document_factory) + except errors.DatabaseDoesNotExist: + if not create: + raise + if backend_cls is None: + # default is SQLitePartialExpandDatabase + backend_cls = SQLitePartialExpandDatabase + return backend_cls(sqlite_file, document_factory=document_factory) + + @staticmethod + def delete_database(sqlite_file): + try: + os.unlink(sqlite_file) + except OSError as ex: + if ex.errno == errno.ENOENT: + raise errors.DatabaseDoesNotExist() + raise + + @staticmethod + def register_implementation(klass): + """Register that we implement an SQLiteDatabase. + + The attribute _index_storage_value will be used as the lookup key. 
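+
+        Illustrative sketch (the backend name below is hypothetical):
+
+            class MyBackend(SQLiteDatabase):
+                _index_storage_value = 'my expanded storage'
+
+            SQLiteDatabase.register_implementation(MyBackend)
+
+        When _open_database() later reads the 'index_storage' value stored in
+        an existing file's u1db_config table, it looks it up in this registry
+        to instantiate the matching class.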
+ """ + SQLiteDatabase._sqlite_registry[klass._index_storage_value] = klass + + def _get_sqlite_handle(self): + """Get access to the underlying sqlite database. + + This should only be used by the test suite, etc, for examining the + state of the underlying database. + """ + return self._db_handle + + def _close_sqlite_handle(self): + """Release access to the underlying sqlite database.""" + self._db_handle.close() + + def close(self): + self._close_sqlite_handle() + + def _is_initialized(self, c): + """Check if this database has been initialized.""" + c.execute("PRAGMA case_sensitive_like=ON") + try: + c.execute("SELECT value FROM u1db_config" + " WHERE name = 'sql_schema'") + except dbapi2.OperationalError: + # The table does not exist yet + val = None + else: + val = c.fetchone() + if val is not None: + return True + return False + + def _initialize(self, c): + """Create the schema in the database.""" + # read the script with sql commands + # TODO: Change how we set up the dependency. Most likely use something + # like lp:dirspec to grab the file from a common resource + # directory. Doesn't specifically need to be handled until we get + # to the point of packaging this. + schema_content = pkg_resources.resource_string( + __name__, 'dbschema.sql') + # Note: We'd like to use c.executescript() here, but it seems that + # executescript always commits, even if you set + # isolation_level = None, so if we want to properly handle + # exclusive locking and rollbacks between processes, we need + # to execute it line-by-line + for line in schema_content.split(';'): + if not line: + continue + c.execute(line) + # add extra fields + self._extra_schema_init(c) + # A unique identifier should be set for this replica. Implementations + # don't have to strictly use uuid here, but we do want the uid to be + # unique amongst all databases that will sync with each other. + # We might extend this to using something with hostname for easier + # debugging. + self._set_replica_uid_in_transaction(uuid.uuid4().hex) + c.execute("INSERT INTO u1db_config VALUES" " ('index_storage', ?)", + (self._index_storage_value,)) + + def _ensure_schema(self): + """Ensure that the database schema has been created.""" + old_isolation_level = self._db_handle.isolation_level + c = self._db_handle.cursor() + if self._is_initialized(c): + return + try: + # autocommit/own mgmt of transactions + self._db_handle.isolation_level = None + with self._db_handle: + # only one execution path should initialize the db + c.execute("begin exclusive") + if self._is_initialized(c): + return + self._initialize(c) + finally: + self._db_handle.isolation_level = old_isolation_level + + def _extra_schema_init(self, c): + """Add any extra fields, etc to the basic table definitions.""" + + def _parse_index_definition(self, index_field): + """Parse a field definition for an index, returning a Getter.""" + # Note: We may want to keep a Parser object around, and cache the + # Getter objects for a greater length of time. Specifically, if + # you create a bunch of indexes, and then insert 50k docs, you'll + # re-parse the indexes between puts. The time to insert the docs + # is still likely to dominate put_doc time, though. + parser = query_parser.Parser() + getter = parser.parse(index_field) + return getter + + def _update_indexes(self, doc_id, raw_doc, getters, db_cursor): + """Update document_fields for a single document. + + :param doc_id: Identifier for this document + :param raw_doc: The python dict representation of the document. 
+ :param getters: A list of [(field_name, Getter)]. Getter.get will be + called to evaluate the index definition for this document, and the + results will be inserted into the db. + :param db_cursor: An sqlite Cursor. + :return: None + """ + values = [] + for field_name, getter in getters: + for idx_value in getter.get(raw_doc): + values.append((doc_id, field_name, idx_value)) + if values: + db_cursor.executemany( + "INSERT INTO document_fields VALUES (?, ?, ?)", values) + + def _set_replica_uid(self, replica_uid): + """Force the replica_uid to be set.""" + with self._db_handle: + self._set_replica_uid_in_transaction(replica_uid) + + def _set_replica_uid_in_transaction(self, replica_uid): + """Set the replica_uid. A transaction should already be held.""" + c = self._db_handle.cursor() + c.execute("INSERT OR REPLACE INTO u1db_config" + " VALUES ('replica_uid', ?)", + (replica_uid,)) + self._real_replica_uid = replica_uid + + def _get_replica_uid(self): + if self._real_replica_uid is not None: + return self._real_replica_uid + c = self._db_handle.cursor() + c.execute("SELECT value FROM u1db_config WHERE name = 'replica_uid'") + val = c.fetchone() + if val is None: + return None + self._real_replica_uid = val[0] + return self._real_replica_uid + + _replica_uid = property(_get_replica_uid) + + def _get_generation(self): + c = self._db_handle.cursor() + c.execute('SELECT max(generation) FROM transaction_log') + val = c.fetchone()[0] + if val is None: + return 0 + return val + + def _get_generation_info(self): + c = self._db_handle.cursor() + c.execute( + 'SELECT max(generation), transaction_id FROM transaction_log ') + val = c.fetchone() + if val[0] is None: + return(0, '') + return val + + def _get_trans_id_for_gen(self, generation): + if generation == 0: + return '' + c = self._db_handle.cursor() + c.execute( + 'SELECT transaction_id FROM transaction_log WHERE generation = ?', + (generation,)) + val = c.fetchone() + if val is None: + raise errors.InvalidGeneration + return val[0] + + def _get_transaction_log(self): + c = self._db_handle.cursor() + c.execute("SELECT doc_id, transaction_id FROM transaction_log" + " ORDER BY generation") + return c.fetchall() + + def _get_doc(self, doc_id, check_for_conflicts=False): + """Get just the document content, without fancy handling.""" + c = self._db_handle.cursor() + if check_for_conflicts: + c.execute( + "SELECT document.doc_rev, document.content, " + "count(conflicts.doc_rev) FROM document LEFT OUTER JOIN " + "conflicts ON conflicts.doc_id = document.doc_id WHERE " + "document.doc_id = ? GROUP BY document.doc_id, " + "document.doc_rev, document.content;", (doc_id,)) + else: + c.execute( + "SELECT doc_rev, content, 0 FROM document WHERE doc_id = ?", + (doc_id,)) + val = c.fetchone() + if val is None: + return None + doc_rev, content, conflicts = val + doc = self._factory(doc_id, doc_rev, content) + doc.has_conflicts = conflicts > 0 + return doc + + def _has_conflicts(self, doc_id): + c = self._db_handle.cursor() + c.execute("SELECT 1 FROM conflicts WHERE doc_id = ? 
LIMIT 1", + (doc_id,)) + val = c.fetchone() + if val is None: + return False + else: + return True + + def get_doc(self, doc_id, include_deleted=False): + doc = self._get_doc(doc_id, check_for_conflicts=True) + if doc is None: + return None + if doc.is_tombstone() and not include_deleted: + return None + return doc + + def get_all_docs(self, include_deleted=False): + """Get all documents from the database.""" + generation = self._get_generation() + results = [] + c = self._db_handle.cursor() + c.execute( + "SELECT document.doc_id, document.doc_rev, document.content, " + "count(conflicts.doc_rev) FROM document LEFT OUTER JOIN conflicts " + "ON conflicts.doc_id = document.doc_id GROUP BY document.doc_id, " + "document.doc_rev, document.content;") + rows = c.fetchall() + for doc_id, doc_rev, content, conflicts in rows: + if content is None and not include_deleted: + continue + doc = self._factory(doc_id, doc_rev, content) + doc.has_conflicts = conflicts > 0 + results.append(doc) + return (generation, results) + + def put_doc(self, doc): + if doc.doc_id is None: + raise errors.InvalidDocId() + self._check_doc_id(doc.doc_id) + self._check_doc_size(doc) + with self._db_handle: + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc and old_doc.has_conflicts: + raise errors.ConflictedDoc() + if old_doc and doc.rev is None and old_doc.is_tombstone(): + new_rev = self._allocate_doc_rev(old_doc.rev) + else: + if old_doc is not None: + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + else: + if doc.rev is not None: + raise errors.RevisionConflict() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + self._put_and_update_indexes(old_doc, doc) + return new_rev + + def _expand_to_fields(self, doc_id, base_field, raw_doc, save_none): + """Convert a dict representation into named fields. + + So something like: {'key1': 'val1', 'key2': 'val2'} + gets converted into: [(doc_id, 'key1', 'val1', 0) + (doc_id, 'key2', 'val2', 0)] + :param doc_id: Just added to every record. + :param base_field: if set, these are nested keys, so each field should + be appropriately prefixed. + :param raw_doc: The python dictionary. + """ + # TODO: Handle lists + values = [] + for field_name, value in raw_doc.iteritems(): + if value is None and not save_none: + continue + if base_field: + full_name = base_field + '.' + field_name + else: + full_name = field_name + if value is None or isinstance(value, (int, float, basestring)): + values.append((doc_id, full_name, value, len(values))) + else: + subvalues = self._expand_to_fields(doc_id, full_name, value, + save_none) + for _, subfield_name, val, _ in subvalues: + values.append((doc_id, subfield_name, val, len(values))) + return values + + def _put_and_update_indexes(self, old_doc, doc): + """Actually insert a document into the database. + + This both updates the existing documents content, and any indexes that + refer to this document. + """ + raise NotImplementedError(self._put_and_update_indexes) + + def whats_changed(self, old_generation=0): + c = self._db_handle.cursor() + c.execute("SELECT generation, doc_id, transaction_id" + " FROM transaction_log" + " WHERE generation > ? 
ORDER BY generation DESC", + (old_generation,)) + results = c.fetchall() + cur_gen = old_generation + seen = set() + changes = [] + newest_trans_id = '' + for generation, doc_id, trans_id in results: + if doc_id not in seen: + changes.append((doc_id, generation, trans_id)) + seen.add(doc_id) + if changes: + cur_gen = changes[0][1] # max generation + newest_trans_id = changes[0][2] + changes.reverse() + else: + c.execute("SELECT generation, transaction_id" + " FROM transaction_log ORDER BY generation DESC LIMIT 1") + results = c.fetchone() + if not results: + cur_gen = 0 + newest_trans_id = '' + else: + cur_gen, newest_trans_id = results + + return cur_gen, newest_trans_id, changes + + def delete_doc(self, doc): + with self._db_handle: + old_doc = self._get_doc(doc.doc_id, check_for_conflicts=True) + if old_doc is None: + raise errors.DocumentDoesNotExist + if old_doc.rev != doc.rev: + raise errors.RevisionConflict() + if old_doc.is_tombstone(): + raise errors.DocumentAlreadyDeleted + if old_doc.has_conflicts: + raise errors.ConflictedDoc() + new_rev = self._allocate_doc_rev(doc.rev) + doc.rev = new_rev + doc.make_tombstone() + self._put_and_update_indexes(old_doc, doc) + return new_rev + + def _get_conflicts(self, doc_id): + c = self._db_handle.cursor() + c.execute("SELECT doc_rev, content FROM conflicts WHERE doc_id = ?", + (doc_id,)) + return [self._factory(doc_id, doc_rev, content) + for doc_rev, content in c.fetchall()] + + def get_doc_conflicts(self, doc_id): + with self._db_handle: + conflict_docs = self._get_conflicts(doc_id) + if not conflict_docs: + return [] + this_doc = self._get_doc(doc_id) + this_doc.has_conflicts = True + return [this_doc] + conflict_docs + + def _get_replica_gen_and_trans_id(self, other_replica_uid): + c = self._db_handle.cursor() + c.execute("SELECT known_generation, known_transaction_id FROM sync_log" + " WHERE replica_uid = ?", + (other_replica_uid,)) + val = c.fetchone() + if val is None: + other_gen = 0 + trans_id = '' + else: + other_gen = val[0] + trans_id = val[1] + return other_gen, trans_id + + def _set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, other_transaction_id): + with self._db_handle: + self._do_set_replica_gen_and_trans_id( + other_replica_uid, other_generation, other_transaction_id) + + def _do_set_replica_gen_and_trans_id(self, other_replica_uid, + other_generation, + other_transaction_id): + c = self._db_handle.cursor() + c.execute("INSERT OR REPLACE INTO sync_log VALUES (?, ?, ?)", + (other_replica_uid, other_generation, + other_transaction_id)) + + def _put_doc_if_newer(self, doc, save_conflict, replica_uid=None, + replica_gen=None, replica_trans_id=None): + return super(SQLiteDatabase, self)._put_doc_if_newer( + doc, + save_conflict=save_conflict, + replica_uid=replica_uid, replica_gen=replica_gen, + replica_trans_id=replica_trans_id) + + def _add_conflict(self, c, doc_id, my_doc_rev, my_content): + c.execute("INSERT INTO conflicts VALUES (?, ?, ?)", + (doc_id, my_doc_rev, my_content)) + + def _delete_conflicts(self, c, doc, conflict_revs): + deleting = [(doc.doc_id, c_rev) for c_rev in conflict_revs] + c.executemany("DELETE FROM conflicts" + " WHERE doc_id=? 
AND doc_rev=?", deleting) + doc.has_conflicts = self._has_conflicts(doc.doc_id) + + def _prune_conflicts(self, doc, doc_vcr): + if self._has_conflicts(doc.doc_id): + autoresolved = False + c_revs_to_prune = [] + for c_doc in self._get_conflicts(doc.doc_id): + c_vcr = vectorclock.VectorClockRev(c_doc.rev) + if doc_vcr.is_newer(c_vcr): + c_revs_to_prune.append(c_doc.rev) + elif doc.same_content_as(c_doc): + c_revs_to_prune.append(c_doc.rev) + doc_vcr.maximize(c_vcr) + autoresolved = True + if autoresolved: + doc_vcr.increment(self._replica_uid) + doc.rev = doc_vcr.as_str() + c = self._db_handle.cursor() + self._delete_conflicts(c, doc, c_revs_to_prune) + + def _force_doc_sync_conflict(self, doc): + my_doc = self._get_doc(doc.doc_id) + c = self._db_handle.cursor() + self._prune_conflicts(doc, vectorclock.VectorClockRev(doc.rev)) + self._add_conflict(c, doc.doc_id, my_doc.rev, my_doc.get_json()) + doc.has_conflicts = True + self._put_and_update_indexes(my_doc, doc) + + def resolve_doc(self, doc, conflicted_doc_revs): + with self._db_handle: + cur_doc = self._get_doc(doc.doc_id) + # TODO: https://bugs.launchpad.net/u1db/+bug/928274 + # I think we have a logic bug in resolve_doc + # Specifically, cur_doc.rev is always in the final vector + # clock of revisions that we supersede, even if it wasn't in + # conflicted_doc_revs. We still add it as a conflict, but the + # fact that _put_doc_if_newer propagates resolutions means I + # think that conflict could accidentally be resolved. We need + # to add a test for this case first. (create a rev, create a + # conflict, create another conflict, resolve the first rev + # and first conflict, then make sure that the resolved + # rev doesn't supersede the second conflict rev.) It *might* + # not matter, because the superseding rev is in as a + # conflict, but it does seem incorrect + new_rev = self._ensure_maximal_rev(cur_doc.rev, + conflicted_doc_revs) + superseded_revs = set(conflicted_doc_revs) + c = self._db_handle.cursor() + doc.rev = new_rev + if cur_doc.rev in superseded_revs: + self._put_and_update_indexes(cur_doc, doc) + else: + self._add_conflict(c, doc.doc_id, new_rev, doc.get_json()) + # TODO: Is there some way that we could construct a rev that would + # end up in superseded_revs, such that we add a conflict, and + # then immediately delete it? + self._delete_conflicts(c, doc, superseded_revs) + + def list_indexes(self): + """Return the list of indexes and their definitions.""" + c = self._db_handle.cursor() + # TODO: How do we test the ordering? + c.execute("SELECT name, field FROM index_definitions" + " ORDER BY name, offset") + definitions = [] + cur_name = None + for name, field in c.fetchall(): + if cur_name != name: + definitions.append((name, [])) + cur_name = name + definitions[-1][-1].append(field) + return definitions + + def _get_index_definition(self, index_name): + """Return the stored definition for a given index_name.""" + c = self._db_handle.cursor() + c.execute("SELECT field FROM index_definitions" + " WHERE name = ? ORDER BY offset", (index_name,)) + fields = [x[0] for x in c.fetchall()] + if not fields: + raise errors.IndexDoesNotExist + return fields + + @staticmethod + def _strip_glob(value): + """Remove the trailing * from a value.""" + assert value[-1] == '*' + return value[:-1] + + def _format_query(self, definition, key_values): + # First, build the definition. We join the document_fields table + # against itself, as many times as the 'width' of our definition. + # We then do a query for each key_value, one-at-a-time. 
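+        # As an illustration (a sketch, not verbatim output), an exact-match
+        # query over a two-field index is assembled roughly as:
+        #   SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev)
+        #   FROM document d, document_fields d0, document_fields d1
+        #   LEFT OUTER JOIN conflicts c ON c.doc_id = d.doc_id
+        #   WHERE d.doc_id = d0.doc_id AND d0.field_name = ? AND d0.value = ?
+        #     AND d.doc_id = d1.doc_id AND d1.field_name = ? AND d1.value = ?
+        #   GROUP BY d.doc_id, d.doc_rev, d.content
+        #   ORDER BY d0.value, d1.value;
+        # with args = [field0, value0, field1, value1].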
+ # Note: All of these strings are static, we could cache them, etc. + tables = ["document_fields d%d" % i for i in range(len(definition))] + novalue_where = ["d.doc_id = d%d.doc_id" + " AND d%d.field_name = ?" + % (i, i) for i in range(len(definition))] + wildcard_where = [novalue_where[i] + + (" AND d%d.value NOT NULL" % (i,)) + for i in range(len(definition))] + exact_where = [novalue_where[i] + + (" AND d%d.value = ?" % (i,)) + for i in range(len(definition))] + like_where = [novalue_where[i] + + (" AND d%d.value GLOB ?" % (i,)) + for i in range(len(definition))] + is_wildcard = False + # Merge the lists together, so that: + # [field1, field2, field3], [val1, val2, val3] + # Becomes: + # (field1, val1, field2, val2, field3, val3) + args = [] + where = [] + for idx, (field, value) in enumerate(zip(definition, key_values)): + args.append(field) + if value.endswith('*'): + if value == '*': + where.append(wildcard_where[idx]) + else: + # This is a glob match + if is_wildcard: + # We can't have a partial wildcard following + # another wildcard + raise errors.InvalidGlobbing + where.append(like_where[idx]) + args.append(value) + is_wildcard = True + else: + if is_wildcard: + raise errors.InvalidGlobbing + where.append(exact_where[idx]) + args.append(value) + statement = ( + "SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM " + "document d, %s LEFT OUTER JOIN conflicts c ON c.doc_id = " + "d.doc_id WHERE %s GROUP BY d.doc_id, d.doc_rev, d.content ORDER " + "BY %s;" % (', '.join(tables), ' AND '.join(where), ', '.join( + ['d%d.value' % i for i in range(len(definition))]))) + return statement, args + + def get_from_index(self, index_name, *key_values): + definition = self._get_index_definition(index_name) + if len(key_values) != len(definition): + raise errors.InvalidValueForIndex() + statement, args = self._format_query(definition, key_values) + c = self._db_handle.cursor() + try: + c.execute(statement, tuple(args)) + except dbapi2.OperationalError as e: + raise dbapi2.OperationalError( + str(e) + + '\nstatement: %s\nargs: %s\n' % (statement, args)) + res = c.fetchall() + results = [] + for row in res: + doc = self._factory(row[0], row[1], row[2]) + doc.has_conflicts = row[3] > 0 + results.append(doc) + return results + + def _format_range_query(self, definition, start_value, end_value): + tables = ["document_fields d%d" % i for i in range(len(definition))] + novalue_where = [ + "d.doc_id = d%d.doc_id AND d%d.field_name = ?" % (i, i) for i in + range(len(definition))] + wildcard_where = [ + novalue_where[i] + (" AND d%d.value NOT NULL" % (i,)) for i in + range(len(definition))] + like_where = [ + novalue_where[i] + ( + " AND (d%d.value < ? OR d%d.value GLOB ?)" % (i, i)) for i in + range(len(definition))] + range_where_lower = [ + novalue_where[i] + (" AND d%d.value >= ?" % (i,)) for i in + range(len(definition))] + range_where_upper = [ + novalue_where[i] + (" AND d%d.value <= ?" 
% (i,)) for i in + range(len(definition))] + args = [] + where = [] + if start_value: + if isinstance(start_value, basestring): + start_value = (start_value,) + if len(start_value) != len(definition): + raise errors.InvalidValueForIndex() + is_wildcard = False + for idx, (field, value) in enumerate(zip(definition, start_value)): + args.append(field) + if value.endswith('*'): + if value == '*': + where.append(wildcard_where[idx]) + else: + # This is a glob match + if is_wildcard: + # We can't have a partial wildcard following + # another wildcard + raise errors.InvalidGlobbing + where.append(range_where_lower[idx]) + args.append(self._strip_glob(value)) + is_wildcard = True + else: + if is_wildcard: + raise errors.InvalidGlobbing + where.append(range_where_lower[idx]) + args.append(value) + if end_value: + if isinstance(end_value, basestring): + end_value = (end_value,) + if len(end_value) != len(definition): + raise errors.InvalidValueForIndex() + is_wildcard = False + for idx, (field, value) in enumerate(zip(definition, end_value)): + args.append(field) + if value.endswith('*'): + if value == '*': + where.append(wildcard_where[idx]) + else: + # This is a glob match + if is_wildcard: + # We can't have a partial wildcard following + # another wildcard + raise errors.InvalidGlobbing + where.append(like_where[idx]) + args.append(self._strip_glob(value)) + args.append(value) + is_wildcard = True + else: + if is_wildcard: + raise errors.InvalidGlobbing + where.append(range_where_upper[idx]) + args.append(value) + statement = ( + "SELECT d.doc_id, d.doc_rev, d.content, count(c.doc_rev) FROM " + "document d, %s LEFT OUTER JOIN conflicts c ON c.doc_id = " + "d.doc_id WHERE %s GROUP BY d.doc_id, d.doc_rev, d.content ORDER " + "BY %s;" % (', '.join(tables), ' AND '.join(where), ', '.join( + ['d%d.value' % i for i in range(len(definition))]))) + return statement, args + + def get_range_from_index(self, index_name, start_value=None, + end_value=None): + """Return all documents with key values in the specified range.""" + definition = self._get_index_definition(index_name) + statement, args = self._format_range_query( + definition, start_value, end_value) + c = self._db_handle.cursor() + try: + c.execute(statement, tuple(args)) + except dbapi2.OperationalError as e: + raise dbapi2.OperationalError( + str(e) + + '\nstatement: %s\nargs: %s\n' % (statement, args)) + res = c.fetchall() + results = [] + for row in res: + doc = self._factory(row[0], row[1], row[2]) + doc.has_conflicts = row[3] > 0 + results.append(doc) + return results + + def get_index_keys(self, index_name): + c = self._db_handle.cursor() + definition = self._get_index_definition(index_name) + value_fields = ', '.join([ + 'd%d.value' % i for i in range(len(definition))]) + tables = ["document_fields d%d" % i for i in range(len(definition))] + novalue_where = [ + "d.doc_id = d%d.doc_id AND d%d.field_name = ?" 
% (i, i) for i in + range(len(definition))] + where = [ + novalue_where[i] + (" AND d%d.value NOT NULL" % (i,)) for i in + range(len(definition))] + statement = ( + "SELECT %s FROM document d, %s WHERE %s GROUP BY %s;" % ( + value_fields, ', '.join(tables), ' AND '.join(where), + value_fields)) + try: + c.execute(statement, tuple(definition)) + except dbapi2.OperationalError as e: + raise dbapi2.OperationalError( + str(e) + + '\nstatement: %s\nargs: %s\n' % (statement, tuple(definition))) + return c.fetchall() + + def delete_index(self, index_name): + with self._db_handle: + c = self._db_handle.cursor() + c.execute("DELETE FROM index_definitions WHERE name = ?", + (index_name,)) + c.execute( + "DELETE FROM document_fields WHERE document_fields.field_name " + " NOT IN (SELECT field from index_definitions)") + + +class SQLiteSyncTarget(CommonSyncTarget): + + def get_sync_info(self, source_replica_uid): + source_gen, source_trans_id = self._db._get_replica_gen_and_trans_id( + source_replica_uid) + my_gen, my_trans_id = self._db._get_generation_info() + return ( + self._db._replica_uid, my_gen, my_trans_id, source_gen, + source_trans_id) + + def record_sync_info(self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + if self._trace_hook: + self._trace_hook('record_sync_info') + self._db._set_replica_gen_and_trans_id( + source_replica_uid, source_replica_generation, + source_replica_transaction_id) + + +class SQLitePartialExpandDatabase(SQLiteDatabase): + """An SQLite Backend that expands documents into a document_field table. + + It stores the original document text in document.doc. For fields that are + indexed, the data goes into document_fields. + """ + + _index_storage_value = 'expand referenced' + + def _get_indexed_fields(self): + """Determine what fields are indexed.""" + c = self._db_handle.cursor() + c.execute("SELECT field FROM index_definitions") + return set([x[0] for x in c.fetchall()]) + + def _evaluate_index(self, raw_doc, field): + parser = query_parser.Parser() + getter = parser.parse(field) + return getter.get(raw_doc) + + def _put_and_update_indexes(self, old_doc, doc): + c = self._db_handle.cursor() + if doc and not doc.is_tombstone(): + raw_doc = json.loads(doc.get_json()) + else: + raw_doc = {} + if old_doc is not None: + c.execute("UPDATE document SET doc_rev=?, content=?" 
+ " WHERE doc_id = ?", + (doc.rev, doc.get_json(), doc.doc_id)) + c.execute("DELETE FROM document_fields WHERE doc_id = ?", + (doc.doc_id,)) + else: + c.execute("INSERT INTO document (doc_id, doc_rev, content)" + " VALUES (?, ?, ?)", + (doc.doc_id, doc.rev, doc.get_json())) + indexed_fields = self._get_indexed_fields() + if indexed_fields: + # It is expected that len(indexed_fields) is shorter than + # len(raw_doc) + getters = [(field, self._parse_index_definition(field)) + for field in indexed_fields] + self._update_indexes(doc.doc_id, raw_doc, getters, c) + trans_id = self._allocate_transaction_id() + c.execute("INSERT INTO transaction_log(doc_id, transaction_id)" + " VALUES (?, ?)", (doc.doc_id, trans_id)) + + def create_index(self, index_name, *index_expressions): + with self._db_handle: + c = self._db_handle.cursor() + cur_fields = self._get_indexed_fields() + definition = [(index_name, idx, field) + for idx, field in enumerate(index_expressions)] + try: + c.executemany("INSERT INTO index_definitions VALUES (?, ?, ?)", + definition) + except dbapi2.IntegrityError as e: + stored_def = self._get_index_definition(index_name) + if stored_def == [x[-1] for x in definition]: + return + raise errors.IndexNameTakenError( + str(e) + + str(sys.exc_info()[2]) + ) + new_fields = set( + [f for f in index_expressions if f not in cur_fields]) + if new_fields: + self._update_all_indexes(new_fields) + + def _iter_all_docs(self): + c = self._db_handle.cursor() + c.execute("SELECT doc_id, content FROM document") + while True: + next_rows = c.fetchmany() + if not next_rows: + break + for row in next_rows: + yield row + + def _update_all_indexes(self, new_fields): + """Iterate all the documents, and add content to document_fields. + + :param new_fields: The index definitions that need to be added. + """ + getters = [(field, self._parse_index_definition(field)) + for field in new_fields] + c = self._db_handle.cursor() + for doc_id, doc in self._iter_all_docs(): + if doc is None: + continue + raw_doc = json.loads(doc) + self._update_indexes(doc_id, raw_doc, getters, c) + + +SQLiteDatabase.register_implementation(SQLitePartialExpandDatabase) diff --git a/src/leap/soledad/client/_document.py b/src/leap/soledad/client/_document.py new file mode 100644 index 00000000..9c8577cb --- /dev/null +++ b/src/leap/soledad/client/_document.py @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- +# _document.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Public interfaces for adding extra client features to the generic +SoledadDocument. +""" + +import weakref +import uuid + +from twisted.internet import defer + +from zope.interface import Interface +from zope.interface import implementer + +from leap.soledad.common.document import SoledadDocument + + +class IDocumentWithAttachment(Interface): + """ + A document that can have an attachment. 
+ """ + + def set_store(self, store): + """ + Set the store used by this file to manage attachments. + + :param store: The store used to manage attachments. + :type store: Soledad + """ + + def put_attachment(self, fd): + """ + Attach data to this document. + + Add the attachment to local storage, enqueue for upload. + + The document content will be updated with a pointer to the attachment, + but the document has to be manually put in the database to reflect + modifications. + + :param fd: A file-like object whose content will be attached to this + document. + :type fd: file-like + + :return: A deferred which fires when the attachment has been added to + local storage. + :rtype: Deferred + """ + + def get_attachment(self): + """ + Return the data attached to this document. + + If document content contains a pointer to the attachment, try to get + the attachment from local storage and, if not found, from remote + storage. + + :return: A deferred which fires with a file like-object whose content + is the attachment of this document, or None if nothing is + attached. + :rtype: Deferred + """ + + def delete_attachment(self): + """ + Delete the attachment of this document. + + The pointer to the attachment will be removed from the document + content, but the document has to be manually put in the database to + reflect modifications. + + :return: A deferred which fires when the attachment has been deleted + from local storage. + :rtype: Deferred + """ + + def get_attachment_state(self): + """ + Return the state of the attachment of this document. + + The state is a member of AttachmentStates and is of one of NONE, + LOCAL, REMOTE or SYNCED. + + :return: A deferred which fires with The state of the attachment of + this document. + :rtype: Deferred + """ + + def is_dirty(self): + """ + Return whether this document's content differs from the contents stored + in local database. + + :return: A deferred which fires with True or False, depending on + whether this document is dirty or not. + :rtype: Deferred + """ + + def upload_attachment(self): + """ + Upload this document's attachment. + + :return: A deferred which fires with the state of the attachment after + it's been uploaded, or NONE if there's no attachment for this + document. + :rtype: Deferred + """ + + def download_attachment(self): + """ + Download this document's attachment. + + :return: A deferred which fires with the state of the attachment after + it's been downloaded, or NONE if there's no attachment for + this document. + :rtype: Deferred + """ + + +class BlobDoc(object): + + # TODO probably not needed, but convenient for testing for now. 
+ + def __init__(self, content, blob_id): + + self.blob_id = blob_id + self.is_blob = True + self.blob_fd = content + if blob_id is None: + blob_id = uuid.uuid4().get_hex() + self.blob_id = blob_id + + +class AttachmentStates(object): + NONE = 0 + LOCAL = 1 + REMOTE = 2 + SYNCED = 4 + + +@implementer(IDocumentWithAttachment) +class Document(SoledadDocument): + + def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True, store=None): + SoledadDocument.__init__(self, doc_id=doc_id, rev=rev, json=json, + has_conflicts=has_conflicts, + syncable=syncable) + self.set_store(store) + + # + # properties + # + + @property + def _manager(self): + if not self.store or not hasattr(self.store, 'blobmanager'): + raise Exception('No blob manager found to manage attachments.') + return self.store.blobmanager + + @property + def _blob_id(self): + if self.content and 'blob_id' in self.content: + return self.content['blob_id'] + return None + + def get_store(self): + return self._store() if self._store else None + + def set_store(self, store): + self._store = weakref.ref(store) if store else None + + store = property(get_store, set_store) + + # + # attachment api + # + + def put_attachment(self, fd): + # add pointer to content + blob_id = self._blob_id or str(uuid.uuid4()) + if not self.content: + self.content = {} + self.content['blob_id'] = blob_id + # put using manager + blob = BlobDoc(fd, blob_id) + fd.seek(0, 2) + size = fd.tell() + fd.seek(0) + return self._manager.put(blob, size) + + def get_attachment(self): + if not self._blob_id: + return defer.succeed(None) + return self._manager.get(self._blob_id) + + def delete_attachment(self): + raise NotImplementedError + + @defer.inlineCallbacks + def get_attachment_state(self): + state = AttachmentStates.NONE + + if not self._blob_id: + defer.returnValue(state) + + local_list = yield self._manager.local_list() + if self._blob_id in local_list: + state |= AttachmentStates.LOCAL + + remote_list = yield self._manager.remote_list() + if self._blob_id in remote_list: + state |= AttachmentStates.REMOTE + + defer.returnValue(state) + + @defer.inlineCallbacks + def is_dirty(self): + stored = yield self.store.get_doc(self.doc_id) + if stored.content != self.content: + defer.returnValue(True) + defer.returnValue(False) + + @defer.inlineCallbacks + def upload_attachment(self): + if not self._blob_id: + defer.returnValue(AttachmentStates.NONE) + + fd = yield self._manager.get_blob(self._blob_id) + # TODO: turn following method into a public one + yield self._manager._encrypt_and_upload(self._blob_id, fd) + defer.returnValue(self.get_attachment_state()) + + @defer.inlineCallbacks + def download_attachment(self): + if not self._blob_id: + defer.returnValue(None) + yield self.get_attachment() + defer.returnValue(self.get_attachment_state()) diff --git a/src/leap/soledad/client/_http.py b/src/leap/soledad/client/_http.py new file mode 100644 index 00000000..2a6b9e39 --- /dev/null +++ b/src/leap/soledad/client/_http.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# _http.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A twisted-based, TLS-pinned, token-authenticated HTTP client. +""" +import base64 + +from twisted.internet import reactor +from twisted.web.iweb import IAgent +from twisted.web.client import Agent +from twisted.web.http_headers import Headers + +from treq.client import HTTPClient as _HTTPClient + +from zope.interface import implementer + +from leap.common.certs import get_compatible_ssl_context_factory + + +__all__ = ['HTTPClient', 'PinnedTokenAgent'] + + +class HTTPClient(_HTTPClient): + + def __init__(self, uuid, token, cert_file): + self._agent = PinnedTokenAgent(uuid, token, cert_file) + super(self.__class__, self).__init__(self._agent) + + def set_token(self, token): + self._agent.set_token(token) + + +@implementer(IAgent) +class PinnedTokenAgent(Agent): + + def __init__(self, uuid, token, cert_file): + self._uuid = uuid + self._token = None + self._creds = None + self.set_token(token) + # pin this agent with the platform TLS certificate + factory = get_compatible_ssl_context_factory(cert_file) + Agent.__init__(self, reactor, contextFactory=factory) + + def set_token(self, token): + self._token = token + self._creds = self._encoded_creds() + + def _encoded_creds(self): + creds = '%s:%s' % (self._uuid, self._token) + encoded = base64.b64encode(creds) + return 'Token %s' % encoded + + def request(self, method, uri, headers=None, bodyProducer=None): + # authenticate the request + headers = headers or Headers() + headers.addRawHeader('Authorization', self._creds) + # perform the authenticated request + return Agent.request( + self, method, uri, headers=headers, bodyProducer=bodyProducer) diff --git a/src/leap/soledad/client/_pipes.py b/src/leap/soledad/client/_pipes.py new file mode 100644 index 00000000..eef3f1f9 --- /dev/null +++ b/src/leap/soledad/client/_pipes.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# _pipes.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Components for piping data on streams. +""" +from io import BytesIO + + +__all__ = ['TruncatedTailPipe', 'PreamblePipe'] + + +class TruncatedTailPipe(object): + """ + Truncate the last `tail_size` bytes from the stream. 
+ """ + + def __init__(self, output=None, tail_size=16): + self.tail_size = tail_size + self.output = output or BytesIO() + self.buffer = BytesIO() + + def write(self, data): + self.buffer.write(data) + if self.buffer.tell() > self.tail_size: + self._truncate_tail() + + def _truncate_tail(self): + overflow_size = self.buffer.tell() - self.tail_size + self.buffer.seek(0) + self.output.write(self.buffer.read(overflow_size)) + remaining = self.buffer.read() + self.buffer.seek(0) + self.buffer.write(remaining) + self.buffer.truncate() + + def close(self): + return self.output + + +class PreamblePipe(object): + """ + Consumes data until a space is found, then calls a callback with it and + starts forwarding data to consumer returned by this callback. + """ + + def __init__(self, callback): + self.callback = callback + self.preamble = BytesIO() + self.output = None + + def write(self, data): + if not self.output: + self._write_preamble(data) + else: + self.output.write(data) + + def _write_preamble(self, data): + if ' ' not in data: + self.preamble.write(data) + return + preamble_chunk, remaining = data.split(' ', 1) + self.preamble.write(preamble_chunk) + self.output = self.callback(self.preamble) + self.output.write(remaining) diff --git a/src/leap/soledad/client/_recovery_code.py b/src/leap/soledad/client/_recovery_code.py new file mode 100644 index 00000000..04235a29 --- /dev/null +++ b/src/leap/soledad/client/_recovery_code.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# _recovery_code.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import binascii + +from leap.soledad.common.log import getLogger + +logger = getLogger(__name__) + + +class RecoveryCode(object): + + # When we turn this string to hex, it will double in size + code_length = 6 + + def generate(self): + logger.info("generating new recovery code...") + return binascii.hexlify(os.urandom(self.code_length)) diff --git a/src/leap/soledad/client/_secrets/__init__.py b/src/leap/soledad/client/_secrets/__init__.py new file mode 100644 index 00000000..b6c81cda --- /dev/null +++ b/src/leap/soledad/client/_secrets/__init__.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# _secrets/__init__.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ +import os +import scrypt + +from leap.soledad.common.log import getLogger + +from leap.soledad.client._secrets.storage import SecretsStorage +from leap.soledad.client._secrets.crypto import SecretsCrypto +from leap.soledad.client._secrets.util import emit, UserDataMixin + + +logger = getLogger(__name__) + + +class Secrets(UserDataMixin): + + lengths = { + 'remote_secret': 512, # remote_secret is used to encrypt remote data. + 'local_salt': 64, # local_salt is used in conjunction with + 'local_secret': 448, # local_secret to derive a local_key for storage + } + + def __init__(self, soledad): + self._soledad = soledad + self._secrets = {} + self.crypto = SecretsCrypto(soledad) + self.storage = SecretsStorage(soledad) + self._bootstrap() + + # + # bootstrap + # + + def _bootstrap(self): + + # attempt to load secrets from local storage + encrypted = self.storage.load_local() + if encrypted: + self._secrets = self.crypto.decrypt(encrypted) + # maybe update the format of storage of local secret. + if encrypted['version'] < self.crypto.VERSION: + self.store_secrets() + return + + # no secret was found in local storage, so this is a first run of + # soledad for this user in this device. It is mandatory that we check + # if there's a secret stored in server. + encrypted = self.storage.load_remote() + if encrypted: + self._secrets = self.crypto.decrypt(encrypted) + self.store_secrets() + return + + # we have *not* found a secret neither in local nor in remote storage, + # so we have to generate a new one, and then store it. + self._secrets = self._generate() + self.store_secrets() + + # + # generation + # + + @emit('creating') + def _generate(self): + logger.info("generating new set of secrets...") + secrets = {} + for name, length in self.lengths.iteritems(): + secret = os.urandom(length) + secrets[name] = secret + logger.info("new set of secrets successfully generated") + return secrets + + # + # crypto + # + + def store_secrets(self): + # TODO: we have to improve the logic here, as we want to make sure that + # whatever is stored locally should only be used after remote storage + # is successful. Otherwise, this soledad could start encrypting with a + # secret while another soledad in another device could start encrypting + # with another secret, which would lead to decryption failures during + # sync. 
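        # A sketch of the ordering the TODO above asks for (not what the code
        # below currently does): persist locally only after the remote save
        # has succeeded, e.g.:
        #
        #     encrypted = self.crypto.encrypt(self._secrets)
        #     self.storage.save_remote(encrypted)  # let failures propagate
        #     self.storage.save_local(encrypted)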
+ encrypted = self.crypto.encrypt(self._secrets) + self.storage.save_local(encrypted) + self.storage.save_remote(encrypted) + + # + # secrets + # + + @property + def remote_secret(self): + return self._secrets.get('remote_secret') + + @property + def local_salt(self): + return self._secrets.get('local_salt') + + @property + def local_secret(self): + return self._secrets.get('local_secret') + + @property + def local_key(self): + # local storage key is scrypt-derived from `local_secret` and + # `local_salt` above + secret = scrypt.hash( + password=self.local_secret, + salt=self.local_salt, + buflen=32, # we need a key with 256 bits (32 bytes) + ) + return secret diff --git a/src/leap/soledad/client/_secrets/crypto.py b/src/leap/soledad/client/_secrets/crypto.py new file mode 100644 index 00000000..8148151d --- /dev/null +++ b/src/leap/soledad/client/_secrets/crypto.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# _secrets/crypto.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import binascii +import json +import os +import scrypt + +from leap.soledad.common import soledad_assert +from leap.soledad.common.log import getLogger + +from leap.soledad.client._crypto import encrypt_sym, decrypt_sym, ENC_METHOD +from leap.soledad.client._secrets.util import SecretsError + + +logger = getLogger(__name__) + + +class SecretsCrypto(object): + + VERSION = 2 + + def __init__(self, soledad): + self._soledad = soledad + + def _get_key(self, salt): + passphrase = self._soledad.passphrase.encode('utf8') + key = scrypt.hash(passphrase, salt, buflen=32) + return key + + # + # encryption + # + + def encrypt(self, secrets): + encoded = {} + for name, value in secrets.iteritems(): + encoded[name] = binascii.b2a_base64(value) + plaintext = json.dumps(encoded) + salt = os.urandom(64) # TODO: get salt length from somewhere else + key = self._get_key(salt) + iv, ciphertext = encrypt_sym(plaintext, key, + method=ENC_METHOD.aes_256_gcm) + encrypted = { + 'version': self.VERSION, + 'kdf': 'scrypt', + 'kdf_salt': binascii.b2a_base64(salt), + 'kdf_length': len(key), + 'cipher': ENC_METHOD.aes_256_gcm, + 'length': len(plaintext), + 'iv': str(iv), + 'secrets': binascii.b2a_base64(ciphertext), + } + return encrypted + + # + # decryption + # + + def decrypt(self, data): + version = data.setdefault('version', 1) + method = getattr(self, '_decrypt_v%d' % version) + try: + return method(data) + except Exception as e: + logger.error('error decrypting secrets: %r' % e) + raise SecretsError(e) + + def _decrypt_v1(self, data): + # get encrypted secret from dictionary: the old format allowed for + # storage of more than one secret, but this feature was never used and + # soledad has been using only one secret so far. As there is a corner + # case where the old 'active_secret' key might not be set, we just + # ignore it and pop the only secret found in the 'storage_secrets' key. 
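        # For reference, a version-1 blob looks roughly like the sketch below,
        # reconstructed from the reading code in this method and _decrypt();
        # treat it as illustrative:
        #
        #     {'version': 1,                      # or absent in old files
        #      'active_secret': '<secret_id>',    # may be missing, ignored
        #      'storage_secrets': {
        #          '<secret_id>': {
        #              'cipher': 'aes256',
        #              'kdf': 'scrypt',
        #              'kdf_salt': '<base64 salt>',
        #              'kdf_length': 32,
        #              'length': 1024,
        #              'secret': '<base64 iv>:<base64 ciphertext>'}}}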
+ secret_id = data['storage_secrets'].keys().pop() + encrypted = data['storage_secrets'][secret_id] + + # assert that we know how to decrypt the secret + soledad_assert('cipher' in encrypted) + cipher = encrypted['cipher'] + if cipher == 'aes256': + cipher = ENC_METHOD.aes_256_ctr + soledad_assert(cipher in ENC_METHOD) + + # decrypt + salt = binascii.a2b_base64(encrypted['kdf_salt']) + key = self._get_key(salt) + separator = ':' + iv, ciphertext = encrypted['secret'].split(separator, 1) + ciphertext = binascii.a2b_base64(ciphertext) + plaintext = self._decrypt(key, iv, ciphertext, encrypted, cipher) + + # create secrets dictionary + secrets = { + 'remote_secret': plaintext[0:512], + 'local_salt': plaintext[512:576], + 'local_secret': plaintext[576:1024], + } + return secrets + + def _decrypt_v2(self, encrypted): + cipher = encrypted['cipher'] + soledad_assert(cipher in ENC_METHOD) + + salt = binascii.a2b_base64(encrypted['kdf_salt']) + key = self._get_key(salt) + iv = encrypted['iv'] + ciphertext = binascii.a2b_base64(encrypted['secrets']) + plaintext = self._decrypt( + key, iv, ciphertext, encrypted, cipher) + encoded = json.loads(plaintext) + secrets = {} + for name, value in encoded.iteritems(): + secrets[name] = binascii.a2b_base64(value) + return secrets + + def _decrypt(self, key, iv, ciphertext, encrypted, method): + # assert some properties of the stored secret + soledad_assert(encrypted['kdf'] == 'scrypt') + soledad_assert(encrypted['kdf_length'] == len(key)) + # decrypt + plaintext = decrypt_sym(ciphertext, key, iv, method) + soledad_assert(encrypted['length'] == len(plaintext)) + return plaintext diff --git a/src/leap/soledad/client/_secrets/storage.py b/src/leap/soledad/client/_secrets/storage.py new file mode 100644 index 00000000..85713a48 --- /dev/null +++ b/src/leap/soledad/client/_secrets/storage.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# _secrets/storage.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
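A quick way to see how the pieces of SecretsCrypto above fit together is a local round-trip. This is a sketch only: FakeSoledad is a stand-in providing the single attribute SecretsCrypto needs, and it assumes encrypt_sym/decrypt_sym from _crypto round-trip as intended:

    import os

    from leap.soledad.client._secrets.crypto import SecretsCrypto


    class FakeSoledad(object):
        passphrase = u'an illustrative passphrase'


    crypto = SecretsCrypto(FakeSoledad())
    secrets = {
        'remote_secret': os.urandom(512),
        'local_salt': os.urandom(64),
        'local_secret': os.urandom(448),
    }
    blob = crypto.encrypt(secrets)
    assert blob['version'] == 2 and blob['kdf'] == 'scrypt'
    assert crypto.decrypt(blob) == secrets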
+ +import json +import six.moves.urllib.parse as urlparse + +from hashlib import sha256 + +from leap.soledad.common import SHARED_DB_NAME +from leap.soledad.common.log import getLogger + +from leap.soledad.client.shared_db import SoledadSharedDatabase +from leap.soledad.client._document import Document +from leap.soledad.client._secrets.util import emit, UserDataMixin + + +logger = getLogger(__name__) + + +class SecretsStorage(UserDataMixin): + + def __init__(self, soledad): + self._soledad = soledad + self._shared_db = self._soledad.shared_db or self._init_shared_db() + self.__remote_doc = None + + @property + def _creds(self): + uuid = self._soledad.uuid + token = self._soledad.token + return {'token': {'uuid': uuid, 'token': token}} + + # + # local storage + # + + def load_local(self): + path = self._soledad.secrets_path + logger.info("trying to load secrets from disk: %s" % path) + try: + with open(path, 'r') as f: + encrypted = json.loads(f.read()) + logger.info("secrets loaded successfully from disk") + return encrypted + except IOError: + logger.warn("secrets not found in disk") + return None + + def save_local(self, encrypted): + path = self._soledad.secrets_path + json_data = json.dumps(encrypted) + with open(path, 'w') as f: + f.write(json_data) + + # + # remote storage + # + + def _init_shared_db(self): + url = urlparse.urljoin(self._soledad.server_url, SHARED_DB_NAME) + creds = self._creds + db = SoledadSharedDatabase.open_database(url, creds) + return db + + def _remote_doc_id(self): + passphrase = self._soledad.passphrase.encode('utf8') + uuid = self._soledad.uuid + text = '%s%s' % (passphrase, uuid) + digest = sha256(text).hexdigest() + return digest + + @property + def _remote_doc(self): + if not self.__remote_doc and self._shared_db: + doc = self._get_remote_doc() + self.__remote_doc = doc + return self.__remote_doc + + @emit('downloading') + def _get_remote_doc(self): + logger.info('trying to load secrets from server...') + doc = self._shared_db.get_doc(self._remote_doc_id()) + if doc: + logger.info('secrets loaded successfully from server') + else: + logger.warn('secrets not found in server') + return doc + + def load_remote(self): + doc = self._remote_doc + if not doc: + return None + encrypted = doc.content + return encrypted + + @emit('uploading') + def save_remote(self, encrypted): + doc = self._remote_doc + if not doc: + doc = Document(doc_id=self._remote_doc_id()) + doc.content = encrypted + db = self._shared_db + if not db: + logger.warn('no shared db found') + return + db.put_doc(doc) diff --git a/src/leap/soledad/client/_secrets/util.py b/src/leap/soledad/client/_secrets/util.py new file mode 100644 index 00000000..6401889b --- /dev/null +++ b/src/leap/soledad/client/_secrets/util.py @@ -0,0 +1,63 @@ +# -*- coding:utf-8 -*- +# _secrets/util.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
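The remote document id computed by SecretsStorage._remote_doc_id() above is deterministic, so any device that knows the uuid and passphrase looks up the same shared-db document. The derivation boils down to (values illustrative):

    from hashlib import sha256

    passphrase = u'an illustrative passphrase'
    uuid = '3c0325a9f9bd4ffafa94f90d372e56f9'
    doc_id = sha256('%s%s' % (passphrase.encode('utf8'), uuid)).hexdigest()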
+ + +from leap.soledad.client import events + + +class SecretsError(Exception): + pass + + +class UserDataMixin(object): + """ + When emitting an event, we have to pass a dictionary containing user data. + This class only defines a property so we don't have to define it in + multiple places. + """ + + @property + def _user_data(self): + uuid = self._soledad.uuid + userid = self._soledad.userid + # TODO: seems that uuid and userid hold the same value! We should check + # whether we should pass something different or if the events api + # really needs two different values. + return {'uuid': uuid, 'userid': userid} + + +def emit(verb): + def _decorator(method): + def _decorated(self, *args, **kwargs): + + # emit starting event + user_data = self._user_data + name = 'SOLEDAD_' + verb.upper() + '_KEYS' + event = getattr(events, name) + events.emit_async(event, user_data) + + # run the method + result = method(self, *args, **kwargs) + + # emit a finished event + name = 'SOLEDAD_DONE_' + verb.upper() + '_KEYS' + event = getattr(events, name) + events.emit_async(event, user_data) + + return result + return _decorated + return _decorator diff --git a/src/leap/soledad/client/api.py b/src/leap/soledad/client/api.py new file mode 100644 index 00000000..c62b43f0 --- /dev/null +++ b/src/leap/soledad/client/api.py @@ -0,0 +1,848 @@ +# -*- coding: utf-8 -*- +# api.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Soledad - Synchronization Of Locally Encrypted Data Among Devices. + +This module holds the public api for Soledad. + +Soledad is the part of LEAP that manages storage and synchronization of +application data. It is built on top of U1DB reference Python API and +implements (1) a SQLCipher backend for local storage in the client, (2) a +SyncTarget that encrypts data before syncing, and (3) a CouchDB backend for +remote storage in the server side. +""" +import binascii +import errno +import os +import socket +import ssl +import uuid + +from itertools import chain +import six.moves.http_client as httplib +import six.moves.urllib.parse as urlparse +from six import StringIO +from collections import defaultdict + +from twisted.internet import defer +from zope.interface import implementer + +from leap.common.config import get_path_prefix +from leap.common.plugins import collect_plugins + +from leap.soledad.common import soledad_assert +from leap.soledad.common import soledad_assert_type +from leap.soledad.common.log import getLogger +from leap.soledad.common.l2db.remote import http_client +from leap.soledad.common.l2db.remote.ssl_match_hostname import match_hostname +from leap.soledad.common.errors import DatabaseAccessError + +from . import events as soledad_events +from . 
import interfaces as soledad_interfaces +from ._crypto import SoledadCrypto +from ._db import adbapi +from ._db import blobs +from ._db import sqlcipher +from ._recovery_code import RecoveryCode +from ._secrets import Secrets + + +logger = getLogger(__name__) + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +# +# Constants +# + +""" +Path to the certificate file used to certify the SSL connection between +Soledad client and server. +""" +SOLEDAD_CERT = None + + +@implementer(soledad_interfaces.ILocalStorage, + soledad_interfaces.ISyncableStorage, + soledad_interfaces.ISecretsStorage) +class Soledad(object): + """ + Soledad provides encrypted data storage and sync. + + A Soledad instance is used to store and retrieve data in a local encrypted + database and synchronize this database with Soledad server. + + This class is also responsible for bootstrapping users' account by + creating cryptographic secrets and/or storing/fetching them on Soledad + server. + """ + + local_db_file_name = 'soledad.u1db' + secrets_file_name = "soledad.json" + default_prefix = os.path.join(get_path_prefix(), 'leap', 'soledad') + + """ + A dictionary that holds locks which avoid multiple sync attempts from the + same database replica. The dictionary indexes are the paths to each local + db, so we guarantee that only one sync happens for a local db at a time. + """ + _sync_lock = defaultdict(defer.DeferredLock) + + def __init__(self, uuid, passphrase, secrets_path, local_db_path, + server_url, cert_file, shared_db=None, + auth_token=None): + """ + Initialize configuration, cryptographic keys and dbs. + + :param uuid: User's uuid. + :type uuid: str + + :param passphrase: + The passphrase for locking and unlocking encryption secrets for + local and remote storage. + :type passphrase: unicode + + :param secrets_path: + Path for storing encrypted key used for symmetric encryption. + :type secrets_path: str + + :param local_db_path: Path for local encrypted storage db. + :type local_db_path: str + + :param server_url: + URL for Soledad server. This is used either to sync with the user's + remote db and to interact with the shared recovery database. + :type server_url: str + + :param cert_file: + Path to the certificate of the ca used to validate the SSL + certificate used by the remote soledad server. + :type cert_file: str + + :param shared_db: + The shared database. + :type shared_db: HTTPDatabase + + :param auth_token: + Authorization token for accessing remote databases. + :type auth_token: str + + :raise BootstrapSequenceError: + Raised when the secret initialization sequence (i.e. retrieval + from server or generation and storage on server) has failed for + some reason. + """ + # store config params + self.uuid = uuid + self.passphrase = passphrase + self.secrets_path = secrets_path + self._local_db_path = local_db_path + self.server_url = server_url + self.shared_db = shared_db + self.token = auth_token + + self._dbsyncer = None + + # configure SSL certificate + global SOLEDAD_CERT + SOLEDAD_CERT = cert_file + + self._init_config_with_defaults() + self._init_working_dirs() + + self._recovery_code = RecoveryCode() + self._secrets = Secrets(self) + self._crypto = SoledadCrypto(self._secrets.remote_secret) + self._init_blobmanager() + + try: + # initialize database access, trap any problems so we can shutdown + # smoothly. + self._init_u1db_sqlcipher_backend() + self._init_u1db_syncer() + except DatabaseAccessError: + # oops! 
something went wrong with backend initialization. We + # have to close any thread-related stuff we have already opened + # here, otherwise there might be zombie threads that may clog the + # reactor. + if hasattr(self, '_dbpool'): + self._dbpool.close() + raise + + # + # initialization/destruction methods + # + + def _init_config_with_defaults(self): + """ + Initialize configuration using default values for missing params. + """ + soledad_assert_type(self.passphrase, unicode) + + def initialize(attr, val): + return ((getattr(self, attr, None) is None) and + setattr(self, attr, val)) + + initialize("_secrets_path", os.path.join( + self.default_prefix, self.secrets_file_name)) + initialize("_local_db_path", os.path.join( + self.default_prefix, self.local_db_file_name)) + # initialize server_url + soledad_assert(self.server_url is not None, + 'Missing URL for Soledad server.') + + def _init_working_dirs(self): + """ + Create work directories. + + :raise OSError: in case file exists and is not a dir. + """ + paths = map(lambda x: os.path.dirname(x), [ + self._local_db_path, self._secrets_path]) + for path in paths: + create_path_if_not_exists(path) + + def _init_u1db_sqlcipher_backend(self): + """ + Initialize the U1DB SQLCipher database for local storage. + + Instantiates a modified twisted adbapi that will maintain a threadpool + with a u1db-sqclipher connection for each thread, and will return + deferreds for each u1db query. + + Currently, Soledad uses the default SQLCipher cipher, i.e. + 'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key, + and internally the SQLCipherDatabase initialization uses the 'raw + PRAGMA key' format to handle the key to SQLCipher. + """ + tohex = binascii.b2a_hex + # sqlcipher only accepts the hex version + key = tohex(self._secrets.local_key) + + opts = sqlcipher.SQLCipherOptions( + self._local_db_path, key, + is_raw_key=True, create=True) + self._sqlcipher_opts = opts + self._dbpool = adbapi.getConnectionPool(opts) + + def _init_u1db_syncer(self): + """ + Initialize the U1DB synchronizer. + """ + replica_uid = self._dbpool.replica_uid + self._dbsyncer = sqlcipher.SQLCipherU1DBSync( + self._sqlcipher_opts, self._crypto, replica_uid, + SOLEDAD_CERT) + + def sync_stats(self): + sync_phase = 0 + if getattr(self._dbsyncer, 'sync_phase', None): + sync_phase = self._dbsyncer.sync_phase[0] + sync_exchange_phase = 0 + if getattr(self._dbsyncer, 'syncer', None): + if getattr(self._dbsyncer.syncer, 'sync_exchange_phase', None): + _p = self._dbsyncer.syncer.sync_exchange_phase[0] + sync_exchange_phase = _p + return sync_phase, sync_exchange_phase + + def _init_blobmanager(self): + path = os.path.join(os.path.dirname(self._local_db_path), 'blobs') + url = urlparse.urljoin(self.server_url, 'blobs/%s' % uuid) + key = self._secrets.local_key + self.blobmanager = blobs.BlobManager(path, url, key, self.uuid, + self.token, SOLEDAD_CERT) + + # + # Closing methods + # + + def close(self): + """ + Close underlying U1DB database. + """ + logger.debug("closing soledad") + self._dbpool.close() + self.blobmanager.close() + if getattr(self, '_dbsyncer', None): + self._dbsyncer.close() + + # + # ILocalStorage + # + + def _defer(self, meth, *args, **kw): + """ + Defer a method to be run on a U1DB connection pool. + + :param meth: A method to defer to the U1DB connection pool. + :type meth: callable + :return: A deferred. + :rtype: twisted.internet.defer.Deferred + """ + return self._dbpool.runU1DBQuery(meth, *args, **kw) + + def put_doc(self, doc): + """ + Update a document. 
+ + If the document currently has conflicts, put will fail. + If the database specifies a maximum document size and the document + exceeds it, put will fail and raise a DocumentTooBig exception. + + ============================== WARNING ============================== + This method converts the document's contents to unicode in-place. This + means that after calling `put_doc(doc)`, the contents of the + document, i.e. `doc.content`, might be different from before the + call. + ============================== WARNING ============================== + + :param doc: A document with new content. + :type doc: leap.soledad.common.document.Document + :return: A deferred whose callback will be invoked with the new + revision identifier for the document. The document object will + also be updated. + :rtype: twisted.internet.defer.Deferred + """ + d = self._defer("put_doc", doc) + return d + + def delete_doc(self, doc): + """ + Mark a document as deleted. + + Will abort if the current revision doesn't match doc.rev. + This will also set doc.content to None. + + :param doc: A document to be deleted. + :type doc: leap.soledad.common.document.Document + :return: A deferred. + :rtype: twisted.internet.defer.Deferred + """ + soledad_assert(doc is not None, "delete_doc doesn't accept None.") + return self._defer("delete_doc", doc) + + def get_doc(self, doc_id, include_deleted=False): + """ + Get the JSON string for the given document. + + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise asking for a deleted + document will return None. + :type include_deleted: bool + :return: A deferred whose callback will be invoked with a document + object. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer( + "get_doc", doc_id, include_deleted=include_deleted) + + def get_docs( + self, doc_ids, check_for_conflicts=True, include_deleted=False): + """ + Get the JSON content for many documents. + + :param doc_ids: A list of document identifiers. + :type doc_ids: list + :param check_for_conflicts: If set to False, then the conflict check + will be skipped, and 'None' will be returned instead of True/False. + :type check_for_conflicts: bool + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted documents will not + be included in the results. + :type include_deleted: bool + :return: A deferred whose callback will be invoked with an iterable + giving the document object for each document id in matching + doc_ids order. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer( + "get_docs", doc_ids, check_for_conflicts=check_for_conflicts, + include_deleted=include_deleted) + + def get_all_docs(self, include_deleted=False): + """ + Get the JSON content for all documents in the database. + + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted documents will not + be included in the results. + :type include_deleted: bool + + :return: A deferred which, when fired, will pass the a tuple + containing (generation, [Document]) to the callback, with the + current generation of the database, followed by a list of all the + documents in the database. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer("get_all_docs", include_deleted) + + @defer.inlineCallbacks + def create_doc(self, content, doc_id=None): + """ + Create a new document. 
+ + You can optionally specify the document identifier, but the document + must not already exist. See 'put_doc' if you want to override an + existing document. + If the database specifies a maximum document size and the document + exceeds it, create will fail and raise a DocumentTooBig exception. + + :param content: A Python dictionary. + :type content: dict + :param doc_id: An optional identifier specifying the document id. + :type doc_id: str + :return: A deferred whose callback will be invoked with a document. + :rtype: twisted.internet.defer.Deferred + """ + # TODO we probably should pass an optional "encoding" parameter to + # create_doc (and probably to put_doc too). There are cases (mail + # payloads for example) in which we already have the encoding in the + # headers, so we don't need to guess it. + doc = yield self._defer("create_doc", content, doc_id=doc_id) + doc.set_store(self) + defer.returnValue(doc) + + def create_doc_from_json(self, json, doc_id=None): + """ + Create a new document. + + You can optionally specify the document identifier, but the document + must not already exist. See 'put_doc' if you want to override an + existing document. + If the database specifies a maximum document size and the document + exceeds it, create will fail and raise a DocumentTooBig exception. + + :param json: The JSON document string + :type json: dict + :param doc_id: An optional identifier specifying the document id. + :type doc_id: str + :return: A deferred whose callback will be invoked with a document. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer("create_doc_from_json", json, doc_id=doc_id) + + def create_index(self, index_name, *index_expressions): + """ + Create a named index, which can then be queried for future lookups. + + Creating an index which already exists is not an error, and is cheap. + Creating an index which does not match the index_expressions of the + existing index is an error. + Creating an index will block until the expressions have been evaluated + and the index generated. + + :param index_name: A unique name which can be used as a key prefix + :type index_name: str + :param index_expressions: index expressions defining the index + information. + + Examples: + + "fieldname", or "fieldname.subfieldname" to index alphabetically + sorted on the contents of a field. + + "number(fieldname, width)", "lower(fieldname)" + :type index_expresions: list of str + :return: A deferred. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer("create_index", index_name, *index_expressions) + + def delete_index(self, index_name): + """ + Remove a named index. + + :param index_name: The name of the index we are removing + :type index_name: str + :return: A deferred. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer("delete_index", index_name) + + def list_indexes(self): + """ + List the definitions of all known indexes. + + :return: A deferred whose callback will be invoked with a list of + [('index-name', ['field', 'field2'])] definitions. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer("list_indexes") + + def get_from_index(self, index_name, *key_values): + """ + Return documents that match the keys supplied. + + You must supply exactly the same number of values as have been defined + in the index. It is possible to do a prefix match by using '*' to + indicate a wildcard match. You can only supply '*' to trailing entries, + (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) 
+ It is also possible to append a '*' to the last supplied value (eg + 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') + + :param index_name: The index to query + :type index_name: str + :param key_values: values to match. eg, if you have + an index with 3 fields then you would have: + get_from_index(index_name, val1, val2, val3) + :type key_values: list + :return: A deferred whose callback will be invoked with a list of + [Document]. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer("get_from_index", index_name, *key_values) + + def get_count_from_index(self, index_name, *key_values): + """ + Return the count for a given combination of index_name + and key values. + + Extension method made from similar methods in u1db version 13.09 + + :param index_name: The index to query + :type index_name: str + :param key_values: values to match. eg, if you have + an index with 3 fields then you would have: + get_from_index(index_name, val1, val2, val3) + :type key_values: tuple + :return: A deferred whose callback will be invoked with the count. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer("get_count_from_index", index_name, *key_values) + + def get_range_from_index(self, index_name, start_value, end_value): + """ + Return documents that fall within the specified range. + + Both ends of the range are inclusive. For both start_value and + end_value, one must supply exactly the same number of values as have + been defined in the index, or pass None. In case of a single column + index, a string is accepted as an alternative for a tuple with a single + value. It is possible to do a prefix match by using '*' to indicate + a wildcard match. You can only supply '*' to trailing entries, (eg + 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also + possible to append a '*' to the last supplied value (eg 'val*', '*', + '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') + + :param index_name: The index to query + :type index_name: str + :param start_values: tuples of values that define the lower bound of + the range. eg, if you have an index with 3 fields then you would + have: (val1, val2, val3) + :type start_values: tuple + :param end_values: tuples of values that define the upper bound of the + range. eg, if you have an index with 3 fields then you would have: + (val1, val2, val3) + :type end_values: tuple + :return: A deferred whose callback will be invoked with a list of + [Document]. + :rtype: twisted.internet.defer.Deferred + """ + + return self._defer( + "get_range_from_index", index_name, start_value, end_value) + + def get_index_keys(self, index_name): + """ + Return all keys under which documents are indexed in this index. + + :param index_name: The index to query + :type index_name: str + :return: A deferred whose callback will be invoked with a list of + tuples of indexed keys. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer("get_index_keys", index_name) + + def get_doc_conflicts(self, doc_id): + """ + Get the list of conflicts for the given document. + + The order of the conflicts is such that the first entry is the value + that would be returned by "get_doc". + + :param doc_id: The unique document identifier + :type doc_id: str + :return: A deferred whose callback will be invoked with a list of the + Document entries that are conflicted. 
+ :rtype: twisted.internet.defer.Deferred + """ + return self._defer("get_doc_conflicts", doc_id) + + def resolve_doc(self, doc, conflicted_doc_revs): + """ + Mark a document as no longer conflicted. + + We take the list of revisions that the client knows about that it is + superseding. This may be a different list from the actual current + conflicts, in which case only those are removed as conflicted. This + may fail if the conflict list is significantly different from the + supplied information. (sync could have happened in the background from + the time you GET_DOC_CONFLICTS until the point where you RESOLVE) + + :param doc: A Document with the new content to be inserted. + :type doc: Document + :param conflicted_doc_revs: A list of revisions that the new content + supersedes. + :type conflicted_doc_revs: list(str) + :return: A deferred. + :rtype: twisted.internet.defer.Deferred + """ + return self._defer("resolve_doc", doc, conflicted_doc_revs) + + @property + def local_db_path(self): + return self._local_db_path + + @property + def userid(self): + return self.uuid + + # + # ISyncableStorage + # + + def sync(self): + """ + Synchronize documents with the server replica. + + This method uses a lock to prevent multiple concurrent sync processes + over the same local db file. + + :return: A deferred lock that will run the actual sync process when + the lock is acquired, and which will fire with with the local + generation before the synchronization was performed. + :rtype: twisted.internet.defer.Deferred + """ + # maybe bypass sync + # TODO: That's because bitmask may not provide us a token, but + # this should be handled on the caller side. Here, calling us without + # a token is a real error. + if not self.token: + generation = self._dbsyncer.get_generation() + return defer.succeed(generation) + + d = self.sync_lock.run( + self._sync) + return d + + def _sync(self): + """ + Synchronize documents with the server replica. + + :return: A deferred whose callback will be invoked with the local + generation before the synchronization was performed. + :rtype: twisted.internet.defer.Deferred + """ + sync_url = urlparse.urljoin(self.server_url, 'user-%s' % self.uuid) + if not self._dbsyncer: + return + creds = {'token': {'uuid': self.uuid, 'token': self.token}} + d = self._dbsyncer.sync(sync_url, creds=creds) + + def _sync_callback(local_gen): + self._last_received_docs = docs = self._dbsyncer.received_docs + + # Post-Sync Hooks + if docs: + iface = soledad_interfaces.ISoledadPostSyncPlugin + suitable_plugins = collect_plugins(iface) + for plugin in suitable_plugins: + watched = plugin.watched_doc_types + r = [filter( + lambda s: s.startswith(preffix), + docs) for preffix in watched] + filtered = list(chain(*r)) + plugin.process_received_docs(filtered) + + return local_gen + + def _sync_errback(failure): + s = StringIO() + failure.printDetailedTraceback(file=s) + msg = "got exception when syncing!\n" + s.getvalue() + logger.error(msg) + return failure + + def _emit_done_data_sync(passthrough): + user_data = {'uuid': self.uuid, 'userid': self.userid} + soledad_events.emit_async( + soledad_events.SOLEDAD_DONE_DATA_SYNC, user_data) + return passthrough + + d.addCallbacks(_sync_callback, _sync_errback) + d.addCallback(_emit_done_data_sync) + return d + + @property + def sync_lock(self): + """ + Class based lock to prevent concurrent syncs using the same local db + file. + + :return: A shared lock based on this instance's db file path. 
+ :rtype: DeferredLock + """ + return self._sync_lock[self._local_db_path] + + @property + def syncing(self): + """ + Return wether Soledad is currently synchronizing with the server. + + :return: Wether Soledad is currently synchronizing with the server. + :rtype: bool + """ + return self.sync_lock.locked + + # + # ISecretsStorage + # + + @property + def secrets(self): + """ + Return the secrets object. + + :return: The secrets object. + :rtype: Secrets + """ + return self._secrets + + def change_passphrase(self, new_passphrase): + """ + Change the passphrase that encrypts the storage secret. + + :param new_passphrase: The new passphrase. + :type new_passphrase: unicode + + :raise NoStorageSecret: Raised if there's no storage secret available. + """ + self.passphrase = new_passphrase + self._secrets.store_secrets() + + # + # Raw SQLCIPHER Queries + # + + def raw_sqlcipher_query(self, *args, **kw): + """ + Run a raw sqlcipher query in the local database, and return a deferred + that will be fired with the result. + """ + return self._dbpool.runQuery(*args, **kw) + + def raw_sqlcipher_operation(self, *args, **kw): + """ + Run a raw sqlcipher operation in the local database, and return a + deferred that will be fired with None. + """ + return self._dbpool.runOperation(*args, **kw) + + # + # Service authentication + # + + @defer.inlineCallbacks + def get_or_create_service_token(self, service): + """ + Return the stored token for a given service, or generates and stores a + random one if it does not exist. + + These tokens can be used to authenticate services. + """ + # FIXME this could use the local sqlcipher database, to avoid + # problems with different replicas creating different tokens. + + yield self.create_index('by-servicetoken', 'type', 'service') + docs = yield self._get_token_for_service(service) + if docs: + doc = docs[0] + defer.returnValue(doc.content['token']) + else: + token = str(uuid.uuid4()).replace('-', '')[-24:] + yield self._set_token_for_service(service, token) + defer.returnValue(token) + + def _get_token_for_service(self, service): + return self.get_from_index('by-servicetoken', 'servicetoken', service) + + def _set_token_for_service(self, service, token): + doc = {'type': 'servicetoken', 'service': service, 'token': token} + return self.create_doc(doc) + + def create_recovery_code(self): + return self._recovery_code.generate() + + +def create_path_if_not_exists(path): + try: + if not os.path.isdir(path): + logger.info('creating directory: %s.' % path) + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise + +# ---------------------------------------------------------------------------- +# Monkey patching u1db to be able to provide a custom SSL cert +# ---------------------------------------------------------------------------- + + +# We need a more reasonable timeout (in seconds) +SOLEDAD_TIMEOUT = 120 + + +class VerifiedHTTPSConnection(httplib.HTTPSConnection): + """ + HTTPSConnection verifying server side certificates. + """ + # derived from httplib.py + + def connect(self): + """ + Connect to a host on a given (SSL) port. 
+ """ + try: + source = self.source_address + sock = socket.create_connection((self.host, self.port), + SOLEDAD_TIMEOUT, source) + except AttributeError: + # source_address was introduced in 2.7 + sock = socket.create_connection((self.host, self.port), + SOLEDAD_TIMEOUT) + if self._tunnel_host: + self.sock = sock + self._tunnel() + + self.sock = ssl.wrap_socket(sock, + ca_certs=SOLEDAD_CERT, + cert_reqs=ssl.CERT_REQUIRED) + match_hostname(self.sock.getpeercert(), self.host) + + +old__VerifiedHTTPSConnection = http_client._VerifiedHTTPSConnection +http_client._VerifiedHTTPSConnection = VerifiedHTTPSConnection diff --git a/src/leap/soledad/client/auth.py b/src/leap/soledad/client/auth.py new file mode 100644 index 00000000..78e9bf1b --- /dev/null +++ b/src/leap/soledad/client/auth.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# auth.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Methods for token-based authentication. + +These methods have to be included in all classes that extend HTTPClient so +they can do token-based auth requests to the Soledad server. +""" +import base64 + +from leap.soledad.common.l2db import errors + + +class TokenBasedAuth(object): + """ + Encapsulate token-auth methods for classes that inherit from + u1db.remote.http_client.HTTPClient. + """ + + def set_token_credentials(self, uuid, token): + """ + Store given credentials so we can sign the request later. + + :param uuid: The user's uuid. + :type uuid: str + :param token: The authentication token. + :type token: str + """ + self._creds = {'token': (uuid, token)} + + def _sign_request(self, method, url_query, params): + """ + Return an authorization header to be included in the HTTP request, in + the form: + + [('Authorization', 'Token <(base64 encoded) uuid:token>')] + + :param method: The HTTP method. + :type method: str + :param url_query: The URL query string. + :type url_query: str + :param params: A list with encoded query parameters. + :type param: list + + :return: The Authorization header. + :rtype: list of tuple + """ + if 'token' in self._creds: + uuid, token = self._creds['token'] + auth = '%s:%s' % (uuid, token) + b64_token = base64.b64encode(auth) + return [('Authorization', 'Token %s' % b64_token)] + else: + raise errors.UnknownAuthMethod( + 'Wrong credentials: %s' % self._creds) diff --git a/src/leap/soledad/client/crypto.py b/src/leap/soledad/client/crypto.py new file mode 100644 index 00000000..0f19c964 --- /dev/null +++ b/src/leap/soledad/client/crypto.py @@ -0,0 +1,448 @@ +# -*- coding: utf-8 -*- +# crypto.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Cryptographic utilities for Soledad. +""" +import os +import binascii +import hmac +import hashlib +import json + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from leap.soledad.common import soledad_assert +from leap.soledad.common import soledad_assert_type +from leap.soledad.common import crypto +from leap.soledad.common.log import getLogger +import warnings + + +logger = getLogger(__name__) +warnings.warn("'soledad.client.crypto' MODULE DEPRECATED", + DeprecationWarning, stacklevel=2) + + +MAC_KEY_LENGTH = 64 + +crypto_backend = default_backend() + + +def encrypt_sym(data, key): + """ + Encrypt data using AES-256 cipher in CTR mode. + + :param data: The data to be encrypted. + :type data: str + :param key: The key used to encrypt data (must be 256 bits long). + :type key: str + + :return: A tuple with the initialization vector and the encrypted data. + :rtype: (long, str) + """ + soledad_assert_type(key, str) + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s bits (must be 256 bits long).' % + (len(key) * 8)) + + iv = os.urandom(16) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(data) + encryptor.finalize() + + return binascii.b2a_base64(iv), ciphertext + + +def decrypt_sym(data, key, iv): + """ + Decrypt some data previously encrypted using AES-256 cipher in CTR mode. + + :param data: The data to be decrypted. + :type data: str + :param key: The symmetric key used to decrypt data (must be 256 bits + long). + :type key: str + :param iv: The initialization vector. + :type iv: long + + :return: The decrypted data. + :rtype: str + """ + soledad_assert_type(key, str) + # assert params + soledad_assert( + len(key) == 32, # 32 x 8 = 256 bits. + 'Wrong key size: %s (must be 256 bits long).' % len(key)) + iv = binascii.a2b_base64(iv) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend) + decryptor = cipher.decryptor() + return decryptor.update(data) + decryptor.finalize() + + +def doc_mac_key(doc_id, secret): + """ + Generate a key for calculating a MAC for a document whose id is + C{doc_id}. + + The key is derived using HMAC having sha256 as underlying hash + function. The key used for HMAC is the first MAC_KEY_LENGTH characters + of Soledad's storage secret. The HMAC message is C{doc_id}. + + :param doc_id: The id of the document. + :type doc_id: str + + :param secret: The Soledad storage secret + :type secret: str + + :return: The key. + :rtype: str + """ + soledad_assert(secret is not None) + return hmac.new( + secret[:MAC_KEY_LENGTH], + doc_id, + hashlib.sha256).digest() + + +class SoledadCrypto(object): + """ + General cryptographic functionality encapsulated in a + object that can be passed along. + """ + def __init__(self, secret): + """ + Initialize the crypto object. + + :param secret: The Soledad remote storage secret. 
+ :type secret: str + """ + self._secret = secret + + def doc_mac_key(self, doc_id): + return doc_mac_key(doc_id, self._secret) + + def doc_passphrase(self, doc_id): + """ + Generate a passphrase for symmetric encryption of document's contents. + + The password is derived using HMAC having sha256 as underlying hash + function. The key used for HMAC are the first + C{soledad.REMOTE_STORAGE_SECRET_LENGTH} bytes of Soledad's storage + secret stripped from the first MAC_KEY_LENGTH characters. The HMAC + message is C{doc_id}. + + :param doc_id: The id of the document that will be encrypted using + this passphrase. + :type doc_id: str + + :return: The passphrase. + :rtype: str + """ + soledad_assert(self._secret is not None) + return hmac.new( + self._secret[MAC_KEY_LENGTH:], + doc_id, + hashlib.sha256).digest() + + def encrypt_doc(self, doc): + """ + Wrapper around encrypt_docstr that accepts the document as argument. + + :param doc: the document. + :type doc: Document + """ + key = self.doc_passphrase(doc.doc_id) + + return encrypt_docstr( + doc.get_json(), doc.doc_id, doc.rev, key, self._secret) + + def decrypt_doc(self, doc): + """ + Wrapper around decrypt_doc_dict that accepts the document as argument. + + :param doc: the document. + :type doc: Document + + :return: json string with the decrypted document + :rtype: str + """ + key = self.doc_passphrase(doc.doc_id) + return decrypt_doc_dict( + doc.content, doc.doc_id, doc.rev, key, self._secret) + + @property + def secret(self): + return self._secret + + +# +# Crypto utilities for a Document. +# + +def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv, + mac_method, secret): + """ + Calculate a MAC for C{doc} using C{ciphertext}. + + Current MAC method used is HMAC, with the following parameters: + + * key: sha256(storage_secret, doc_id) + * msg: doc_id + doc_rev + ciphertext + * digestmod: sha256 + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + :param ciphertext: The content of the document. + :type ciphertext: str + :param enc_scheme: The encryption scheme. + :type enc_scheme: str + :param enc_method: The encryption method. + :type enc_method: str + :param enc_iv: The encryption initialization vector. + :type enc_iv: str + :param mac_method: The MAC method to use. + :type mac_method: str + :param secret: The Soledad storage secret + :type secret: str + + :return: The calculated MAC. + :rtype: str + + :raise crypto.UnknownMacMethodError: Raised when C{mac_method} is unknown. + """ + try: + soledad_assert(mac_method == crypto.MacMethods.HMAC) + except AssertionError: + raise crypto.UnknownMacMethodError + template = "{doc_id}{doc_rev}{ciphertext}{enc_scheme}{enc_method}{enc_iv}" + content = template.format( + doc_id=doc_id, + doc_rev=doc_rev, + ciphertext=ciphertext, + enc_scheme=enc_scheme, + enc_method=enc_method, + enc_iv=enc_iv) + return hmac.new( + doc_mac_key(doc_id, secret), + content, + hashlib.sha256).digest() + + +def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): + """ + Encrypt C{doc}'s content. 
+ + Encrypt doc's contents using AES-256 CTR mode and return a valid JSON + string representing the following: + + { + crypto.ENC_JSON_KEY: '<encrypted doc JSON string>', + crypto.ENC_SCHEME_KEY: 'symkey', + crypto.ENC_METHOD_KEY: crypto.EncryptionMethods.AES_256_CTR, + crypto.ENC_IV_KEY: '<the initial value used to encrypt>', + MAC_KEY: '<mac>' + crypto.MAC_METHOD_KEY: 'hmac' + } + + :param docstr: A representation of the document to be encrypted. + :type docstr: str or unicode. + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: The Soledad storage secret (used for MAC auth). + :type secret: str + + :return: The JSON serialization of the dict representing the encrypted + content. + :rtype: str + """ + enc_scheme = crypto.EncryptionSchemes.SYMKEY + enc_method = crypto.EncryptionMethods.AES_256_CTR + mac_method = crypto.MacMethods.HMAC + enc_iv, ciphertext = encrypt_sym( + str(docstr), # encryption/decryption routines expect str + key) + mac = binascii.b2a_hex( # store the mac as hex. + mac_doc( + doc_id, + doc_rev, + ciphertext, + enc_scheme, + enc_method, + enc_iv, + mac_method, + secret)) + # Return a representation for the encrypted content. In the following, we + # convert binary data to hexadecimal representation so the JSON + # serialization does not complain about what it tries to serialize. + hex_ciphertext = binascii.b2a_hex(ciphertext) + logger.debug("encrypting doc: %s" % doc_id) + return json.dumps({ + crypto.ENC_JSON_KEY: hex_ciphertext, + crypto.ENC_SCHEME_KEY: enc_scheme, + crypto.ENC_METHOD_KEY: enc_method, + crypto.ENC_IV_KEY: enc_iv, + crypto.MAC_KEY: mac, + crypto.MAC_METHOD_KEY: mac_method, + }) + + +def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, + enc_iv, mac_method, secret, doc_mac): + """ + Verify that C{doc_mac} is a correct MAC for the given document. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + :param ciphertext: The content of the document. + :type ciphertext: str + :param enc_scheme: The encryption scheme. + :type enc_scheme: str + :param enc_method: The encryption method. + :type enc_method: str + :param enc_iv: The encryption initialization vector. + :type enc_iv: str + :param mac_method: The MAC method to use. + :type mac_method: str + :param secret: The Soledad storage secret + :type secret: str + :param doc_mac: The MAC to be verified against. + :type doc_mac: str + + :raise crypto.UnknownMacMethodError: Raised when C{mac_method} is unknown. + :raise crypto.WrongMacError: Raised when MAC could not be verified. + """ + calculated_mac = mac_doc( + doc_id, + doc_rev, + ciphertext, + enc_scheme, + enc_method, + enc_iv, + mac_method, + secret) + # we compare mac's hashes to avoid possible timing attacks that might + # exploit python's builtin comparison operator behaviour, which fails + # immediatelly when non-matching bytes are found. 
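    # (An equivalent constant-time check could use hmac.compare_digest, which
    # is available from Python 2.7.7 on, e.g.:
    #
    #     if not hmac.compare_digest(calculated_mac, binascii.a2b_hex(doc_mac)):
    #         raise crypto.WrongMacError("Could not authenticate document's "
    #                                    "contents.")
    #
    # Hashing both values before comparing, as done below, serves the same
    # purpose.)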
+ doc_mac_hash = hashlib.sha256( + binascii.a2b_hex( # the mac is stored as hex + doc_mac)).digest() + calculated_mac_hash = hashlib.sha256(calculated_mac).digest() + + if doc_mac_hash != calculated_mac_hash: + logger.warn("wrong MAC while decrypting doc...") + raise crypto.WrongMacError("Could not authenticate document's " + "contents.") + + +def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): + """ + Decrypt a symmetrically encrypted C{doc}'s content. + + Return the JSON string representation of the document's decrypted content. + + The passed doc_dict argument should have the following structure: + + { + crypto.ENC_JSON_KEY: '<enc_blob>', + crypto.ENC_SCHEME_KEY: '<enc_scheme>', + crypto.ENC_METHOD_KEY: '<enc_method>', + crypto.ENC_IV_KEY: '<initial value used to encrypt>', # (optional) + MAC_KEY: '<mac>' + crypto.MAC_METHOD_KEY: 'hmac' + } + + C{enc_blob} is the encryption of the JSON serialization of the document's + content. For now Soledad just deals with documents whose C{enc_scheme} is + crypto.EncryptionSchemes.SYMKEY and C{enc_method} is + crypto.EncryptionMethods.AES_256_CTR. + + :param doc_dict: The content of the document to be decrypted. + :type doc_dict: dict + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: The Soledad storage secret. + :type secret: str + + :return: The JSON serialization of the decrypted content. + :rtype: str + + :raise UnknownEncryptionMethodError: Raised when trying to decrypt from an + unknown encryption method. + """ + # assert document dictionary structure + expected_keys = set([ + crypto.ENC_JSON_KEY, + crypto.ENC_SCHEME_KEY, + crypto.ENC_METHOD_KEY, + crypto.ENC_IV_KEY, + crypto.MAC_KEY, + crypto.MAC_METHOD_KEY, + ]) + soledad_assert(expected_keys.issubset(set(doc_dict.keys()))) + + ciphertext = binascii.a2b_hex(doc_dict[crypto.ENC_JSON_KEY]) + enc_scheme = doc_dict[crypto.ENC_SCHEME_KEY] + enc_method = doc_dict[crypto.ENC_METHOD_KEY] + enc_iv = doc_dict[crypto.ENC_IV_KEY] + doc_mac = doc_dict[crypto.MAC_KEY] + mac_method = doc_dict[crypto.MAC_METHOD_KEY] + + soledad_assert(enc_scheme == crypto.EncryptionSchemes.SYMKEY) + + _verify_doc_mac( + doc_id, doc_rev, ciphertext, enc_scheme, enc_method, + enc_iv, mac_method, secret, doc_mac) + + return decrypt_sym(ciphertext, key, enc_iv) + + +def is_symmetrically_encrypted(doc): + """ + Return True if the document was symmetrically encrypted. + + :param doc: The document to check. + :type doc: Document + + :rtype: bool + """ + if doc.content and crypto.ENC_SCHEME_KEY in doc.content: + if doc.content[crypto.ENC_SCHEME_KEY] \ + == crypto.EncryptionSchemes.SYMKEY: + return True + return False diff --git a/src/leap/soledad/client/events.py b/src/leap/soledad/client/events.py new file mode 100644 index 00000000..058be59c --- /dev/null +++ b/src/leap/soledad/client/events.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# signal.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Signaling functions. +""" + +from leap.common.events import emit_async +from leap.common.events import catalog + + +SOLEDAD_CREATING_KEYS = catalog.SOLEDAD_CREATING_KEYS +SOLEDAD_DONE_CREATING_KEYS = catalog.SOLEDAD_DONE_CREATING_KEYS +SOLEDAD_DOWNLOADING_KEYS = catalog.SOLEDAD_DOWNLOADING_KEYS +SOLEDAD_DONE_DOWNLOADING_KEYS = \ + catalog.SOLEDAD_DONE_DOWNLOADING_KEYS +SOLEDAD_UPLOADING_KEYS = catalog.SOLEDAD_UPLOADING_KEYS +SOLEDAD_DONE_UPLOADING_KEYS = \ + catalog.SOLEDAD_DONE_UPLOADING_KEYS +SOLEDAD_NEW_DATA_TO_SYNC = catalog.SOLEDAD_NEW_DATA_TO_SYNC +SOLEDAD_DONE_DATA_SYNC = catalog.SOLEDAD_DONE_DATA_SYNC +SOLEDAD_SYNC_SEND_STATUS = catalog.SOLEDAD_SYNC_SEND_STATUS +SOLEDAD_SYNC_RECEIVE_STATUS = catalog.SOLEDAD_SYNC_RECEIVE_STATUS + + +__all__ = [ + "catalog", + "emit_async", + "SOLEDAD_CREATING_KEYS", + "SOLEDAD_DONE_CREATING_KEYS", + "SOLEDAD_DOWNLOADING_KEYS", + "SOLEDAD_DONE_DOWNLOADING_KEYS", + "SOLEDAD_UPLOADING_KEYS", + "SOLEDAD_DONE_UPLOADING_KEYS", + "SOLEDAD_NEW_DATA_TO_SYNC", + "SOLEDAD_DONE_DATA_SYNC", + "SOLEDAD_SYNC_SEND_STATUS", + "SOLEDAD_SYNC_RECEIVE_STATUS", +] diff --git a/src/leap/soledad/client/examples/README b/src/leap/soledad/client/examples/README new file mode 100644 index 00000000..3aed8377 --- /dev/null +++ b/src/leap/soledad/client/examples/README @@ -0,0 +1,4 @@ +Right now, you can find here both an example of use +and the benchmarking scripts. +TODO move benchmark scripts to root scripts/ folder, +and leave here only a minimal example. diff --git a/src/leap/soledad/client/examples/benchmarks/.gitignore b/src/leap/soledad/client/examples/benchmarks/.gitignore new file mode 100644 index 00000000..2211df63 --- /dev/null +++ b/src/leap/soledad/client/examples/benchmarks/.gitignore @@ -0,0 +1 @@ +*.txt diff --git a/src/leap/soledad/client/examples/benchmarks/get_sample.sh b/src/leap/soledad/client/examples/benchmarks/get_sample.sh new file mode 100755 index 00000000..1995eee1 --- /dev/null +++ b/src/leap/soledad/client/examples/benchmarks/get_sample.sh @@ -0,0 +1,3 @@ +#!/bin/sh +mkdir tmp +wget http://www.gutenberg.org/cache/epub/101/pg101.txt -O hacker_crackdown.txt diff --git a/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/src/leap/soledad/client/examples/benchmarks/measure_index_times.py new file mode 100644 index 00000000..f9349758 --- /dev/null +++ b/src/leap/soledad/client/examples/benchmarks/measure_index_times.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +# measure_index_times.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Measure u1db retrieval times for different u1db index situations. 
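+
+A minimal usage sketch (values are only examples; the script reads the
+TMPDIR, DOCS, SILENT and SAMPLE environment variables below):
+
+    ./get_sample.sh
+    SILENT=1 DOCS=1000 TMPDIR=/tmp python measure_index_times.py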
+""" +from __future__ import print_function +from functools import partial +import datetime +import hashlib +import os +import sys + +from twisted.internet import defer, reactor + +from leap.soledad.common import l2db +from leap.soledad.client import adbapi +from leap.soledad.client._db.sqlcipher import SQLCipherOptions + + +folder = os.environ.get("TMPDIR", "tmp") +numdocs = int(os.environ.get("DOCS", "1000")) +silent = os.environ.get("SILENT", False) +tmpdb = os.path.join(folder, "test.soledad") + + +sample_file = os.environ.get("SAMPLE", "hacker_crackdown.txt") +sample_path = os.path.join(os.curdir, sample_file) + +try: + with open(sample_file) as f: + SAMPLE = f.readlines() +except Exception: + print("[!] Problem opening sample file. Did you download " + "the sample, or correctly set 'SAMPLE' env var?") + sys.exit(1) + +if numdocs > len(SAMPLE): + print("[!] Sorry! The requested DOCS number is larger than " + "the num of lines in our sample file") + sys.exit(1) + + +def debug(*args): + if not silent: + print(*args) + + +debug("[+] db path:", tmpdb) +debug("[+] num docs", numdocs) + +if os.path.isfile(tmpdb): + debug("[+] Removing existing db file...") + os.remove(tmpdb) + +start_time = datetime.datetime.now() + +opts = SQLCipherOptions(tmpdb, "secret", create=True) +dbpool = adbapi.getConnectionPool(opts) + + +def createDoc(doc): + return dbpool.runU1DBQuery("create_doc", doc) + + +db_indexes = { + 'by-chash': ['chash'], + 'by-number': ['number']} + + +def create_indexes(_): + deferreds = [] + for index, definition in db_indexes.items(): + d = dbpool.runU1DBQuery("create_index", index, *definition) + deferreds.append(d) + return defer.gatherResults(deferreds) + + +class TimeWitness(object): + def __init__(self, init_time): + self.init_time = init_time + + def get_time_count(self): + return datetime.datetime.now() - self.init_time + + +def get_from_index(_): + init_time = datetime.datetime.now() + debug("GETTING FROM INDEX...", init_time) + + def printValue(res, time): + print("RESULT->", res) + print("Index Query Took: ", time.get_time_count()) + return res + + d = dbpool.runU1DBQuery( + "get_from_index", "by-chash", + # "1150c7f10fabce0a57ce13071349fc5064f15bdb0cc1bf2852f74ef3f103aff5") + # XXX this is line 89 from the hacker crackdown... + # Should accept any other optional hash as an enviroment variable. + "57793320d4997a673fc7062652da0596c36a4e9fbe31310d2281e67d56d82469") + d.addCallback(printValue, TimeWitness(init_time)) + return d + + +def getAllDocs(): + return dbpool.runU1DBQuery("get_all_docs") + + +def errBack(e): + debug("[!] ERROR FOUND!!!") + e.printTraceback() + reactor.stop() + + +def countDocs(_): + debug("counting docs...") + d = getAllDocs() + d.addCallbacks(printResult, errBack) + d.addCallbacks(allDone, errBack) + return d + + +def printResult(r, **kwargs): + if kwargs: + debug(*kwargs.values()) + elif isinstance(r, l2db.Document): + debug(r.doc_id, r.content['number']) + else: + len_results = len(r[1]) + debug("GOT %s results" % len(r[1])) + + if len_results == numdocs: + debug("ALL GOOD") + else: + debug("[!] 
MISSING DOCS!!!!!") + raise ValueError("We didn't expect this result len") + + +def allDone(_): + debug("ALL DONE!") + + end_time = datetime.datetime.now() + print((end_time - start_time).total_seconds()) + reactor.stop() + + +def insert_docs(_): + deferreds = [] + for i in range(numdocs): + payload = SAMPLE[i] + chash = hashlib.sha256(payload).hexdigest() + doc = {"number": i, "payload": payload, 'chash': chash} + d = createDoc(doc) + d.addCallbacks(partial(printResult, i=i, chash=chash, payload=payload), + lambda e: e.printTraceback()) + deferreds.append(d) + return defer.gatherResults(deferreds, consumeErrors=True) + + +d = create_indexes(None) +d.addCallback(insert_docs) +d.addCallback(get_from_index) +d.addCallback(countDocs) + +reactor.run() diff --git a/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py new file mode 100644 index 00000000..4f273c64 --- /dev/null +++ b/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +# measure_index_times.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Measure u1db retrieval times for different u1db index situations. +""" +from __future__ import print_function +from functools import partial +import datetime +import hashlib +import os +import sys + +from twisted.internet import defer, reactor + +from leap.soledad.client import adbapi +from leap.soledad.client._db.sqlcipher import SQLCipherOptions +from leap.soledad.common import l2db + + +folder = os.environ.get("TMPDIR", "tmp") +numdocs = int(os.environ.get("DOCS", "1000")) +silent = os.environ.get("SILENT", False) +tmpdb = os.path.join(folder, "test.soledad") + + +sample_file = os.environ.get("SAMPLE", "hacker_crackdown.txt") +sample_path = os.path.join(os.curdir, sample_file) + +try: + with open(sample_file) as f: + SAMPLE = f.readlines() +except Exception: + print("[!] Problem opening sample file. Did you download " + "the sample, or correctly set 'SAMPLE' env var?") + sys.exit(1) + +if numdocs > len(SAMPLE): + print("[!] Sorry! 
The requested DOCS number is larger than " + "the num of lines in our sample file") + sys.exit(1) + + +def debug(*args): + if not silent: + print(*args) + + +debug("[+] db path:", tmpdb) +debug("[+] num docs", numdocs) + +if os.path.isfile(tmpdb): + debug("[+] Removing existing db file...") + os.remove(tmpdb) + +start_time = datetime.datetime.now() + +opts = SQLCipherOptions(tmpdb, "secret", create=True) +dbpool = adbapi.getConnectionPool(opts) + + +def createDoc(doc, doc_id): + return dbpool.runU1DBQuery("create_doc", doc, doc_id=doc_id) + + +db_indexes = { + 'by-chash': ['chash'], + 'by-number': ['number']} + + +def create_indexes(_): + deferreds = [] + for index, definition in db_indexes.items(): + d = dbpool.runU1DBQuery("create_index", index, *definition) + deferreds.append(d) + return defer.gatherResults(deferreds) + + +class TimeWitness(object): + def __init__(self, init_time): + self.init_time = init_time + + def get_time_count(self): + return datetime.datetime.now() - self.init_time + + +def get_from_index(_): + init_time = datetime.datetime.now() + debug("GETTING FROM INDEX...", init_time) + + def printValue(res, time): + print("RESULT->", res) + print("Index Query Took: ", time.get_time_count()) + return res + + d = dbpool.runU1DBQuery( + "get_doc", + # "1150c7f10fabce0a57ce13071349fc5064f15bdb0cc1bf2852f74ef3f103aff5") + # XXX this is line 89 from the hacker crackdown... + # Should accept any other optional hash as an enviroment variable. + "57793320d4997a673fc7062652da0596c36a4e9fbe31310d2281e67d56d82469") + d.addCallback(printValue, TimeWitness(init_time)) + return d + + +def getAllDocs(): + return dbpool.runU1DBQuery("get_all_docs") + + +def errBack(e): + debug("[!] ERROR FOUND!!!") + e.printTraceback() + reactor.stop() + + +def countDocs(_): + debug("counting docs...") + d = getAllDocs() + d.addCallbacks(printResult, errBack) + d.addCallbacks(allDone, errBack) + return d + + +def printResult(r, **kwargs): + if kwargs: + debug(*kwargs.values()) + elif isinstance(r, l2db.Document): + debug(r.doc_id, r.content['number']) + else: + len_results = len(r[1]) + debug("GOT %s results" % len(r[1])) + + if len_results == numdocs: + debug("ALL GOOD") + else: + debug("[!] 
MISSING DOCS!!!!!") + raise ValueError("We didn't expect this result len") + + +def allDone(_): + debug("ALL DONE!") + + end_time = datetime.datetime.now() + print((end_time - start_time).total_seconds()) + reactor.stop() + + +def insert_docs(_): + deferreds = [] + for i in range(numdocs): + payload = SAMPLE[i] + chash = hashlib.sha256(payload).hexdigest() + doc = {"number": i, "payload": payload, 'chash': chash} + d = createDoc(doc, doc_id=chash) + d.addCallbacks(partial(printResult, i=i, chash=chash, payload=payload), + lambda e: e.printTraceback()) + deferreds.append(d) + return defer.gatherResults(deferreds, consumeErrors=True) + + +d = create_indexes(None) +d.addCallback(insert_docs) +d.addCallback(get_from_index) +d.addCallback(countDocs) + +reactor.run() diff --git a/src/leap/soledad/client/examples/compare.txt b/src/leap/soledad/client/examples/compare.txt new file mode 100644 index 00000000..19a1325a --- /dev/null +++ b/src/leap/soledad/client/examples/compare.txt @@ -0,0 +1,8 @@ +TIMES=100 TMPDIR=/media/sdb5/leap python use_adbapi.py 1.34s user 0.16s system 53% cpu 2.832 total +TIMES=100 TMPDIR=/media/sdb5/leap python use_api.py 1.22s user 0.14s system 62% cpu 2.181 total + +TIMES=1000 TMPDIR=/media/sdb5/leap python use_api.py 2.18s user 0.34s system 27% cpu 9.213 total +TIMES=1000 TMPDIR=/media/sdb5/leap python use_adbapi.py 2.40s user 0.34s system 39% cpu 7.004 total + +TIMES=5000 TMPDIR=/media/sdb5/leap python use_api.py 6.63s user 1.27s system 13% cpu 57.882 total +TIMES=5000 TMPDIR=/media/sdb5/leap python use_adbapi.py 6.84s user 1.26s system 36% cpu 22.367 total diff --git a/src/leap/soledad/client/examples/manifest.phk b/src/leap/soledad/client/examples/manifest.phk new file mode 100644 index 00000000..2c86c07d --- /dev/null +++ b/src/leap/soledad/client/examples/manifest.phk @@ -0,0 +1,50 @@ +The Hacker's Manifesto + +The Hacker's Manifesto +by: The Mentor + +Another one got caught today, it's all over the papers. "Teenager +Arrested in Computer Crime Scandal", "Hacker Arrested after Bank +Tampering." "Damn kids. They're all alike." But did you, in your +three-piece psychology and 1950's technobrain, ever take a look behind +the eyes of the hacker? Did you ever wonder what made him tick, what +forces shaped him, what may have molded him? I am a hacker, enter my +world. Mine is a world that begins with school. I'm smarter than most of +the other kids, this crap they teach us bores me. "Damn underachiever. +They're all alike." I'm in junior high or high school. I've listened to +teachers explain for the fifteenth time how to reduce a fraction. I +understand it. "No, Ms. Smith, I didn't show my work. I did it in +my head." "Damn kid. Probably copied it. They're all alike." I made a +discovery today. I found a computer. Wait a second, this is cool. It does +what I want it to. If it makes a mistake, it's because I screwed it up. +Not because it doesn't like me, or feels threatened by me, or thinks I'm +a smart ass, or doesn't like teaching and shouldn't be here. Damn kid. +All he does is play games. They're all alike. And then it happened... a +door opened to a world... rushing through the phone line like heroin +through an addict's veins, an electronic pulse is sent out, a refuge from +the day-to-day incompetencies is sought... a board is found. "This is +it... this is where I belong..." I know everyone here... even if I've +never met them, never talked to them, may never hear from them again... I +know you all... Damn kid. Tying up the phone line again. They're all +alike... 
You bet your ass we're all alike... we've been spoon-fed baby +food at school when we hungered for steak... the bits of meat that you +did let slip through were pre-chewed and tasteless. We've been dominated +by sadists, or ignored by the apathetic. The few that had something to +teach found us willing pupils, but those few are like drops of water in +the desert. This is our world now... the world of the electron and the +switch, the beauty of the baud. We make use of a service already existing +without paying for what could be dirt-cheap if it wasn't run by +profiteering gluttons, and you call us criminals. We explore... and you +call us criminals. We seek after knowledge... and you call us criminals. +We exist without skin color, without nationality, without religious +bias... and you call us criminals. You build atomic bombs, you wage wars, +you murder, cheat, and lie to us and try to make us believe it's for our +own good, yet we're the criminals. Yes, I am a criminal. My crime is that +of curiosity. My crime is that of judging people by what they say and +think, not what they look like. My crime is that of outsmarting you, +something that you will never forgive me for. I am a hacker, and this is +my manifesto. You may stop this individual, but you can't stop us all... +after all, we're all alike. + +This was the last published file written by The Mentor. Shortly after +releasing it, he was busted by the FBI. The Mentor, sadly missed. diff --git a/src/leap/soledad/client/examples/plot-async-db.py b/src/leap/soledad/client/examples/plot-async-db.py new file mode 100644 index 00000000..018a1a1d --- /dev/null +++ b/src/leap/soledad/client/examples/plot-async-db.py @@ -0,0 +1,45 @@ +import csv +from matplotlib import pyplot as plt + +FILE = "bench.csv" + +# config the plot +plt.xlabel('number of inserts') +plt.ylabel('time (seconds)') +plt.title('SQLCipher parallelization') + +kwargs = { + 'linewidth': 1.0, + 'linestyle': '-', +} + +series = (('sync', 'r'), + ('async', 'g')) + +data = {'mark': [], + 'sync': [], + 'async': []} + +with open(FILE, 'rb') as csvfile: + series_reader = csv.reader(csvfile, delimiter=',') + for m, s, a in series_reader: + data['mark'].append(int(m)) + data['sync'].append(float(s)) + data['async'].append(float(a)) + +xmax = max(data['mark']) +xmin = min(data['mark']) +ymax = max(data['sync'] + data['async']) +ymin = min(data['sync'] + data['async']) + +for run in series: + name = run[0] + color = run[1] + plt.plot(data['mark'], data[name], label=name, color=color, **kwargs) + +plt.axes().annotate("", xy=(xmax, ymax)) +plt.axes().annotate("", xy=(xmin, ymin)) + +plt.grid() +plt.legend() +plt.show() diff --git a/src/leap/soledad/client/examples/run_benchmark.py b/src/leap/soledad/client/examples/run_benchmark.py new file mode 100644 index 00000000..ddedf433 --- /dev/null +++ b/src/leap/soledad/client/examples/run_benchmark.py @@ -0,0 +1,30 @@ +""" +Run a mini-benchmark between regular api and dbapi +""" +import commands +import os +import time + +TMPDIR = os.environ.get("TMPDIR", "/tmp") +CSVFILE = 'bench.csv' + +cmd = "SILENT=1 TIMES={times} TMPDIR={tmpdir} python ./use_{version}api.py" + + +def parse_time(r): + return r.split('\n')[-1] + + +with open(CSVFILE, 'w') as log: + + for times in range(0, 10000, 500): + cmd1 = cmd.format(times=times, tmpdir=TMPDIR, version="") + sync_time = parse_time(commands.getoutput(cmd1)) + + cmd2 = cmd.format(times=times, tmpdir=TMPDIR, version="adb") + async_time = parse_time(commands.getoutput(cmd2)) + + print times, sync_time, 
async_time + log.write("%s, %s, %s\n" % (times, sync_time, async_time)) + log.flush() + time.sleep(2) diff --git a/src/leap/soledad/client/examples/soledad_sync.py b/src/leap/soledad/client/examples/soledad_sync.py new file mode 100644 index 00000000..3aed10eb --- /dev/null +++ b/src/leap/soledad/client/examples/soledad_sync.py @@ -0,0 +1,63 @@ +from leap.bitmask.config.providerconfig import ProviderConfig +from leap.bitmask.crypto.srpauth import SRPAuth +from leap.soledad.client import Soledad +from twisted.internet import reactor +import logging +logging.basicConfig(level=logging.DEBUG) + + +# EDIT THIS -------------------------------------------- +user = u"USERNAME" +uuid = u"USERUUID" +_pass = u"USERPASS" +server_url = "https://soledad.server.example.org:2323" +# EDIT THIS -------------------------------------------- + +secrets_path = "/tmp/%s.secrets" % uuid +local_db_path = "/tmp/%s.soledad" % uuid +cert_file = "/tmp/cacert.pem" +provider_config = '/tmp/cdev.json' + + +provider = ProviderConfig() +provider.load(provider_config) + +soledad = None + + +def printStuff(r): + print r + + +def printErr(err): + logging.exception(err.value) + + +def init_soledad(_): + token = srpauth.get_token() + print "token", token + + global soledad + soledad = Soledad(uuid, _pass, secrets_path, local_db_path, + server_url, cert_file, + auth_token=token) + + def getall(_): + d = soledad.get_all_docs() + return d + + d1 = soledad.create_doc({"test": 42}) + d1.addCallback(getall) + d1.addCallbacks(printStuff, printErr) + + d2 = soledad.sync() + d2.addCallbacks(printStuff, printErr) + d2.addBoth(lambda r: reactor.stop()) + + +srpauth = SRPAuth(provider) + +d = srpauth.authenticate(user, _pass) +d.addCallbacks(init_soledad, printErr) + +reactor.run() diff --git a/src/leap/soledad/client/examples/use_adbapi.py b/src/leap/soledad/client/examples/use_adbapi.py new file mode 100644 index 00000000..ddb1eaae --- /dev/null +++ b/src/leap/soledad/client/examples/use_adbapi.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# use_adbapi.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Example of use of the asynchronous soledad api. 
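+
+A minimal usage sketch (values are only examples; TIMES, TMPDIR and SILENT
+are the environment variables read below, and 'manifest.phk' from this
+examples folder must be present in the current directory):
+
+    TIMES=1000 TMPDIR=/tmp SILENT=1 python use_adbapi.py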
+""" +from __future__ import print_function +import datetime +import os + +from twisted.internet import defer, reactor + +from leap.soledad.client import adbapi +from leap.soledad.client._db.sqlcipher import SQLCipherOptions +from leap.soledad.common import l2db + + +folder = os.environ.get("TMPDIR", "tmp") +times = int(os.environ.get("TIMES", "1000")) +silent = os.environ.get("SILENT", False) + +tmpdb = os.path.join(folder, "test.soledad") + + +def debug(*args): + if not silent: + print(*args) + + +debug("[+] db path:", tmpdb) +debug("[+] times", times) + +if os.path.isfile(tmpdb): + debug("[+] Removing existing db file...") + os.remove(tmpdb) + +start_time = datetime.datetime.now() + +opts = SQLCipherOptions(tmpdb, "secret", create=True) +dbpool = adbapi.getConnectionPool(opts) + + +def createDoc(doc): + return dbpool.runU1DBQuery("create_doc", doc) + + +def getAllDocs(): + return dbpool.runU1DBQuery("get_all_docs") + + +def countDocs(_): + debug("counting docs...") + d = getAllDocs() + d.addCallbacks(printResult, lambda e: e.printTraceback()) + d.addBoth(allDone) + + +def printResult(r): + if isinstance(r, l2db.Document): + debug(r.doc_id, r.content['number']) + else: + len_results = len(r[1]) + debug("GOT %s results" % len(r[1])) + + if len_results == times: + debug("ALL GOOD") + else: + raise ValueError("We didn't expect this result len") + + +def allDone(_): + debug("ALL DONE!") + if silent: + end_time = datetime.datetime.now() + print((end_time - start_time).total_seconds()) + reactor.stop() + + +deferreds = [] +payload = open('manifest.phk').read() + +for i in range(times): + doc = {"number": i, "payload": payload} + d = createDoc(doc) + d.addCallbacks(printResult, lambda e: e.printTraceback()) + deferreds.append(d) + + +all_done = defer.gatherResults(deferreds, consumeErrors=True) +all_done.addCallback(countDocs) + +reactor.run() diff --git a/src/leap/soledad/client/examples/use_api.py b/src/leap/soledad/client/examples/use_api.py new file mode 100644 index 00000000..db77c4b3 --- /dev/null +++ b/src/leap/soledad/client/examples/use_api.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# use_api.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Example of use of the soledad api. 
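+
+A minimal usage sketch (values are only examples; like use_adbapi.py, this
+script reads TIMES, TMPDIR and SILENT and expects 'manifest.phk' in the
+current directory):
+
+    TIMES=1000 TMPDIR=/tmp python use_api.py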
+""" +from __future__ import print_function +import datetime +import os + +from leap.soledad.client import sqlcipher +from leap.soledad.client.sqlcipher import SQLCipherOptions + + +folder = os.environ.get("TMPDIR", "tmp") +times = int(os.environ.get("TIMES", "1000")) +silent = os.environ.get("SILENT", False) + +tmpdb = os.path.join(folder, "test.soledad") + + +def debug(*args): + if not silent: + print(*args) + + +debug("[+] db path:", tmpdb) +debug("[+] times", times) + +if os.path.isfile(tmpdb): + debug("[+] Removing existing db file...") + os.remove(tmpdb) + +start_time = datetime.datetime.now() + +opts = SQLCipherOptions(tmpdb, "secret", create=True) +db = sqlcipher.SQLCipherDatabase(opts) + + +def allDone(): + debug("ALL DONE!") + + +payload = open('manifest.phk').read() + +for i in range(times): + doc = {"number": i, "payload": payload} + d = db.create_doc(doc) + debug(d.doc_id, d.content['number']) + +debug("Count", len(db.get_all_docs()[1])) +if silent: + end_time = datetime.datetime.now() + print((end_time - start_time).total_seconds()) + +allDone() diff --git a/src/leap/soledad/client/http_target/__init__.py b/src/leap/soledad/client/http_target/__init__.py new file mode 100644 index 00000000..b67d03f6 --- /dev/null +++ b/src/leap/soledad/client/http_target/__init__.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A U1DB backend for encrypting data before sending to server and decrypting +after receiving. +""" + + +import os + +from twisted.web.client import Agent +from twisted.internet import reactor + +from leap.common.certs import get_compatible_ssl_context_factory +from leap.soledad.common.log import getLogger +from leap.soledad.client.http_target.send import HTTPDocSender +from leap.soledad.client.http_target.api import SyncTargetAPI +from leap.soledad.client.http_target.fetch import HTTPDocFetcher +from leap.soledad.client import crypto as old_crypto + + +logger = getLogger(__name__) + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): + + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ + def __init__(self, url, source_replica_uid, creds, crypto, cert_file): + """ + Initialize the sync target. + + :param url: The server sync url. + :type url: str + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param creds: A dictionary containing the uuid and token. 
+ :type creds: creds + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad._crypto.SoledadCrypto + :param cert_file: Path to the certificate of the ca used to validate + the SSL certificate used by the remote soledad + server. + :type cert_file: str + """ + if url.endswith("/"): + url = url[:-1] + self._url = str(url) + "/sync-from/" + str(source_replica_uid) + self.source_replica_uid = source_replica_uid + self._auth_header = None + self._uuid = None + self.set_creds(creds) + self._crypto = crypto + # TODO: DEPRECATED CRYPTO + self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret) + self._insert_doc_cb = None + + # Twisted default Agent with our own ssl context factory + factory = get_compatible_ssl_context_factory(cert_file) + self._http = Agent(reactor, factory) + + if DO_STATS: + self.sync_exchange_phase = [0] diff --git a/src/leap/soledad/client/http_target/api.py b/src/leap/soledad/client/http_target/api.py new file mode 100644 index 00000000..c68185c6 --- /dev/null +++ b/src/leap/soledad/client/http_target/api.py @@ -0,0 +1,248 @@ +# -*- coding: utf-8 -*- +# api.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import json +import base64 + +from six import StringIO +from uuid import uuid4 + +from twisted.internet import defer +from twisted.web.http_headers import Headers +from twisted.web.client import FileBodyProducer + +from leap.soledad.client.http_target.support import readBody +from leap.soledad.common.errors import InvalidAuthTokenError +from leap.soledad.common.l2db.errors import HTTPError +from leap.soledad.common.l2db import SyncTarget + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +class SyncTargetAPI(SyncTarget): + """ + Declares public methods and implements u1db.SyncTarget. + """ + + @property + def uuid(self): + return self._uuid + + def set_creds(self, creds): + """ + Update credentials. + + :param creds: A dictionary containing the uuid and token. 
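+            An illustrative shape, inferred from how this method reads the
+            value: {'token': {'uuid': '<user uuid>', 'token': '<auth token>'}}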
+ :type creds: dict + """ + uuid = creds['token']['uuid'] + token = creds['token']['token'] + self._uuid = uuid + auth = '%s:%s' % (uuid, token) + b64_token = base64.b64encode(auth) + self._auth_header = {'Authorization': ['Token %s' % b64_token]} + + @property + def _base_header(self): + return self._auth_header.copy() if self._auth_header else {} + + def _http_request(self, url, method='GET', body=None, headers=None, + content_type=None, body_reader=readBody, + body_producer=None): + headers = headers or self._base_header + if content_type: + headers.update({'content-type': [content_type]}) + if not body_producer and body: + body = FileBodyProducer(StringIO(body)) + elif body_producer: + # Upload case, check send.py + body = body_producer(body) + d = self._http.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallback(body_reader) + d.addErrback(_unauth_to_invalid_token_error) + return d + + @defer.inlineCallbacks + def get_sync_info(self, source_replica_uid): + """ + Return information about known state of remote database. + + Return the replica_uid and the current database generation of the + remote database, and its last-seen database generation for the client + replica. + + :param source_replica_uid: The client-size replica uid. + :type source_replica_uid: str + + :return: A deferred which fires with (target_replica_uid, + target_replica_generation, target_trans_id, + source_replica_last_known_generation, + source_replica_last_known_transaction_id) + :rtype: twisted.internet.defer.Deferred + """ + raw = yield self._http_request(self._url) + res = json.loads(raw) + defer.returnValue(( + res['target_replica_uid'], + res['target_replica_generation'], + res['target_replica_transaction_id'], + res['source_replica_generation'], + res['source_transaction_id'] + )) + + def record_sync_info( + self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + """ + Record tip information for another replica. + + After sync_exchange has been processed, the caller will have + received new content from this replica. This call allows the + source replica instigating the sync to inform us what their + generation became after applying the documents we returned. + + This is used to allow future sync operations to not need to repeat data + that we just talked about. It also means that if this is called at the + wrong time, there can be database records that will never be + synchronized. + + :param source_replica_uid: The identifier for the source replica. + :type source_replica_uid: str + :param source_replica_generation: The database generation for the + source replica. + :type source_replica_generation: int + :param source_replica_transaction_id: The transaction id associated + with the source replica + generation. + :type source_replica_transaction_id: str + + :return: A deferred which fires with the result of the query. + :rtype: twisted.internet.defer.Deferred + """ + data = json.dumps({ + 'generation': source_replica_generation, + 'transaction_id': source_replica_transaction_id + }) + return self._http_request( + self._url, + method='PUT', + body=data, + content_type='application/json') + + @defer.inlineCallbacks + def sync_exchange(self, docs_by_generation, source_replica_uid, + last_known_generation, last_known_trans_id, + insert_doc_cb, ensure_callback=None, + sync_id=None): + """ + Find out which documents the remote database does not know about, + encrypt and send them. After that, receive documents from the remote + database. 
+ + :param docs_by_generations: A list of (doc_id, generation, trans_id) + of local documents that were changed since + the last local generation the remote + replica knows about. + :type docs_by_generations: list of tuples + + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + + :param ensure_callback: A callback that ensures we know the target + replica uid if the target replica was just + created. + :type ensure_callback: function + + :return: A deferred which fires with the new generation and + transaction id of the target replica. + :rtype: twisted.internet.defer.Deferred + """ + # ---------- phase 1: send docs to server ---------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + + self._ensure_callback = ensure_callback + + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + + # save a reference to the callback so we can use it after decrypting + self._insert_doc_cb = insert_doc_cb + + gen_after_send, trans_id_after_send = yield self._send_docs( + docs_by_generation, + last_known_generation, + last_known_trans_id, + sync_id) + + # ---------- phase 2: receive docs ----------------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + + cur_target_gen, cur_target_trans_id = yield self._receive_docs( + last_known_generation, last_known_trans_id, + ensure_callback, sync_id) + + # update gen and trans id info in case we just sent and did not + # receive docs. + if gen_after_send is not None and gen_after_send > cur_target_gen: + cur_target_gen = gen_after_send + cur_target_trans_id = trans_id_after_send + + # ---------- phase 3: sync exchange is over -------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + + defer.returnValue([cur_target_gen, cur_target_trans_id]) + + +def _unauth_to_invalid_token_error(failure): + """ + An errback to translate unauthorized errors to our own invalid token + class. + + :param failure: The original failure. + :type failure: twisted.python.failure.Failure + + :return: Either the original failure or an invalid auth token error. + :rtype: twisted.python.failure.Failure + """ + failure.trap(HTTPError) + if failure.value.status == 401: + raise InvalidAuthTokenError + return failure diff --git a/src/leap/soledad/client/http_target/fetch.py b/src/leap/soledad/client/http_target/fetch.py new file mode 100644 index 00000000..9d456830 --- /dev/null +++ b/src/leap/soledad/client/http_target/fetch.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- +# fetch.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +from twisted.internet import defer +from twisted.internet import threads + +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import emit_async +from leap.soledad.client.http_target.support import RequestBody +from leap.soledad.common.log import getLogger +from leap.soledad.client._crypto import is_symmetrically_encrypted +from leap.soledad.common.l2db import errors +from leap.soledad.client import crypto as old_crypto + +from .._document import Document +from . import fetch_protocol + +logger = getLogger(__name__) + + +class HTTPDocFetcher(object): + """ + Handles Document fetching from Soledad server, using HTTP as transport. + Steps: + * Prepares metadata by asking server for one document + * Fetch the total on response and prepare to ask all remaining + * (async) Documents will come encrypted. + So we parse, decrypt and insert locally as they arrive. + """ + + # The uuid of the local replica. + # Any class inheriting from this one should provide a meaningful attribute + # if the sync status event is meant to be used somewhere else. + + uuid = 'undefined' + userid = 'undefined' + + @defer.inlineCallbacks + def _receive_docs(self, last_known_generation, last_known_trans_id, + ensure_callback, sync_id): + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + # Acts as a queue, ensuring line order on async processing + # as `self._insert_doc_cb` cant be run concurrently or out of order. + # DeferredSemaphore solves the concurrency and its implementation uses + # a queue, solving the ordering. + # FIXME: Find a proper solution to avoid surprises on Twisted changes + self.semaphore = defer.DeferredSemaphore(1) + + metadata = yield self._fetch_all( + last_known_generation, last_known_trans_id, + sync_id) + number_of_changes, ngen, ntrans = self._parse_metadata(metadata) + + # wait for pending inserts + yield self.semaphore.acquire() + + if ngen: + new_generation = ngen + new_transaction_id = ntrans + + defer.returnValue([new_generation, new_transaction_id]) + + def _fetch_all(self, last_known_generation, + last_known_trans_id, sync_id): + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + self._received_docs = 0 + # build a stream reader with _doc_parser as a callback + body_reader = fetch_protocol.build_body_reader(self._doc_parser) + # start download stream + return self._http_request( + self._url, + method='POST', + body=str(body), + content_type='application/x-soledad-sync-get', + body_reader=body_reader) + + @defer.inlineCallbacks + def _doc_parser(self, doc_info, content, total): + """ + Insert a received document into the local replica, decrypting + if necessary. The case where it's not decrypted is when a doc gets + inserted from Server side with a GPG encrypted content. + + :param doc_info: Dictionary representing Document information. + :type doc_info: dict + :param content: The Document's content. 
+ :type idx: str + :param total: The total number of operations. + :type total: int + """ + yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, + total) + + @defer.inlineCallbacks + def __atomic_doc_parse(self, doc_info, content, total): + doc = Document(doc_info['id'], doc_info['rev'], content) + if is_symmetrically_encrypted(content): + content = (yield self._crypto.decrypt_doc(doc)).getvalue() + elif old_crypto.is_symmetrically_encrypted(doc): + content = self._deprecated_crypto.decrypt_doc(doc) + doc.set_json(content) + + # TODO insert blobs here on the blob backend + # FIXME: This is wrong. Using the very same SQLite connection object + # from multiple threads is dangerous. We should bring the dbpool here + # or find an alternative. Deferring to a thread only helps releasing + # the reactor for other tasks as this is an IO intensive call. + yield threads.deferToThread(self._insert_doc_cb, + doc, doc_info['gen'], doc_info['trans_id']) + self._received_docs += 1 + user_data = {'uuid': self.uuid, 'userid': self.userid} + _emit_receive_status(user_data, self._received_docs, total=total) + + def _parse_metadata(self, metadata): + """ + Parse the response from the server containing the sync metadata. + + :param response: Metadata as string + :type response: str + + :return: (number_of_changes, new_gen, new_trans_id) + :rtype: tuple + """ + try: + metadata = json.loads(metadata) + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + return (metadata['number_of_changes'], metadata['new_generation'], + metadata['new_transaction_id']) + except (ValueError, KeyError): + raise errors.BrokenSyncStream('Metadata parsing failed') + + +def _emit_receive_status(user_data, received_docs, total): + content = {'received': received_docs, 'total': total} + emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content) + + if received_docs % 20 == 0: + msg = "%d/%d" % (received_docs, total) + logger.debug("Sync receive status: %s" % msg) diff --git a/src/leap/soledad/client/http_target/fetch_protocol.py b/src/leap/soledad/client/http_target/fetch_protocol.py new file mode 100644 index 00000000..851eb3a1 --- /dev/null +++ b/src/leap/soledad/client/http_target/fetch_protocol.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +# fetch_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
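+
+# A rough sketch of the stream parsed by this module (field names taken from
+# the consumers in this package; the exact wire format is defined by the
+# server):
+#
+#     [\r\n
+#     {"number_of_changes": 2, "new_generation": 9, "new_transaction_id": "T-x"},\r\n
+#     {"id": "doc-1", "rev": "1-ab", "gen": 8, "trans_id": "T-y"},\r\n
+#     {"some": "document content"},\r\n
+#     ]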
+import json +from functools import partial +from six import StringIO +from twisted.web._newclient import ResponseDone +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger +from .support import ReadBodyProtocol +from .support import readBody + +logger = getLogger(__name__) + + +class DocStreamReceiver(ReadBodyProtocol): + """ + A protocol implementation that can parse incoming data from server based + on a line format specified on u1db implementation. Except that we split doc + attributes from content to ease parsing and increment throughput for larger + documents. + [\r\n + {metadata},\r\n + {doc_info},\r\n + {content},\r\n + ... + {doc_info},\r\n + {content},\r\n + ] + """ + + def __init__(self, response, deferred, doc_reader): + self.deferred = deferred + self.status = response.code if response else None + self.message = response.phrase if response else None + self.headers = response.headers if response else {} + self.delimiter = '\r\n' + self.metadata = '' + self._doc_reader = doc_reader + self.reset() + + def reset(self): + self._line = 0 + self._buffer = StringIO() + self._properly_finished = False + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + if self.deferred.called: + return + try: + if reason.check(ResponseDone): + self.dataBuffer = self.metadata + else: + self.dataBuffer = self.finish() + except errors.BrokenSyncStream as e: + return self.deferred.errback(e) + return ReadBodyProtocol.connectionLost(self, reason) + + def consumeBufferLines(self): + """ + Consumes lines from buffer and rewind it, writing remaining data + that didn't formed a line back into buffer. + """ + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.seek(0) + lines = content.split(self.delimiter) + self._buffer.write(lines.pop(-1)) + return lines + + def dataReceived(self, data): + """ + Buffer incoming data until a line breaks comes in. We check only + the incoming data for efficiency. + """ + self._buffer.write(data) + if '\n' not in data: + return + lines = self.consumeBufferLines() + while lines: + line, _ = utils.check_and_strip_comma(lines.pop(0)) + self.lineReceived(line) + self._line += 1 + + def lineReceived(self, line): + """ + Protocol implementation. + 0: [\r\n + 1: {metadata},\r\n + (even): {doc_info},\r\n + (odd): {data},\r\n + (last): ] + """ + if self._properly_finished: + raise errors.BrokenSyncStream("Reading a finished stream") + if ']' == line: + self._properly_finished = True + elif self._line == 0: + if line is not '[': + raise errors.BrokenSyncStream("Invalid start") + elif self._line == 1: + self.metadata = line + if 'error' in self.metadata: + raise errors.BrokenSyncStream("Error from server: %s" % line) + self.total = json.loads(line).get('number_of_changes', -1) + elif (self._line % 2) == 0: + self.current_doc = json.loads(line) + if 'error' in self.current_doc: + raise errors.BrokenSyncStream("Error from server: %s" % line) + else: + d = self._doc_reader( + self.current_doc, line.strip() or None, self.total) + d.addErrback(self.deferred.errback) + + def finish(self): + """ + Checks that ']' came and stream was properly closed. 
+ """ + if not self._properly_finished: + raise errors.BrokenSyncStream('Stream not properly closed') + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.close() + return content + + +def build_body_reader(doc_reader): + """ + Get the documents from a sync stream and call doc_reader on each + doc received. + + @param doc_reader: Function to be called for processing an incoming doc. + Will be called with doc metadata (dict parsed from 1st line) and doc + content (string) + @type doc_reader: function + + @return: A function that can be called by the http Agent to create and + configure the proper protocol. + """ + protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader) + return partial(readBody, protocolClass=protocolClass) diff --git a/src/leap/soledad/client/http_target/send.py b/src/leap/soledad/client/http_target/send.py new file mode 100644 index 00000000..2b286ec5 --- /dev/null +++ b/src/leap/soledad/client/http_target/send.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# send.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json + +from twisted.internet import defer + +from leap.soledad.common.log import getLogger +from leap.soledad.client.events import emit_async +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.http_target.support import RequestBody +from .send_protocol import DocStreamProducer + +logger = getLogger(__name__) + + +class HTTPDocSender(object): + """ + Handles Document uploading from Soledad server, using HTTP as transport. + They need to be encrypted and metadata prepared before sending. + """ + + # The uuid of the local replica. + # Any class inheriting from this one should provide a meaningful attribute + # if the sync status event is meant to be used somewhere else. 
+ + uuid = 'undefined' + userid = 'undefined' + + @defer.inlineCallbacks + def _send_docs(self, docs_by_generation, last_known_generation, + last_known_trans_id, sync_id): + + if not docs_by_generation: + defer.returnValue([None, None]) + + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + result = yield self._send_batch(body, docs_by_generation) + response_dict = json.loads(result)[0] + gen_after_send = response_dict['new_generation'] + trans_id_after_send = response_dict['new_transaction_id'] + defer.returnValue([gen_after_send, trans_id_after_send]) + + @defer.inlineCallbacks + def _send_batch(self, body, docs): + total, calls = len(docs), [] + for i, entry in enumerate(docs): + calls.append((self._prepare_one_doc, + entry, body, i + 1, total)) + result = yield self._send_request(body, calls) + _emit_send_status(self.uuid, body.consumed, total) + + defer.returnValue(result) + + def _send_request(self, body, calls): + return self._http_request( + self._url, + method='POST', + body=(body, calls), + content_type='application/x-soledad-sync-put', + body_producer=DocStreamProducer) + + @defer.inlineCallbacks + def _prepare_one_doc(self, entry, body, idx, total): + get_doc_call, gen, trans_id = entry + doc, content = yield self._encrypt_doc(get_doc_call) + body.insert_info( + id=doc.doc_id, rev=doc.rev, content=content, gen=gen, + trans_id=trans_id, number_of_docs=total, + doc_idx=idx) + _emit_send_status(self.uuid, body.consumed, total) + + @defer.inlineCallbacks + def _encrypt_doc(self, get_doc_call): + f, args, kwargs = get_doc_call + doc = yield f(*args, **kwargs) + if doc.is_tombstone(): + defer.returnValue((doc, None)) + else: + content = yield self._crypto.encrypt_doc(doc) + defer.returnValue((doc, content)) + + +def _emit_send_status(user_data, idx, total): + content = {'sent': idx, 'total': total} + emit_async(SOLEDAD_SYNC_SEND_STATUS, user_data, content) + + msg = "%d/%d" % (idx, total) + logger.debug("Sync send status: %s" % msg) diff --git a/src/leap/soledad/client/http_target/send_protocol.py b/src/leap/soledad/client/http_target/send_protocol.py new file mode 100644 index 00000000..4941aa34 --- /dev/null +++ b/src/leap/soledad/client/http_target/send_protocol.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# send_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +from zope.interface import implementer +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web.iweb import IBodyProducer +from twisted.web.iweb import UNKNOWN_LENGTH + + +@implementer(IBodyProducer) +class DocStreamProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + def __init__(self, producer): + """ + Initialize the string produer. 
+ + :param producer: A RequestBody instance and a list of producer calls + :type producer: (.support.RequestBody, [(function, *args)]) + """ + self.body, self.producer = producer + self.length = UNKNOWN_LENGTH + self.pause = False + self.stop = False + + @defer.inlineCallbacks + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A Deferred that fires when production ends. + :rtype: twisted.internet.defer.Deferred + """ + while self.producer and not self.stop: + if self.pause: + yield self.sleep(0.001) + continue + call = self.producer.pop(0) + fun, args = call[0], call[1:] + yield fun(*args) + consumer.write(self.body.pop(1, leave_open=True)) + consumer.write(self.body.pop(0)) # close stream + + def sleep(self, secs): + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + def pauseProducing(self): + self.pause = True + + def stopProducing(self): + self.stop = True + + def resumeProducing(self): + self.pause = False diff --git a/src/leap/soledad/client/http_target/support.py b/src/leap/soledad/client/http_target/support.py new file mode 100644 index 00000000..d8d8e420 --- /dev/null +++ b/src/leap/soledad/client/http_target/support.py @@ -0,0 +1,220 @@ +# -*- coding: utf-8 -*- +# support.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import warnings +import json + +from twisted.internet import defer +from twisted.web.client import _ReadBodyProtocol +from twisted.web.client import PartialDownloadError +from twisted.web._newclient import ResponseDone +from twisted.web._newclient import PotentialDataLoss + +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import http_errors + +# we want to make sure that HTTP errors will raise appropriate u1db errors, +# that is, fire errbacks with the appropriate failures, in the context of +# twisted. Because of that, we redefine the http body reader used by the HTTP +# client below. + + +class ReadBodyProtocol(_ReadBodyProtocol): + """ + From original Twisted implementation, focused on adding our error + handling and ensuring that the proper u1db error is raised. + """ + + def __init__(self, response, deferred): + """ + Initialize the protocol, additionally storing the response headers. 
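
[Editor's note] As a hedged illustration of how this producer streams a batch, the sketch below drives a DocStreamProducer by hand with an in-memory consumer; the RequestBody contents and the single entry are invented for the example.

    from io import BytesIO

    from twisted.internet import defer

    from leap.soledad.client.http_target.send_protocol import DocStreamProducer
    from leap.soledad.client.http_target.support import RequestBody

    class MemoryConsumer(object):
        # Minimal stand-in for an IConsumer, for demonstration only.
        def __init__(self):
            self.buffer = BytesIO()

        def write(self, data):
            self.buffer.write(data)

    @defer.inlineCallbacks
    def stream_one_doc():
        body = RequestBody(sync_id='S-1', ensure=False)

        def add_entry(doc_id):
            body.insert_info(id=doc_id, rev='1-a', content='{}', gen=1,
                             trans_id='T-1', number_of_docs=1, doc_idx=1)
            return defer.succeed(None)

        producer = DocStreamProducer((body, [(add_entry, 'D-1')]))
        consumer = MemoryConsumer()
        # each producer call queues one entry, which is then written out and
        # the stream is closed with the final pop(0)
        yield producer.startProducing(consumer)
        defer.returnValue(consumer.buffer.getvalue())
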
+ """ + _ReadBodyProtocol.__init__( + self, response.code, response.phrase, deferred) + self.headers = response.headers + + # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks + def _error(self, respdic): + descr = respdic.get("error") + exc_cls = errors.wire_description_to_exc.get(descr) + if exc_cls is not None: + message = respdic.get("message") + self.deferred.errback(exc_cls(message)) + else: + self.deferred.errback( + errors.HTTPError(self.status, respdic, self.headers)) + # ---8<--- end of snippet from u1db.remote.http_client + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + if reason.check(ResponseDone): + + body = b''.join(self.dataBuffer) + + # ---8<--- snippet from u1db.remote.http_client + if self.status in (200, 201): + self.deferred.callback(body) + elif self.status in http_errors.ERROR_STATUSES: + try: + respdic = json.loads(body) + except ValueError: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + else: + self._error(respdic) + # special cases + elif self.status == 503: + self.deferred.errback(errors.Unavailable(body, self.headers)) + else: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + # ---8<--- end of snippet from u1db.remote.http_client + + elif reason.check(PotentialDataLoss): + self.deferred.errback( + PartialDownloadError(self.status, self.message, + b''.join(self.dataBuffer))) + else: + self.deferred.errback(reason) + + +def readBody(response, protocolClass=ReadBodyProtocol): + """ + Get the body of an L{IResponse} and return it as a byte string. + + This is a helper function for clients that don't want to incrementally + receive the body of an HTTP response. + + @param response: The HTTP response for which the body will be read. + @type response: L{IResponse} provider + + @return: A L{Deferred} which will fire with the body of the response. + Cancelling it will close the connection to the server immediately. + """ + def cancel(deferred): + """ + Cancel a L{readBody} call, close the connection to the HTTP server + immediately, if it is still open. + + @param deferred: The cancelled L{defer.Deferred}. + """ + abort = getAbort() + if abort is not None: + abort() + + d = defer.Deferred(cancel) + protocol = protocolClass(response, d) + + def getAbort(): + return getattr(protocol.transport, 'abortConnection', None) + + response.deliverBody(protocol) + + if protocol.transport is not None and getAbort() is None: + warnings.warn( + 'Using readBody with a transport that does not have an ' + 'abortConnection method', + category=DeprecationWarning, + stacklevel=2) + + return d + + +class RequestBody(object): + """ + This class is a helper to generate send and fetch requests. + The expected format is something like: + [ + {headers}, + {entry1}, + {...}, + {entryN}, + ] + """ + + def __init__(self, **header_dict): + """ + Creates a new RequestBody holding header information. + + :param header_dict: A dictionary with the headers. + :type header_dict: dict + """ + self.headers = header_dict + self.entries = [] + self.consumed = 0 + + def insert_info(self, **entry_dict): + """ + Dumps an entry into JSON format and add it to entries list. + Adds 'content' key on a new line if it's present. 
+ + :param entry_dict: Entry as a dictionary + :type entry_dict: dict + """ + content = '' + if 'content' in entry_dict: + content = ',\r\n' + (entry_dict['content'] or '') + entry = json.dumps(entry_dict) + content + self.entries.append(entry) + + def pop(self, amount=10, leave_open=False): + """ + Removes entries and returns it formatted and ready + to be sent. + + :param amount: number of entries to pop and format + :type amount: int + + :param leave_open: flag to skip stream closing + :type amount: bool + + :return: formatted body ready to be sent + :rtype: str + """ + start = self.consumed == 0 + amount = min([len(self.entries), amount]) + entries = [self.entries.pop(0) for i in xrange(amount)] + self.consumed += amount + end = len(self.entries) == 0 if not leave_open else False + return self.entries_to_str(entries, start, end) + + def __str__(self): + return self.pop(len(self.entries)) + + def __len__(self): + return len(self.entries) + + def entries_to_str(self, entries=None, start=True, end=True): + """ + Format a list of entries into the body format expected + by the server. + + :param entries: entries to format + :type entries: list + + :return: formatted body ready to be sent + :rtype: str + """ + data = '' + if start: + data = '[\r\n' + json.dumps(self.headers) + data += ''.join(',\r\n' + entry for entry in entries) + if end: + data += '\r\n]' + return data diff --git a/src/leap/soledad/client/interfaces.py b/src/leap/soledad/client/interfaces.py new file mode 100644 index 00000000..0600449f --- /dev/null +++ b/src/leap/soledad/client/interfaces.py @@ -0,0 +1,368 @@ +# -*- coding: utf-8 -*- +# interfaces.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Interfaces used by the Soledad Client. +""" +from zope.interface import Interface, Attribute + +# +# Plugins +# + + +class ISoledadPostSyncPlugin(Interface): + """ + I implement the minimal methods and attributes for a plugin that can be + called after a soledad synchronization has ended. + """ + + def process_received_docs(self, doc_id_list): + """ + Do something with the passed list of doc_ids received after the last + sync. + + :param doc_id_list: a list of strings for the received doc_ids + """ + + watched_doc_types = Attribute(""" + a tuple of the watched doc types for this plugin. So far, the + `doc-types` convention is just the preffix of the doc_id, which is + basically its first character, followed by a dash. So, for instance, + `M-` is used for meta-docs in mail, and `F-` is used for flag-docs in + mail. For now there's no central register of all the doc-types + used.""") + + +# +# Soledad storage +# + +class ILocalStorage(Interface): + """ + I implement core methods for the u1db local storage of documents and + indexes. 
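
[Editor's note] A quick, illustrative round trip of the body format built by RequestBody; the header names and values are made up, and key ordering inside the JSON lines is not guaranteed.

    from leap.soledad.client.http_target.support import RequestBody

    body = RequestBody(last_known_generation=0, sync_id='S-1')
    body.insert_info(id='D-1', rev='1-a', gen=3, trans_id='T-3',
                     content='{"key": "value"}')
    # pop() drains the queued entries and wraps them in the '[\r\n ... \r\n]'
    # frame the server expects: the headers dict first, then one JSON metadata
    # line per document, each followed by its content on its own line.
    wire = body.pop(10)
    assert len(body) == 0
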
+ """ + local_db_path = Attribute( + "The path for the local database replica") + local_db_file_name = Attribute( + "The name of the local SQLCipher U1DB database file") + uuid = Attribute("The user uuid") + default_prefix = Attribute( + "Prefix for default values for path") + + def put_doc(self, doc): + """ + Update a document in the local encrypted database. + + :param doc: the document to update + :type doc: Document + + :return: + a deferred that will fire with the new revision identifier for + the document + :rtype: Deferred + """ + + def delete_doc(self, doc): + """ + Delete a document from the local encrypted database. + + :param doc: the document to delete + :type doc: Document + + :return: + a deferred that will fire with ... + :rtype: Deferred + """ + + def get_doc(self, doc_id, include_deleted=False): + """ + Retrieve a document from the local encrypted database. + + :param doc_id: the unique document identifier + :type doc_id: str + :param include_deleted: + if True, deleted documents will be returned with empty content; + otherwise asking for a deleted document will return None + :type include_deleted: bool + + :return: + A deferred that will fire with the document object, containing a + Document, or None if it could not be found + :rtype: Deferred + """ + + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): + """ + Get the content for many documents. + + :param doc_ids: a list of document identifiers + :type doc_ids: list + :param check_for_conflicts: if set False, then the conflict check will + be skipped, and 'None' will be returned instead of True/False + :type check_for_conflicts: bool + + :return: + A deferred that will fire with an iterable giving the Document + object for each document id in matching doc_ids order. + :rtype: Deferred + """ + + def get_all_docs(self, include_deleted=False): + """ + Get the JSON content for all documents in the database. + + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise deleted + documents will not be included in the results. + :return: + A deferred that will fire with (generation, [Document]): that is, + the current generation of the database, followed by a list of all + the documents in the database. + :rtype: Deferred + """ + + def create_doc(self, content, doc_id=None): + """ + Create a new document in the local encrypted database. + + :param content: the contents of the new document + :type content: dict + :param doc_id: an optional identifier specifying the document id + :type doc_id: str + + :return: + A deferred tht will fire with the new document (Document + instance). + :rtype: Deferred + """ + + def create_doc_from_json(self, json, doc_id=None): + """ + Create a new document. + + You can optionally specify the document identifier, but the document + must not already exist. See 'put_doc' if you want to override an + existing document. + If the database specifies a maximum document size and the document + exceeds it, create will fail and raise a DocumentTooBig exception. + + :param json: The JSON document string + :type json: str + :param doc_id: An optional identifier specifying the document id. + :type doc_id: + :return: + A deferred that will fire with the new document (A Document + instance) + :rtype: Deferred + """ + + def create_index(self, index_name, *index_expressions): + """ + Create an named index, which can then be queried for future lookups. + Creating an index which already exists is not an error, and is cheap. 
+ Creating an index which does not match the index_expressions of the + existing index is an error. + Creating an index will block until the expressions have been evaluated + and the index generated. + + :param index_name: A unique name which can be used as a key prefix + :type index_name: str + :param index_expressions: + index expressions defining the index information. + :type index_expressions: dict + + Examples: + + "fieldname", or "fieldname.subfieldname" to index alphabetically + sorted on the contents of a field. + + "number(fieldname, width)", "lower(fieldname)" + """ + + def delete_index(self, index_name): + """ + Remove a named index. + + :param index_name: The name of the index we are removing + :type index_name: str + """ + + def list_indexes(self): + """ + List the definitions of all known indexes. + + :return: A list of [('index-name', ['field', 'field2'])] definitions. + :rtype: Deferred + """ + + def get_from_index(self, index_name, *key_values): + """ + Return documents that match the keys supplied. + + You must supply exactly the same number of values as have been defined + in the index. It is possible to do a prefix match by using '*' to + indicate a wildcard match. You can only supply '*' to trailing entries, + (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) + It is also possible to append a '*' to the last supplied value (eg + 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') + + :param index_name: The index to query + :type index_name: str + :param key_values: values to match. eg, if you have + an index with 3 fields then you would have: + get_from_index(index_name, val1, val2, val3) + :type key_values: tuple + :return: List of [Document] + :rtype: list + """ + + def get_count_from_index(self, index_name, *key_values): + """ + Return the count of the documents that match the keys and + values supplied. + + :param index_name: The index to query + :type index_name: str + :param key_values: values to match. eg, if you have + an index with 3 fields then you would have: + get_from_index(index_name, val1, val2, val3) + :type key_values: tuple + :return: count. + :rtype: int + """ + + def get_range_from_index(self, index_name, start_value, end_value): + """ + Return documents that fall within the specified range. + + Both ends of the range are inclusive. For both start_value and + end_value, one must supply exactly the same number of values as have + been defined in the index, or pass None. In case of a single column + index, a string is accepted as an alternative for a tuple with a single + value. It is possible to do a prefix match by using '*' to indicate + a wildcard match. You can only supply '*' to trailing entries, (eg + 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also + possible to append a '*' to the last supplied value (eg 'val*', '*', + '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') + + :param index_name: The index to query + :type index_name: str + :param start_values: tuples of values that define the lower bound of + the range. eg, if you have an index with 3 fields then you would + have: (val1, val2, val3) + :type start_values: tuple + :param end_values: tuples of values that define the upper bound of the + range. 
eg, if you have an index with 3 fields then you would have: + (val1, val2, val3) + :type end_values: tuple + :return: A deferred that will fire with a list of [Document] + :rtype: Deferred + """ + + def get_index_keys(self, index_name): + """ + Return all keys under which documents are indexed in this index. + + :param index_name: The index to query + :type index_name: str + :return: + A deferred that will fire with a list of tuples of indexed keys. + :rtype: Deferred + """ + + def get_doc_conflicts(self, doc_id): + """ + Get the list of conflicts for the given document. + + :param doc_id: the document id + :type doc_id: str + + :return: + A deferred that will fire with a list of the document entries that + are conflicted. + :rtype: Deferred + """ + + def resolve_doc(self, doc, conflicted_doc_revs): + """ + Mark a document as no longer conflicted. + + :param doc: a document with the new content to be inserted. + :type doc: Document + :param conflicted_doc_revs: + A deferred that will fire with a list of revisions that the new + content supersedes. + :type conflicted_doc_revs: list + """ + + +class ISyncableStorage(Interface): + """ + I implement methods to synchronize with a remote replica. + """ + replica_uid = Attribute("The uid of the local replica") + syncing = Attribute( + "Property, True if the syncer is syncing.") + token = Attribute("The authentication Token.") + + def sync(self): + """ + Synchronize the local encrypted replica with a remote replica. + + This method blocks until a syncing lock is acquired, so there are no + attempts of concurrent syncs from the same client replica. + + :param url: the url of the target replica to sync with + :type url: str + + :return: + A deferred that will fire with the local generation before the + synchronisation was performed. + :rtype: str + """ + + def stop_sync(self): + """ + Stop the current syncing process. + """ + + +class ISecretsStorage(Interface): + """ + I implement methods needed for initializing and accessing secrets, that are + synced against the Shared Recovery Database. + """ + secrets_file_name = Attribute( + "The name of the file where the storage secrets will be stored") + + # XXX this used internally from secrets, so it might be good to preserve + # as a public boundary with other components. + + # We should also probably document its interface. + secrets = Attribute("A SoledadSecrets object containing access to secrets") + + def change_passphrase(self, new_passphrase): + """ + Change the passphrase that encrypts the storage secret. + + :param new_passphrase: The new passphrase. + :type new_passphrase: unicode + + :raise NoStorageSecret: Raised if there's no storage secret available. + """ diff --git a/src/leap/soledad/client/shared_db.py b/src/leap/soledad/client/shared_db.py new file mode 100644 index 00000000..4f70c74b --- /dev/null +++ b/src/leap/soledad/client/shared_db.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +# shared_db.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
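
[Editor's note] For illustration, a sketch of the conflict-resolution flow described by get_doc_conflicts and resolve_doc above; the soledad instance and the choice of winning version are assumptions of the example.

    from twisted.internet import defer

    @defer.inlineCallbacks
    def resolve_conflicts(soledad, doc_id):
        # get_doc_conflicts fires with the conflicted versions of the document
        conflicts = yield soledad.get_doc_conflicts(doc_id)
        if conflicts:
            # keep the content of the first version and mark all conflicted
            # revisions as superseded by it
            winner = conflicts[0]
            revs = [doc.rev for doc in conflicts]
            yield soledad.resolve_doc(winner, revs)
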
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A shared database for storing/retrieving encrypted key material. +""" +from leap.soledad.common.l2db.remote.http_database import HTTPDatabase + +from leap.soledad.client.auth import TokenBasedAuth + + +# ---------------------------------------------------------------------------- +# Soledad shared database +# ---------------------------------------------------------------------------- + +# TODO could have a hierarchy of soledad exceptions. + + +class NoTokenForAuth(Exception): + """ + No token was found for token-based authentication. + """ + + +class Unauthorized(Exception): + """ + User does not have authorization to perform task. + """ + + +class ImproperlyConfiguredError(Exception): + """ + Wrong parameters in the database configuration. + """ + + +class SoledadSharedDatabase(HTTPDatabase, TokenBasedAuth): + """ + This is a shared recovery database that enables users to store their + encryption secrets in the server and retrieve them afterwards. + """ + # TODO: prevent client from messing with the shared DB. + # TODO: define and document API. + + # + # Token auth methods. + # + + def set_token_credentials(self, uuid, token): + """ + Store given credentials so we can sign the request later. + + :param uuid: The user's uuid. + :type uuid: str + :param token: The authentication token. + :type token: str + """ + TokenBasedAuth.set_token_credentials(self, uuid, token) + + def _sign_request(self, method, url_query, params): + """ + Return an authorization header to be included in the HTTP request. + + :param method: The HTTP method. + :type method: str + :param url_query: The URL query string. + :type url_query: str + :param params: A list with encoded query parameters. + :type param: list + + :return: The Authorization header. + :rtype: list of tuple + """ + return TokenBasedAuth._sign_request(self, method, url_query, params) + + # + # Modified HTTPDatabase methods. + # + + @staticmethod + def open_database(url, creds=None): + """ + Open a Soledad shared database. + + :param url: URL of the remote database. + :type url: str + :param creds: A tuple containing the authentication method and + credentials. + :type creds: tuple + + :return: The shared database in the given url. + :rtype: SoledadSharedDatabase + """ + db = SoledadSharedDatabase(url, creds=creds) + return db + + @staticmethod + def delete_database(url): + """ + Dummy method that prevents from deleting shared database. + + :raise: This will always raise an Unauthorized exception. + + :param url: The database URL. + :type url: str + """ + raise Unauthorized("Can't delete shared database.") + + def __init__(self, url, document_factory=None, creds=None): + """ + Initialize database with auth token and encryption powers. + + :param url: URL of the remote database. + :type url: str + :param document_factory: A factory for U1BD documents. + :type document_factory: u1db.Document + :param creds: A tuple containing the authentication method and + credentials. 
+ :type creds: tuple + """ + HTTPDatabase.__init__(self, url, document_factory, creds) diff --git a/src/leap/soledad/client/sync.py b/src/leap/soledad/client/sync.py new file mode 100644 index 00000000..2a927189 --- /dev/null +++ b/src/leap/soledad/client/sync.py @@ -0,0 +1,231 @@ +# -*- coding: utf-8 -*- +# sync.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Soledad synchronization utilities. +""" +import os + +from twisted.internet import defer + +from leap.soledad.common.log import getLogger +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.sync import Synchronizer +from leap.soledad.common.errors import BackendNotReadyError + + +logger = getLogger(__name__) + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +class SoledadSynchronizer(Synchronizer): + """ + Collect the state around synchronizing 2 U1DB replicas. + + Synchronization is bi-directional, in that new items in the source are sent + to the target, and new items in the target are returned to the source. + However, it still recognizes that one side is initiating the request. Also, + at the moment, conflicts are only created in the source. + + Also modified to allow for interrupting the synchronization process. + """ + received_docs = [] + + def __init__(self, *args, **kwargs): + Synchronizer.__init__(self, *args, **kwargs) + if DO_STATS: + self.sync_phase = [0] + self.sync_exchange_phase = None + + @defer.inlineCallbacks + def sync(self): + """ + Synchronize documents between source and target. + + :return: A deferred which will fire after the sync has finished with + the local generation before the synchronization was performed. + :rtype: twisted.internet.defer.Deferred + """ + + sync_target = self.sync_target + self.received_docs = [] + + # ---------- phase 1: get sync info from server ---------------------- + if DO_STATS: + self.sync_phase[0] += 1 + self.sync_exchange_phase = self.sync_target.sync_exchange_phase + # -------------------------------------------------------------------- + + # get target identifier, its current generation, + # and its last-seen database generation for this source + ensure_callback = None + try: + (self.target_replica_uid, target_gen, target_trans_id, + target_my_gen, target_my_trans_id) = yield \ + sync_target.get_sync_info(self.source._replica_uid) + except (errors.DatabaseDoesNotExist, BackendNotReadyError) as e: + logger.warn("Database isn't ready on server. 
Will be created.") + logger.warn("Reason: %s" % e.__class__) + self.target_replica_uid = None + target_gen, target_trans_id = 0, '' + target_my_gen, target_my_trans_id = 0, '' + + logger.debug("target replica uid: %s" % self.target_replica_uid) + logger.debug("target generation: %d" % target_gen) + logger.debug("target trans id: %s" % target_trans_id) + logger.debug("target my gen: %d" % target_my_gen) + logger.debug("target my trans_id: %s" % target_my_trans_id) + logger.debug("source replica_uid: %s" % self.source._replica_uid) + + # make sure we'll have access to target replica uid once it exists + if self.target_replica_uid is None: + + def ensure_callback(replica_uid): + self.target_replica_uid = replica_uid + + # make sure we're not syncing one replica with itself + if self.target_replica_uid == self.source._replica_uid: + raise errors.InvalidReplicaUID + + # validate the info the target has about the source replica + self.source.validate_gen_and_trans_id( + target_my_gen, target_my_trans_id) + + # ---------- phase 2: what's changed --------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + + # what's changed since that generation and this current gen + my_gen, _, changes = self.source.whats_changed(target_my_gen) + logger.debug("there are %d documents to send" % len(changes)) + + # get source last-seen database generation for the target + if self.target_replica_uid is None: + target_last_known_gen, target_last_known_trans_id = 0, '' + else: + target_last_known_gen, target_last_known_trans_id = \ + self.source._get_replica_gen_and_trans_id( + self.target_replica_uid) + logger.debug( + "last known target gen: %d" % target_last_known_gen) + logger.debug( + "last known target trans_id: %s" % target_last_known_trans_id) + + # validate transaction ids + if not changes and target_last_known_gen == target_gen: + if target_trans_id != target_last_known_trans_id: + raise errors.InvalidTransactionId + defer.returnValue(my_gen) + + # ---------- phase 3: sync exchange ---------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + + docs_by_generation = self._docs_by_gen_from_changes(changes) + + # exchange documents and try to insert the returned ones with + # the target, return target synced-up-to gen. 
+ new_gen, new_trans_id = yield sync_target.sync_exchange( + docs_by_generation, self.source._replica_uid, + target_last_known_gen, target_last_known_trans_id, + self._insert_doc_from_target, ensure_callback=ensure_callback) + ids_sent = [doc_id for doc_id, _, _ in changes] + logger.debug("target gen after sync: %d" % new_gen) + logger.debug("target trans_id after sync: %s" % new_trans_id) + if hasattr(self.source, 'commit'): # sqlcipher backend speed up + self.source.commit() # insert it all in a single transaction + info = { + "target_replica_uid": self.target_replica_uid, + "new_gen": new_gen, + "new_trans_id": new_trans_id, + "my_gen": my_gen + } + self._syncing_info = info + + # ---------- phase 4: complete sync ---------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + + yield self.complete_sync() + + _, _, changes = self.source.whats_changed(target_my_gen) + changed_doc_ids = [doc_id for doc_id, _, _ in changes] + + just_received = list(set(changed_doc_ids) - set(ids_sent)) + self.received_docs = just_received + + # ---------- phase 5: sync is over ----------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + + defer.returnValue(my_gen) + + def _docs_by_gen_from_changes(self, changes): + docs_by_generation = [] + kwargs = {'include_deleted': True} + for doc_id, gen, trans in changes: + get_doc = (self.source.get_doc, (doc_id,), kwargs) + docs_by_generation.append((get_doc, gen, trans)) + return docs_by_generation + + def complete_sync(self): + """ + Last stage of the synchronization: + (a) record last known generation and transaction uid for the remote + replica, and + (b) make target aware of our current reached generation. + + :return: A deferred which will fire when the sync has been completed. + :rtype: twisted.internet.defer.Deferred + """ + logger.debug("completing deferred last step in sync...") + + # record target synced-up-to generation including applying what we + # sent + info = self._syncing_info + self.source._set_replica_gen_and_trans_id( + info["target_replica_uid"], info["new_gen"], info["new_trans_id"]) + + # if gapless record current reached generation with target + return self._record_sync_info_with_the_target(info["my_gen"]) + + def _record_sync_info_with_the_target(self, start_generation): + """ + Store local replica metadata in server. + + :param start_generation: The local generation when the sync was + started. + :type start_generation: int + + :return: A deferred which will fire when the operation has been + completed. + :rtype: twisted.internet.defer.Deferred + """ + cur_gen, trans_id = self.source._get_generation_info() + if (cur_gen == start_generation + self.num_inserted and + self.num_inserted > 0): + return self.sync_target.record_sync_info( + self.source._replica_uid, cur_gen, trans_id) + return defer.succeed(None) |
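
[Editor's note] A small worked example of the "gapless" check in _record_sync_info_with_the_target above: local sync metadata is only recorded on the server when every generation bump since the sync started is accounted for by documents inserted during that same sync. The numbers are illustrative.

    start_generation = 10   # local generation when the sync started
    num_inserted = 3        # documents received from the target and inserted
    cur_gen = 13            # local generation after the sync finished

    # 10 + 3 == 13: no concurrent local writes happened, so it is safe to
    # tell the server about our current generation and transaction id.
    gapless = (cur_gen == start_generation + num_inserted and num_inserted > 0)
    assert gapless

    # Had a concurrent writer bumped the local generation to 14 meanwhile,
    # the check would fail and record_sync_info would be skipped
    # (defer.succeed(None) is returned instead).
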