diff options
author | drebs <drebs@leap.se> | 2016-11-10 23:50:30 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2016-11-10 23:50:30 -0200 |
commit | c1950b41e0995b0213227bd0ce2c633f312037dc (patch) | |
tree | 7c1fde54442fefd3553d33b3fe5a2ec454e0196b /client/src/leap | |
parent | 507e284773d9c4954225635741f275c5d327e2a9 (diff) | |
parent | 6b23b3f3215f2443aa3e790559b63a41b3040072 (diff) |
Merge tag '0.8.1'
0.8.1
Diffstat (limited to 'client/src/leap')
17 files changed, 474 insertions, 546 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 77822247..ef0f9066 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -24,13 +24,12 @@ import sys import logging from functools import partial -from threading import BoundedSemaphore from twisted.enterprise import adbapi +from twisted.internet.defer import DeferredSemaphore from twisted.python import log from zope.proxy import ProxyBase, setProxiedObject -from pysqlcipher.dbapi2 import OperationalError -from pysqlcipher.dbapi2 import DatabaseError +from pysqlcipher import dbapi2 from leap.soledad.common.errors import DatabaseAccessError @@ -49,7 +48,7 @@ if DEBUG_SQL: How long the SQLCipher connection should wait for the lock to go away until raising an exception. """ -SQLCIPHER_CONNECTION_TIMEOUT = 10 +SQLCIPHER_CONNECTION_TIMEOUT = 5 """ How many times a SQLCipher query should be retried in case of timeout. @@ -79,9 +78,11 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher", if openfun is None and driver == "pysqlcipher": openfun = partial(set_init_pragmas, opts=opts) return U1DBConnectionPool( - "%s.dbapi2" % driver, opts=opts, sync_enc_pool=sync_enc_pool, - database=opts.path, check_same_thread=False, cp_openfun=openfun, - timeout=SQLCIPHER_CONNECTION_TIMEOUT) + opts, sync_enc_pool, + # the following params are relayed "as is" to twisted's + # ConnectionPool. + "%s.dbapi2" % driver, opts.path, timeout=SQLCIPHER_CONNECTION_TIMEOUT, + check_same_thread=False, cp_openfun=openfun) class U1DBConnection(adbapi.Connection): @@ -105,8 +106,10 @@ class U1DBConnection(adbapi.Connection): self._sync_enc_pool = sync_enc_pool try: adbapi.Connection.__init__(self, pool) - except DatabaseError: - raise DatabaseAccessError('Could not open sqlcipher database') + except dbapi2.DatabaseError as e: + raise DatabaseAccessError( + 'Error initializing connection to sqlcipher database: %s' + % str(e)) def reconnect(self): """ @@ -165,17 +168,17 @@ class U1DBConnectionPool(adbapi.ConnectionPool): connectionFactory = U1DBConnection transactionFactory = U1DBTransaction - def __init__(self, *args, **kwargs): + def __init__(self, opts, sync_enc_pool, *args, **kwargs): """ Initialize the connection pool. """ - # extract soledad-specific objects from keyword arguments - self.opts = kwargs.pop("opts") - self._sync_enc_pool = kwargs.pop("sync_enc_pool") + self.opts = opts + self._sync_enc_pool = sync_enc_pool try: adbapi.ConnectionPool.__init__(self, *args, **kwargs) - except DatabaseError: - raise DatabaseAccessError('Could not open sqlcipher database') + except dbapi2.DatabaseError as e: + raise DatabaseAccessError( + 'Error initializing u1db connection pool: %s' % str(e)) # all u1db connections, hashed by thread-id self._u1dbconnections = {} @@ -183,10 +186,15 @@ class U1DBConnectionPool(adbapi.ConnectionPool): # The replica uid, primed by the connections on init. self.replica_uid = ProxyBase(None) - conn = self.connectionFactory( - self, self._sync_enc_pool, init_u1db=True) - replica_uid = conn._u1db._real_replica_uid - setProxiedObject(self.replica_uid, replica_uid) + try: + conn = self.connectionFactory( + self, self._sync_enc_pool, init_u1db=True) + replica_uid = conn._u1db._real_replica_uid + setProxiedObject(self.replica_uid, replica_uid) + except DatabaseAccessError as e: + self.threadpool.stop() + raise DatabaseAccessError( + "Error initializing connection factory: %s" % str(e)) def runU1DBQuery(self, meth, *args, **kw): """ @@ -204,16 +212,17 @@ class U1DBConnectionPool(adbapi.ConnectionPool): :rtype: twisted.internet.defer.Deferred """ meth = "u1db_%s" % meth - semaphore = BoundedSemaphore(SQLCIPHER_MAX_RETRIES - 1) + semaphore = DeferredSemaphore(SQLCIPHER_MAX_RETRIES) def _run_interaction(): return self.runInteraction( self._runU1DBQuery, meth, *args, **kw) def _errback(failure): - failure.trap(OperationalError) + failure.trap(dbapi2.OperationalError) if failure.getErrorMessage() == "database is locked": - should_retry = semaphore.acquire(False) + logger.warning("Database operation timed out.") + should_retry = semaphore.acquire() if should_retry: logger.warning( "Database operation timed out while waiting for " diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index e657c939..1bfbed8a 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -35,16 +35,11 @@ import ssl import uuid import urlparse -try: - import cchardet as chardet -except ImportError: - import chardet from itertools import chain from StringIO import StringIO from collections import defaultdict -from u1db.remote import http_client -from u1db.remote.ssl_match_hostname import match_hostname + from twisted.internet.defer import DeferredLock, returnValue, inlineCallbacks from zope.interface import implements @@ -54,6 +49,9 @@ from leap.common.plugins import collect_plugins from leap.soledad.common import SHARED_DB_NAME from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type +from leap.soledad.common.l2db.remote import http_client +from leap.soledad.common.l2db.remote.ssl_match_hostname import match_hostname +from leap.soledad.common.errors import DatabaseAccessError from leap.soledad.client import adbapi from leap.soledad.client import events as soledad_events @@ -66,6 +64,13 @@ from leap.soledad.client import encdecpool logger = logging.getLogger(name=__name__) + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + # # Constants # @@ -126,8 +131,7 @@ class Soledad(object): def __init__(self, uuid, passphrase, secrets_path, local_db_path, server_url, cert_file, shared_db=None, - auth_token=None, defer_encryption=False, syncable=True, - userid=None): + auth_token=None, defer_encryption=False, syncable=True): """ Initialize configuration, cryptographic keys and dbs. @@ -181,7 +185,6 @@ class Soledad(object): """ # store config params self._uuid = uuid - self._userid = userid self._passphrase = passphrase self._local_db_path = local_db_path self._server_url = server_url @@ -211,10 +214,22 @@ class Soledad(object): self._init_secrets() self._crypto = SoledadCrypto(self._secrets.remote_storage_secret) - self._init_u1db_sqlcipher_backend() - if syncable: - self._init_u1db_syncer() + try: + # initialize database access, trap any problems so we can shutdown + # smoothly. + self._init_u1db_sqlcipher_backend() + if syncable: + self._init_u1db_syncer() + except DatabaseAccessError: + # oops! something went wrong with backend initialization. We + # have to close any thread-related stuff we have already opened + # here, otherwise there might be zombie threads that may clog the + # reactor. + self._sync_db.close() + if hasattr(self, '_dbpool'): + self._dbpool.close() + raise # # initialization/destruction methods @@ -255,7 +270,7 @@ class Soledad(object): """ self._secrets = SoledadSecrets( self.uuid, self._passphrase, self._secrets_path, - self.shared_db, userid=self._userid) + self.shared_db, userid=self.userid) self._secrets.bootstrap() def _init_u1db_sqlcipher_backend(self): @@ -303,6 +318,17 @@ class Soledad(object): sync_db=self._sync_db, sync_enc_pool=self._sync_enc_pool) + def sync_stats(self): + sync_phase = 0 + if getattr(self._dbsyncer, 'sync_phase', None): + sync_phase = self._dbsyncer.sync_phase[0] + sync_exchange_phase = 0 + if getattr(self._dbsyncer, 'syncer', None): + if getattr(self._dbsyncer.syncer, 'sync_exchange_phase', None): + _p = self._dbsyncer.syncer.sync_exchange_phase[0] + sync_exchange_phase = _p + return sync_phase, sync_exchange_phase + # # Closing methods # @@ -359,7 +385,6 @@ class Soledad(object): also be updated. :rtype: twisted.internet.defer.Deferred """ - doc.content = _convert_to_unicode(doc.content) return self._defer("put_doc", doc) def delete_doc(self, doc): @@ -454,8 +479,7 @@ class Soledad(object): # create_doc (and probably to put_doc too). There are cases (mail # payloads for example) in which we already have the encoding in the # headers, so we don't need to guess it. - return self._defer( - "create_doc", _convert_to_unicode(content), doc_id=doc_id) + return self._defer("create_doc", content, doc_id=doc_id) def create_doc_from_json(self, json, doc_id=None): """ @@ -655,7 +679,7 @@ class Soledad(object): @property def userid(self): - return self._userid + return self.uuid # # ISyncableStorage @@ -976,44 +1000,6 @@ class Soledad(object): return self.create_doc(doc) -def _convert_to_unicode(content): - """ - Convert content to unicode (or all the strings in content). - - NOTE: Even though this method supports any type, it will - currently ignore contents of lists, tuple or any other - iterable than dict. We don't need support for these at the - moment - - :param content: content to convert - :type content: object - - :rtype: object - """ - # Chardet doesn't guess very well with some smallish payloads. - # This parameter might need some empirical tweaking. - CUTOFF_CONFIDENCE = 0.90 - - if isinstance(content, unicode): - return content - elif isinstance(content, str): - encoding = "utf-8" - result = chardet.detect(content) - if result["confidence"] > CUTOFF_CONFIDENCE: - encoding = result["encoding"] - try: - content = content.decode(encoding) - except UnicodeError as e: - logger.error("Unicode error: {0!r}. Using 'replace'".format(e)) - content = content.decode(encoding, 'replace') - return content - else: - if isinstance(content, dict): - for key in content.keys(): - content[key] = _convert_to_unicode(content[key]) - return content - - def create_path_if_not_exists(path): try: if not os.path.isdir(path): diff --git a/client/src/leap/soledad/client/auth.py b/client/src/leap/soledad/client/auth.py index 6dfabeb4..78e9bf1b 100644 --- a/client/src/leap/soledad/client/auth.py +++ b/client/src/leap/soledad/client/auth.py @@ -22,7 +22,7 @@ they can do token-based auth requests to the Soledad server. """ import base64 -from u1db import errors +from leap.soledad.common.l2db import errors class TokenBasedAuth(object): diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 363d71b9..f7d92372 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -26,7 +26,8 @@ import logging from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends.multibackend import MultiBackend -from cryptography.hazmat.backends.openssl.backend import Backend as OpenSSLBackend +from cryptography.hazmat.backends.openssl.backend \ + import Backend as OpenSSLBackend from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type @@ -38,6 +39,8 @@ logger = logging.getLogger(__name__) MAC_KEY_LENGTH = 64 +crypto_backend = MultiBackend([OpenSSLBackend()]) + def encrypt_sym(data, key): """ @@ -58,8 +61,7 @@ def encrypt_sym(data, key): (len(key) * 8)) iv = os.urandom(16) - backend = MultiBackend([OpenSSLBackend()]) - cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend) encryptor = cipher.encryptor() ciphertext = encryptor.update(data) + encryptor.finalize() @@ -86,9 +88,8 @@ def decrypt_sym(data, key, iv): soledad_assert( len(key) == 32, # 32 x 8 = 256 bits. 'Wrong key size: %s (must be 256 bits long).' % len(key)) - backend = MultiBackend([OpenSSLBackend()]) iv = binascii.a2b_base64(iv) - cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend) + cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend) decryptor = cipher.decryptor() return decryptor.update(data) + decryptor.finalize() diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 34667a1e..a6d49b21 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -22,12 +22,11 @@ during synchronization. """ -import multiprocessing -import Queue import json import logging +from uuid import uuid4 -from twisted.internet import reactor +from twisted.internet.task import LoopingCall from twisted.internet import threads from twisted.internet import defer from twisted.python import log @@ -51,9 +50,6 @@ class SyncEncryptDecryptPool(object): Base class for encrypter/decrypter pools. """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = multiprocessing.cpu_count() - def __init__(self, crypto, sync_db): """ Initialize the pool of encryption-workers. @@ -66,21 +62,14 @@ class SyncEncryptDecryptPool(object): """ self._crypto = crypto self._sync_db = sync_db - self._pool = None self._delayed_call = None self._started = False def start(self): - if self.running: - return - self._create_pool() self._started = True def stop(self): - if not self.running: - return self._started = False - self._destroy_pool() # maybe cancel the next delayed call if self._delayed_call \ and not self._delayed_call.called: @@ -90,27 +79,6 @@ class SyncEncryptDecryptPool(object): def running(self): return self._started - def _create_pool(self): - self._pool = multiprocessing.Pool(self.WORKERS) - - def _destroy_pool(self): - """ - Cleanly close the pool of workers. - """ - logger.debug("Closing %s" % (self.__class__.__name__,)) - self._pool.close() - try: - self._pool.join() - except Exception: - pass - - def terminate(self): - """ - Terminate the pool of workers. - """ - logger.debug("Terminating %s" % (self.__class__.__name__,)) - self._pool.terminate() - def _runOperation(self, query, *args): """ Run an operation on the sync db. @@ -180,7 +148,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): Initialize the sync encrypter pool. """ SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._encr_queue = defer.DeferredQueue() # TODO delete already synced files from database def start(self): @@ -189,73 +156,33 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): """ SyncEncryptDecryptPool.start(self) logger.debug("Starting the encryption loop...") - reactor.callWhenRunning(self._maybe_encrypt_and_recurse) def stop(self): """ Stop the encrypter pool. """ - # close the sync queue - if self._encr_queue: - q = self._encr_queue - for d in q.pending: - d.cancel() - del q - self._encr_queue = None SyncEncryptDecryptPool.stop(self) - def enqueue_doc_for_encryption(self, doc): + def encrypt_doc(self, doc): """ - Enqueue a document for encryption. + Encrypt document asynchronously then insert it on + local staging database. :param doc: The document to be encrypted. :type doc: SoledadDocument """ - try: - self._encr_queue.put(doc) - except Queue.Full: - # do not asynchronously encrypt this file if the queue is full - pass - - @defer.inlineCallbacks - def _maybe_encrypt_and_recurse(self): - """ - Process one document from the encryption queue. - - Asynchronously encrypt a document that will then be stored in the sync - db. Processed documents will be read by the SoledadSyncTarget during - the sync_exchange. - """ - try: - while self.running: - doc = yield self._encr_queue.get() - self._encrypt_doc(doc) - except defer.QueueUnderflow: - self._delayed_call = reactor.callLater( - self.ENCRYPT_LOOP_PERIOD, - self._maybe_encrypt_and_recurse) - - def _encrypt_doc(self, doc): - """ - Symmetrically encrypt a document. - - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ soledad_assert(self._crypto is not None, "need a crypto object") docstr = doc.get_json() key = self._crypto.doc_passphrase(doc.doc_id) secret = self._crypto.secret args = doc.doc_id, doc.rev, docstr, key, secret # encrypt asynchronously - self._pool.apply_async( - encrypt_doc_task, args, - callback=self._encrypt_doc_cb) + # TODO use dedicated threadpool / move to ampoule + d = threads.deferToThread( + encrypt_doc_task, *args) + d.addCallback(self._encrypt_doc_cb) + return d def _encrypt_doc_cb(self, result): """ @@ -336,8 +263,8 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, :type doc_id: str :param doc_rev: The document revision. :type doc_rev: str - :param content: The encrypted content of the document. - :type content: str + :param content: The encrypted content of the document as JSON dict. + :type content: dict :param gen: The generation corresponding to the modification of that document. :type gen: int @@ -384,7 +311,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ TABLE_NAME = "docs_received" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ - "trans_id, encrypted, idx" + "trans_id, encrypted, idx, sync_id" """ Period of recurrence of the periodic decrypting task, in seconds. @@ -414,46 +341,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._docs_to_process = None self._processed_docs = 0 self._last_inserted_idx = 0 - self._decrypting_docs = [] - - # a list that holds the asynchronous decryption results so they can be - # collected when they are ready - self._async_results = [] - # initialize db and make sure any database operation happens after - # db initialization - self._deferred_init = self._init_db() - self._wait_init_db('_runOperation', '_runQuery') - - def _wait_init_db(self, *methods): - """ - Methods that need to wait for db initialization. - - :param methods: methods that need to wait for initialization - :type methods: tuple(str) - """ - self._waiting = [] - self._stored = {} - - def _restore(_): - for method in self._stored: - setattr(self, method, self._stored[method]) - for d in self._waiting: - d.callback(None) - - def _makeWrapper(method): - def wrapper(*args, **kw): - d = defer.Deferred() - d.addCallback(lambda _: self._stored[method](*args, **kw)) - self._waiting.append(d) - return d - return wrapper - - for method in methods: - self._stored[method] = getattr(self, method) - setattr(self, method, _makeWrapper(method)) - - self._deferred_init.addCallback(_restore) + self._loop = LoopingCall(self._decrypt_and_recurse) def start(self, docs_to_process): """ @@ -466,13 +355,39 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type docs_to_process: int """ SyncEncryptDecryptPool.start(self) + self._decrypted_docs_indexes = set() + self._sync_id = uuid4().hex self._docs_to_process = docs_to_process self._deferred = defer.Deferred() - reactor.callWhenRunning(self._launch_decrypt_and_recurse) + d = self._init_db() + d.addCallback(lambda _: self._loop.start(self.DECRYPT_LOOP_PERIOD)) + return d + + def stop(self): + if self._loop.running: + self._loop.stop() + self._finish() + SyncEncryptDecryptPool.stop(self) + + def _init_db(self): + """ + Ensure sync_id column is present then + Empty the received docs table of the sync database. - def _launch_decrypt_and_recurse(self): - d = self._decrypt_and_recurse() - d.addErrback(self._errback) + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" % + self.TABLE_NAME) + d = self._runQuery(ensure_sync_id_column) + + def empty_received_docs(_): + query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) + return self._runOperation(query, (self._sync_id,)) + + d.addCallbacks(empty_received_docs, empty_received_docs) + return d def _errback(self, failure): log.err(failure) @@ -491,8 +406,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): def insert_encrypted_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): """ - Insert a received message with encrypted content, to be decrypted later - on. + Decrypt and insert a received document into local staging area to be + processed later on. :param doc_id: The document ID. :type doc_id: str @@ -507,15 +422,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :param idx: The index of this document in the current sync process. :type idx: int - :return: A deferred that will fire when the operation in the database - has finished. + :return: A deferred that will fire after the decrypted document has + been inserted in the sync db. :rtype: twisted.internet.defer.Deferred """ - docstr = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ - % self.TABLE_NAME - return self._runOperation( - query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) + soledad_assert(self._crypto is not None, "need a crypto object") + + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx + # decrypt asynchronously + # TODO use dedicated threadpool / move to ampoule + d = threads.deferToThread( + decrypt_doc_task, *args) + # callback will insert it for later processing + d.addCallback(self._decrypt_doc_cb) + return d def insert_received_doc( self, doc_id, doc_rev, content, gen, trans_id, idx): @@ -543,56 +465,29 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ if not isinstance(content, str): content = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME - return self._runOperation( - query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + d = self._runOperation( + query, (doc_id, doc_rev, content, gen, trans_id, 0, + idx, self._sync_id)) + d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) + return d - def _delete_received_doc(self, doc_id): + def _delete_received_docs(self, doc_ids): """ - Delete a received doc after it was inserted into the local db. + Delete a list of received docs after get them inserted into the db. - :param doc_id: Document ID. - :type doc_id: str + :param doc_id: Document ID list. + :type doc_id: list :return: A deferred that will fire when the operation in the database has finished. :rtype: twisted.internet.defer.Deferred """ - query = "DELETE FROM '%s' WHERE doc_id=?" \ - % self.TABLE_NAME - return self._runOperation(query, (doc_id,)) - - def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx): - """ - Dispatch an asynchronous document decrypting routine and save the - result object. - - :param doc_id: The ID for the document with contents to be encrypted. - :type doc: str - :param rev: The revision of the document. - :type rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - """ - soledad_assert(self._crypto is not None, "need a crypto object") - - content = json.loads(content) - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, rev, content, gen, trans_id, key, secret, idx - # decrypt asynchronously - self._async_results.append( - self._pool.apply_async( - decrypt_doc_task, args)) + placeholders = ', '.join('?' for _ in doc_ids) + query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ + % (self.TABLE_NAME, placeholders) + return self._runOperation(query, (doc_ids)) def _decrypt_doc_cb(self, result): """ @@ -610,11 +505,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): doc_id, rev, content, gen, trans_id, idx = result logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" % (doc_id, rev, gen, trans_id)) - self._decrypting_docs.remove((doc_id, rev)) return self.insert_received_doc( doc_id, rev, content, gen, trans_id, idx) - def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): + def _get_docs(self, encrypted=None, sequence=None): """ Get documents from the received docs table in the sync db. @@ -622,9 +516,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): field equal to given parameter. :type encrypted: bool or None :param order_by: The name of the field to order results. - :type order_by: str - :param order: Whether the order should be ASC or DESC. - :type order: str :return: A deferred that will fire with the results of the database query. @@ -632,10 +523,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ "idx FROM %s" % self.TABLE_NAME - if encrypted is not None: - query += " WHERE encrypted = %d" % int(encrypted) - query += " ORDER BY %s %s" % (order_by, order) - return self._runQuery(query) + parameters = [] + if encrypted or sequence: + query += " WHERE sync_id = ? and" + parameters += [self._sync_id] + if encrypted: + query += " encrypted = ?" + parameters += [int(encrypted)] + if sequence: + query += " idx in (" + ', '.join('?' * len(sequence)) + ")" + parameters += [int(i) for i in sequence] + query += " ORDER BY idx ASC" + return self._runQuery(query, parameters) @defer.inlineCallbacks def _get_insertable_docs(self): @@ -646,35 +545,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): documents. :rtype: twisted.internet.defer.Deferred """ - # here, we fetch the list of decrypted documents and compare with the - # index of the last succesfully processed document. - decrypted_docs = yield self._get_docs(encrypted=False) - insertable = [] - last_idx = self._last_inserted_idx - for doc_id, rev, content, gen, trans_id, encrypted, idx in \ - decrypted_docs: - if (idx != last_idx + 1): - break - insertable.append((doc_id, rev, content, gen, trans_id, idx)) - last_idx += 1 - defer.returnValue(insertable) - - @defer.inlineCallbacks - def _async_decrypt_received_docs(self): - """ - Get all the encrypted documents from the sync database and dispatch a - decrypt worker to decrypt each one of them. - - :return: A deferred that will fire after all documents have been - decrypted and inserted back in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - docs = yield self._get_docs(encrypted=True) - for doc_id, rev, content, gen, trans_id, _, idx in docs: - if (doc_id, rev) not in self._decrypting_docs: - self._decrypting_docs.append((doc_id, rev)) - self._async_decrypt_doc( - doc_id, rev, content, gen, trans_id, idx) + # Here, check in memory what are the insertable indexes that can + # form a sequence starting from the last inserted index + sequence = [] + insertable_docs = [] + next_index = self._last_inserted_idx + 1 + while next_index in self._decrypted_docs_indexes: + sequence.append(str(next_index)) + next_index += 1 + # Then fetch all the ones ready for insertion. + if sequence: + insertable_docs = yield self._get_docs(encrypted=False, + sequence=sequence) + defer.returnValue(insertable_docs) @defer.inlineCallbacks def _process_decrypted_docs(self): @@ -687,36 +570,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :rtype: twisted.internet.defer.Deferred """ insertable = yield self._get_insertable_docs() + processed_docs_ids = [] for doc_fields in insertable: method = self._insert_decrypted_local_doc # FIXME: This is used only because SQLCipherU1DBSync is synchronous # When adbapi is used there is no need for an external thread # Without this the reactor can freeze and fail docs download yield threads.deferToThread(method, *doc_fields) - defer.returnValue(insertable) - - def _delete_processed_docs(self, inserted): - """ - Delete from the sync db documents that have been processed. - - :param inserted: List of documents inserted in the previous process - step. - :type inserted: list - - :return: A list of deferreds that will fire when each operation in the - database has finished. - :rtype: twisted.internet.defer.DeferredList - """ - deferreds = [] - for doc_id, doc_rev, _, _, _, _ in inserted: - deferreds.append( - self._delete_received_doc(doc_id)) - if not deferreds: - return defer.succeed(None) - return defer.gatherResults(deferreds) + processed_docs_ids.append(doc_fields[0]) + yield self._delete_received_docs(processed_docs_ids) def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id, idx): + gen, trans_id, encrypted, idx): """ Insert the decrypted document into the local replica. @@ -751,32 +616,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self._last_inserted_idx = idx self._processed_docs += 1 - def _init_db(self): - """ - Empty the received docs table of the sync database. - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - return self._runOperation(query) - - @defer.inlineCallbacks - def _collect_async_decryption_results(self): - """ - Collect the results of the asynchronous doc decryptions and re-raise - any exception raised by a multiprocessing async decryption call. - - :raise Exception: Raised if an async call has raised an exception. - """ - async_results = self._async_results[:] - for res in async_results: - if res.ready(): - # XXX: might raise an exception! - yield self._decrypt_doc_cb(res.get()) - self._async_results.remove(res) - @defer.inlineCallbacks def _decrypt_and_recurse(self): """ @@ -792,22 +631,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): delete operations have been executed. :rtype: twisted.internet.defer.Deferred """ + if not self.running: + defer.returnValue(None) processed = self._processed_docs pending = self._docs_to_process if processed < pending: - yield self._async_decrypt_received_docs() - yield self._collect_async_decryption_results() - docs = yield self._process_decrypted_docs() - yield self._delete_processed_docs(docs) - # recurse - self._delayed_call = reactor.callLater( - self.DECRYPT_LOOP_PERIOD, - self._launch_decrypt_and_recurse) + yield self._process_decrypted_docs() else: self._finish() def _finish(self): self._processed_docs = 0 self._last_inserted_idx = 0 - self._deferred.callback(None) + self._decrypted_docs_indexes = set() + if not self._deferred.called: + self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py index 08775580..4fc91d9d 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py @@ -24,9 +24,9 @@ import hashlib import os import sys -import u1db from twisted.internet import defer, reactor +from leap.soledad.common import l2db from leap.soledad.client import adbapi from leap.soledad.client.sqlcipher import SQLCipherOptions @@ -135,7 +135,7 @@ def countDocs(_): def printResult(r, **kwargs): if kwargs: debug(*kwargs.values()) - elif isinstance(r, u1db.Document): + elif isinstance(r, l2db.Document): debug(r.doc_id, r.content['number']) else: len_results = len(r[1]) diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py index 9deba136..38ea18a3 100644 --- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py +++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py @@ -24,11 +24,11 @@ import hashlib import os import sys -import u1db from twisted.internet import defer, reactor from leap.soledad.client import adbapi from leap.soledad.client.sqlcipher import SQLCipherOptions +from leap.soledad.common import l2db folder = os.environ.get("TMPDIR", "tmp") @@ -135,7 +135,7 @@ def countDocs(_): def printResult(r, **kwargs): if kwargs: debug(*kwargs.values()) - elif isinstance(r, u1db.Document): + elif isinstance(r, l2db.Document): debug(r.doc_id, r.content['number']) else: len_results = len(r[1]) diff --git a/client/src/leap/soledad/client/examples/use_adbapi.py b/client/src/leap/soledad/client/examples/use_adbapi.py index d7bd21f2..a2683836 100644 --- a/client/src/leap/soledad/client/examples/use_adbapi.py +++ b/client/src/leap/soledad/client/examples/use_adbapi.py @@ -21,11 +21,11 @@ from __future__ import print_function import datetime import os -import u1db from twisted.internet import defer, reactor from leap.soledad.client import adbapi from leap.soledad.client.sqlcipher import SQLCipherOptions +from leap.soledad.common import l2db folder = os.environ.get("TMPDIR", "tmp") @@ -68,7 +68,7 @@ def countDocs(_): def printResult(r): - if isinstance(r, u1db.Document): + if isinstance(r, l2db.Document): debug(r.doc_id, r.content['number']) else: len_results = len(r[1]) diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index a16531ef..b7e54aa4 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -22,6 +22,7 @@ after receiving. """ +import os import logging from leap.common.http import HTTPClient @@ -33,6 +34,12 @@ from leap.soledad.client.http_target.fetch import HTTPDocFetcher logger = logging.getLogger(__name__) +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): """ @@ -93,3 +100,6 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): # the duplicated syncing bug. This could be reduced to the 30s default # after implementing Cancellable Sync. See #7382 self._http = HTTPClient(cert_file, timeout=90) + + if DO_STATS: + self.sync_exchange_phase = [0] diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 94354092..f8de9a15 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -14,17 +14,25 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import time import json import base64 from uuid import uuid4 -from u1db import SyncTarget from twisted.web.error import Error from twisted.internet import defer -from leap.soledad.common.errors import InvalidAuthTokenError from leap.soledad.client.http_target.support import readBody +from leap.soledad.common.errors import InvalidAuthTokenError +from leap.soledad.common.l2db import SyncTarget + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True class SyncTargetAPI(SyncTarget): @@ -187,6 +195,10 @@ class SyncTargetAPI(SyncTarget): transaction id of the target replica. :rtype: twisted.internet.defer.Deferred """ + # ---------- phase 1: send docs to server ---------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- self._ensure_callback = ensure_callback @@ -203,6 +215,11 @@ class SyncTargetAPI(SyncTarget): last_known_trans_id, sync_id) + # ---------- phase 2: receive docs ----------------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + cur_target_gen, cur_target_trans_id = yield self._receive_docs( last_known_generation, last_known_trans_id, ensure_callback, sync_id, @@ -214,6 +231,11 @@ class SyncTargetAPI(SyncTarget): cur_target_gen = gen_after_send cur_target_trans_id = trans_id_after_send + # ---------- phase 3: sync exchange is over -------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + defer.returnValue([cur_target_gen, cur_target_trans_id]) diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 9f7a4193..a3f70b02 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -16,15 +16,17 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import logging import json -from u1db import errors -from u1db.remote import utils + from twisted.internet import defer -from leap.soledad.common.document import SoledadDocument + from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.http_target.support import RequestBody +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import utils logger = logging.getLogger(__name__) @@ -81,9 +83,6 @@ class HTTPDocFetcher(object): new_generation = ngen new_transaction_id = ntrans - if defer_decryption: - self._sync_decr_pool.start(number_of_changes) - # --------------------------------------------------------------------- # maybe receive the rest of the documents # --------------------------------------------------------------------- @@ -151,6 +150,10 @@ class HTTPDocFetcher(object): new_generation, new_transaction_id, number_of_changes, doc_id, \ rev, content, gen, trans_id = \ self._parse_received_doc_response(response) + + if self._sync_decr_pool and not self._sync_decr_pool.running: + self._sync_decr_pool.start(number_of_changes) + if doc_id is not None: # decrypt incoming document and insert into local database # ------------------------------------------------------------- diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index 89288779..13218acf 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -16,10 +16,13 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import json import logging + from twisted.internet import defer + from leap.soledad.client.events import emit_async from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.http_target.support import RequestBody + logger = logging.getLogger(__name__) diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 2625744c..6ec98ed4 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -16,20 +16,22 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import warnings import json -from u1db import errors -from u1db.remote import http_errors + from twisted.internet import defer from twisted.web.client import _ReadBodyProtocol from twisted.web.client import PartialDownloadError from twisted.web._newclient import ResponseDone from twisted.web._newclient import PotentialDataLoss +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import http_errors # we want to make sure that HTTP errors will raise appropriate u1db errors, # that is, fire errbacks with the appropriate failures, in the context of # twisted. Because of that, we redefine the http body reader used by the HTTP # client below. + class ReadBodyProtocol(_ReadBodyProtocol): """ From original Twisted implementation, focused on adding our error diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index e2a5a1d7..3547a711 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -33,7 +33,6 @@ from hashlib import sha256 from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import document -from leap.soledad.common import errors from leap.soledad.client import events from leap.soledad.client.crypto import encrypt_sym, decrypt_sym @@ -81,6 +80,7 @@ class BootstrapSequenceError(SecretsException): # Secrets handler # + class SoledadSecrets(object): """ @@ -143,6 +143,8 @@ class SoledadSecrets(object): KDF_LENGTH_KEY = 'kdf_length' KDF_SCRYPT = 'scrypt' CIPHER_AES256 = 'aes256' + RECOVERY_DOC_VERSION_KEY = 'version' + RECOVERY_DOC_VERSION = 1 """ Keys used to access storage secrets in recovery documents. """ @@ -162,17 +164,12 @@ class SoledadSecrets(object): :param shared_db: The shared database that stores user secrets. :type shared_db: leap.soledad.client.shared_db.SoledadSharedDatabase """ - # XXX removed since not in use - # We will pick the first secret available. - # param secret_id: The id of the storage secret to be used. - self._uuid = uuid self._userid = userid self._passphrase = passphrase self._secrets_path = secrets_path self._shared_db = shared_db self._secrets = {} - self._secret_id = None def bootstrap(self): @@ -195,49 +192,44 @@ class SoledadSecrets(object): storage on server sequence has failed for some reason. """ # STAGE 1 - verify if secrets exist locally - if not self._has_secret(): # try to load from local storage. - - # STAGE 2 - there are no secrets in local storage, so try to fetch - # encrypted secrets from server. - logger.info( - 'Trying to fetch cryptographic secrets from shared recovery ' - 'database...') - - # --- start of atomic operation in shared db --- + try: + logger.info("Trying to load secrets from local storage...") + version = self._load_secrets_from_local_file() + # eventually migrate local and remote stored documents from old + # format version + if version < self.RECOVERY_DOC_VERSION: + self._store_secrets() + self._upload_crypto_secrets() + logger.info("Found secrets in local storage.") + return - # obtain lock on shared db - token = timeout = None - try: - token, timeout = self._shared_db.lock() - except errors.AlreadyLockedError: - raise BootstrapSequenceError('Database is already locked.') - except errors.LockTimedOutError: - raise BootstrapSequenceError('Lock operation timed out.') + except NoStorageSecret: + logger.info("Could not find secrets in local storage.") - self._get_or_gen_crypto_secrets() + # STAGE 2 - there are no secrets in local storage and this is the + # first time we are running soledad with the specified + # secrets_path. Try to fetch encrypted secrets from + # server. + try: + logger.info('Trying to fetch secrets from remote storage...') + version = self._download_crypto_secrets() + self._store_secrets() + # eventually migrate remote stored document from old format + # version + if version < self.RECOVERY_DOC_VERSION: + self._upload_crypto_secrets() + logger.info('Found secrets in remote storage.') + return + except NoStorageSecret: + logger.info("Could not find secrets in remote storage.") - # release the lock on shared db - try: - self._shared_db.unlock(token) - self._shared_db.close() - except errors.NotLockedError: - # for some reason the lock expired. Despite that, secret - # loading or generation/storage must have been executed - # successfully, so we pass. - pass - except errors.InvalidTokenError: - # here, our lock has not only expired but also some other - # client application has obtained a new lock and is currently - # doing its thing in the shared database. Using the same - # reasoning as above, we assume everything went smooth and - # pass. - pass - except Exception as e: - logger.error("Unhandled exception when unlocking shared " - "database.") - logger.exception(e) - - # --- end of atomic operation in shared db --- + # STAGE 3 - there are no secrets in server also, so we want to + # generate the secrets and store them in the remote + # db. + logger.info("Generating secrets...") + self._gen_crypto_secrets() + logger.info("Uploading secrets...") + self._upload_crypto_secrets() def _has_secret(self): """ @@ -246,21 +238,7 @@ class SoledadSecrets(object): :return: Whether there's a storage secret for symmetric encryption. :rtype: bool """ - logger.info("Checking if there's a secret in local storage...") - if (self._secret_id is None or self._secret_id not in self._secrets) \ - and os.path.isfile(self._secrets_path): - try: - self._load_secrets() # try to load from disk - except IOError as e: - logger.warning( - 'IOError while loading secrets from disk: %s' % str(e)) - - if self.storage_secret is not None: - logger.info("Found a secret in local storage.") - return True - - logger.info("Could not find a secret in local storage.") - return False + return self.storage_secret is not None def _maybe_set_active_secret(self, active_secret): """ @@ -272,73 +250,82 @@ class SoledadSecrets(object): active_secret = self._secrets.items()[0][0] self.set_secret_id(active_secret) - def _load_secrets(self): + def _load_secrets_from_local_file(self): """ Load storage secrets from local file. + + :return version: The version of the locally stored recovery document. + + :raise NoStorageSecret: Raised if there are no secrets available in + local storage. """ + # check if secrets file exists and we can read it + if not os.path.isfile(self._secrets_path): + raise NoStorageSecret + # read storage secrets from file content = None with open(self._secrets_path, 'r') as f: content = json.loads(f.read()) - _, active_secret = self._import_recovery_document(content) + _, active_secret, version = self._import_recovery_document(content) + self._maybe_set_active_secret(active_secret) - # enlarge secret if needed - enlarged = False - if len(self._secrets[self._secret_id]) < self.GEN_SECRET_LENGTH: - gen_len = self.GEN_SECRET_LENGTH \ - - len(self._secrets[self._secret_id]) - new_piece = os.urandom(gen_len) - self._secrets[self._secret_id] += new_piece - enlarged = True - # store and save in shared db if needed - if enlarged: - self._store_secrets() - self._put_secrets_in_shared_db() - def _get_or_gen_crypto_secrets(self): + return version + + def _download_crypto_secrets(self): + """ + Download crypto secrets. + + :return version: The version of the remotelly stored recovery document. + + :raise NoStorageSecret: Raised if there are no secrets available in + remote storage. + """ + doc = None + if self._shared_db.syncable: + doc = self._get_secrets_from_shared_db() + + if doc is None: + raise NoStorageSecret + + _, active_secret, version = self._import_recovery_document(doc.content) + self._maybe_set_active_secret(active_secret) + + return version + + def _gen_crypto_secrets(self): + """ + Generate the crypto secrets. + """ + logger.info('No cryptographic secrets found, creating new secrets...') + secret_id = self._gen_secret() + self.set_secret_id(secret_id) + + def _upload_crypto_secrets(self): """ - Retrieves or generates the crypto secrets. + Send crypto secrets to shared db. :raises BootstrapSequenceError: Raised when unable to store secrets in shared database. """ if self._shared_db.syncable: - doc = self._get_secrets_from_shared_db() - else: - doc = None - - if doc is not None: - logger.info( - 'Found cryptographic secrets in shared recovery ' - 'database.') - _, active_secret = self._import_recovery_document(doc.content) - self._maybe_set_active_secret(active_secret) - self._store_secrets() # save new secrets in local file - else: - # STAGE 3 - there are no secrets in server also, so - # generate a secret and store it in remote db. - logger.info( - 'No cryptographic secrets found, creating new ' - ' secrets...') - self.set_secret_id(self._gen_secret()) - - if self._shared_db.syncable: + try: + self._put_secrets_in_shared_db() + except Exception as ex: + # storing generated secret in shared db failed for + # some reason, so we erase the generated secret and + # raise. try: - self._put_secrets_in_shared_db() - except Exception as ex: - # storing generated secret in shared db failed for - # some reason, so we erase the generated secret and - # raise. - try: - os.unlink(self._secrets_path) - except OSError as e: - if e.errno != errno.ENOENT: - # no such file or directory - logger.exception(e) - logger.exception(ex) - raise BootstrapSequenceError( - 'Could not store generated secret in the shared ' - 'database, bailing out...') + os.unlink(self._secrets_path) + except OSError as e: + if e.errno != errno.ENOENT: + # no such file or directory + logger.exception(e) + logger.exception(ex) + raise BootstrapSequenceError( + 'Could not store generated secret in the shared ' + 'database, bailing out...') # # Shared DB related methods @@ -360,7 +347,7 @@ class SoledadSecrets(object): """ Export the storage secrets. - A recovery document has the following structure: + Current format of recovery document has the following structure: { 'storage_secrets': { @@ -371,6 +358,7 @@ class SoledadSecrets(object): }, }, 'active_secret': '<secret_id>', + 'version': '<recovery document format version>', } Note that multiple storage secrets might be stored in one recovery @@ -388,13 +376,14 @@ class SoledadSecrets(object): data = { self.STORAGE_SECRETS_KEY: encrypted_secrets, self.ACTIVE_SECRET_KEY: self._secret_id, + self.RECOVERY_DOC_VERSION_KEY: self.RECOVERY_DOC_VERSION, } return data def _import_recovery_document(self, data): """ - Import storage secrets for symmetric encryption and uuid (if present) - from a recovery document. + Import storage secrets for symmetric encryption from a recovery + document. Note that this method does not store the imported data on disk. For that, use C{self._store_secrets()}. @@ -402,11 +391,44 @@ class SoledadSecrets(object): :param data: The recovery document. :type data: dict - :return: A tuple containing the number of imported secrets and the - secret_id of the last active secret. - :rtype: (int, str) + :return: A tuple containing the number of imported secrets, the + secret_id of the last active secret, and the recovery + document format version. + :rtype: (int, str, int) """ soledad_assert(self.STORAGE_SECRETS_KEY in data) + version = data.get(self.RECOVERY_DOC_VERSION_KEY, 1) + meth = getattr(self, '_import_recovery_document_version_%d' % version) + secret_count, active_secret = meth(data) + return secret_count, active_secret, version + + def _import_recovery_document_version_1(self, data): + """ + Import storage secrets for symmetric encryption from a recovery + document with format version 1. + + Version 1 of recovery document has the following structure: + + { + 'storage_secrets': { + '<storage_secret id>': { + 'cipher': 'aes256', + 'length': <secret length>, + 'secret': '<encrypted storage_secret>', + }, + }, + 'active_secret': '<secret_id>', + 'version': '<recovery document format version>', + } + + :param data: The recovery document. + :type data: dict + + :return: A tuple containing the number of imported secrets, the + secret_id of the last active secret, and the recovery + document format version. + :rtype: (int, str, int) + """ # include secrets in the secret pool. secret_count = 0 secrets = data[self.STORAGE_SECRETS_KEY].items() @@ -419,7 +441,8 @@ class SoledadSecrets(object): if secret_id not in self._secrets: try: self._secrets[secret_id] = \ - self._decrypt_storage_secret(encrypted_secret) + self._decrypt_storage_secret_version_1( + encrypted_secret) secret_count += 1 except SecretsException as e: logger.error("Failed to decrypt storage secret: %s" @@ -478,13 +501,21 @@ class SoledadSecrets(object): # Management of secret for symmetric encryption. # - def _decrypt_storage_secret(self, encrypted_secret_dict): + def _decrypt_storage_secret_version_1(self, encrypted_secret_dict): """ Decrypt the storage secret. Storage secret is encrypted before being stored. This method decrypts and returns the decrypted storage secret. + Version 1 of storage secret format has the following structure: + + '<storage_secret id>': { + 'cipher': 'aes256', + 'length': <secret length>, + 'secret': '<encrypted storage_secret>', + }, + :param encrypted_secret_dict: The encrypted storage secret. :type encrypted_secret_dict: dict diff --git a/client/src/leap/soledad/client/shared_db.py b/client/src/leap/soledad/client/shared_db.py index 6abf8ea3..d43db045 100644 --- a/client/src/leap/soledad/client/shared_db.py +++ b/client/src/leap/soledad/client/shared_db.py @@ -17,7 +17,7 @@ """ A shared database for storing/retrieving encrypted key material. """ -from u1db.remote import http_database +from leap.soledad.common.l2db.remote import http_database from leap.soledad.client.auth import TokenBasedAuth @@ -151,33 +151,3 @@ class SoledadSharedDatabase(http_database.HTTPDatabase, TokenBasedAuth): http_database.HTTPDatabase.__init__(self, url, document_factory, creds) self._uuid = uuid - - def lock(self): - """ - Obtain a lock on document with id C{doc_id}. - - :return: A tuple containing the token to unlock and the timeout until - lock expiration. - :rtype: (str, float) - - :raise HTTPError: Raised if any HTTP error occurs. - """ - if self.syncable: - res, headers = self._request_json( - 'PUT', ['lock', self._uuid], body={}) - return res['token'], res['timeout'] - else: - return None, None - - def unlock(self, token): - """ - Release the lock on shared database. - - :param token: The token returned by a previous call to lock(). - :type token: str - - :raise HTTPError: - """ - if self.syncable: - _, _ = self._request_json( - 'DELETE', ['lock', self._uuid], params={'token': token}) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 22ddc87d..166c0783 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -44,10 +44,6 @@ handled by Soledad should be created by SQLCipher >= 2.0. import logging import os import json -import u1db - -from u1db import errors as u1db_errors -from u1db.backends import sqlite_backend from hashlib import sha256 from functools import partial @@ -58,11 +54,15 @@ from twisted.internet import reactor from twisted.internet import defer from twisted.enterprise import adbapi +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common import l2db +from leap.soledad.common.l2db import errors as u1db_errors +from leap.soledad.common.l2db.backends import sqlite_backend +from leap.soledad.common.errors import DatabaseAccessError + from leap.soledad.client.http_target import SoledadHTTPSyncTarget from leap.soledad.client.sync import SoledadSynchronizer - from leap.soledad.client import pragmas -from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) @@ -72,6 +72,12 @@ logger = logging.getLogger(__name__) sqlite_backend.dbapi2 = sqlcipher_dbapi2 +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True): """ Initialize a SQLCipher database. @@ -278,7 +284,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) if self.defer_encryption: # TODO move to api? - self._sync_enc_pool.enqueue_doc_for_encryption(doc) + self._sync_enc_pool.encrypt_doc(doc) return doc_rev # @@ -437,21 +443,21 @@ class SQLCipherU1DBSync(SQLCipherDatabase): # format is the following: # # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} - self._syncers = {} - - # Storage for the documents received during a sync + # storage for the documents received during a sync self.received_docs = [] self.running = False + self.shutdownID = None + self._db_handle = None + # initialize the main db before scheduling a start + self._initialize_main_db() self._reactor = reactor self._reactor.callWhenRunning(self._start) - self._db_handle = None - self._initialize_main_db() - - self.shutdownID = None + if DO_STATS: + self.sync_phase = None @property def _replica_uid(self): @@ -464,11 +470,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self.running = True def _initialize_main_db(self): - self._db_handle = initialize_sqlcipher_db( - self._opts, check_same_thread=False) - self._real_replica_uid = None - self._ensure_schema() - self.set_document_factory(soledad_doc_factory) + try: + self._db_handle = initialize_sqlcipher_db( + self._opts, check_same_thread=False) + self._real_replica_uid = None + self._ensure_schema() + self.set_document_factory(soledad_doc_factory) + except sqlcipher_dbapi2.DatabaseError as e: + raise DatabaseAccessError(str(e)) @defer.inlineCallbacks def sync(self, url, creds=None, defer_decryption=True): @@ -497,6 +506,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :rtype: Deferred """ syncer = self._get_syncer(url, creds=creds) + if DO_STATS: + self.sync_phase = syncer.sync_phase + self.syncer = syncer + self.sync_exchange_phase = syncer.sync_exchange_phase local_gen_before_sync = yield syncer.sync( defer_decryption=defer_decryption) self.received_docs = syncer.received_docs @@ -582,7 +595,7 @@ class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): self._db_handle = conn self._real_replica_uid = None self._ensure_schema() - self._factory = u1db.Document + self._factory = l2db.Document class SoledadSQLCipherWrapper(SQLCipherDatabase): diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 1879031f..2656a150 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -17,18 +17,26 @@ """ Soledad synchronization utilities. """ +import os +import time import logging from twisted.internet import defer -from u1db import errors +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.sync import Synchronizer from leap.soledad.common.errors import BackendNotReadyError -from u1db.sync import Synchronizer logger = logging.getLogger(__name__) +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + class SoledadSynchronizer(Synchronizer): """ Collect the state around synchronizing 2 U1DB replicas. @@ -42,6 +50,12 @@ class SoledadSynchronizer(Synchronizer): """ received_docs = [] + def __init__(self, *args, **kwargs): + Synchronizer.__init__(self, *args, **kwargs) + if DO_STATS: + self.sync_phase = [0] + self.sync_exchange_phase = None + @defer.inlineCallbacks def sync(self, defer_decryption=True): """ @@ -64,9 +78,16 @@ class SoledadSynchronizer(Synchronizer): the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred """ + sync_target = self.sync_target self.received_docs = [] + # ---------- phase 1: get sync info from server ---------------------- + if DO_STATS: + self.sync_phase[0] += 1 + self.sync_exchange_phase = self.sync_target.sync_exchange_phase + # -------------------------------------------------------------------- + # get target identifier, its current generation, # and its last-seen database generation for this source ensure_callback = None @@ -106,6 +127,11 @@ class SoledadSynchronizer(Synchronizer): self.source.validate_gen_and_trans_id( target_my_gen, target_my_trans_id) + # ---------- phase 2: what's changed --------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + # what's changed since that generation and this current gen my_gen, _, changes = self.source.whats_changed(target_my_gen) logger.debug("Soledad sync: there are %d documents to send." @@ -130,6 +156,11 @@ class SoledadSynchronizer(Synchronizer): raise errors.InvalidTransactionId defer.returnValue(my_gen) + # ---------- phase 3: sync exchange ---------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + # prepare to send all the changed docs changed_doc_ids = [doc_id for doc_id, _, _ in changes] docs_to_send = self.source.get_docs( @@ -162,6 +193,12 @@ class SoledadSynchronizer(Synchronizer): "my_gen": my_gen } self._syncing_info = info + + # ---------- phase 4: complete sync ---------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + yield self.complete_sync() _, _, changes = self.source.whats_changed(target_my_gen) @@ -170,6 +207,11 @@ class SoledadSynchronizer(Synchronizer): just_received = list(set(changed_doc_ids) - set(ids_sent)) self.received_docs = just_received + # ---------- phase 5: sync is over ----------------------------------- + if DO_STATS: + self.sync_phase[0] += 1 + # -------------------------------------------------------------------- + defer.returnValue(my_gen) def complete_sync(self): |