Diffstat (limited to 'client/src/leap')
-rw-r--r--  client/src/leap/soledad/client/adbapi.py                                  |  53
-rw-r--r--  client/src/leap/soledad/client/api.py                                     |  96
-rw-r--r--  client/src/leap/soledad/client/auth.py                                    |   2
-rw-r--r--  client/src/leap/soledad/client/crypto.py                                  |  11
-rw-r--r--  client/src/leap/soledad/client/encdecpool.py                              | 384
-rw-r--r--  client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py |   4
-rw-r--r--  client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py | 4
-rw-r--r--  client/src/leap/soledad/client/examples/use_adbapi.py                     |   4
-rw-r--r--  client/src/leap/soledad/client/http_target/__init__.py                    |  10
-rw-r--r--  client/src/leap/soledad/client/http_target/api.py                         |  26
-rw-r--r--  client/src/leap/soledad/client/http_target/fetch.py                       |  15
-rw-r--r--  client/src/leap/soledad/client/http_target/send.py                        |   3
-rw-r--r--  client/src/leap/soledad/client/http_target/support.py                     |   6
-rw-r--r--  client/src/leap/soledad/client/secrets.py                                 | 271
-rw-r--r--  client/src/leap/soledad/client/shared_db.py                               |  32
-rw-r--r--  client/src/leap/soledad/client/sqlcipher.py                               |  53
-rw-r--r--  client/src/leap/soledad/client/sync.py                                    |  46
17 files changed, 474 insertions, 546 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 77822247..ef0f9066 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -24,13 +24,12 @@ import sys
import logging
from functools import partial
-from threading import BoundedSemaphore
from twisted.enterprise import adbapi
+from twisted.internet.defer import DeferredSemaphore
from twisted.python import log
from zope.proxy import ProxyBase, setProxiedObject
-from pysqlcipher.dbapi2 import OperationalError
-from pysqlcipher.dbapi2 import DatabaseError
+from pysqlcipher import dbapi2
from leap.soledad.common.errors import DatabaseAccessError
@@ -49,7 +48,7 @@ if DEBUG_SQL:
How long the SQLCipher connection should wait for the lock to go away until
raising an exception.
"""
-SQLCIPHER_CONNECTION_TIMEOUT = 10
+SQLCIPHER_CONNECTION_TIMEOUT = 5
"""
How many times a SQLCipher query should be retried in case of timeout.
@@ -79,9 +78,11 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher",
if openfun is None and driver == "pysqlcipher":
openfun = partial(set_init_pragmas, opts=opts)
return U1DBConnectionPool(
- "%s.dbapi2" % driver, opts=opts, sync_enc_pool=sync_enc_pool,
- database=opts.path, check_same_thread=False, cp_openfun=openfun,
- timeout=SQLCIPHER_CONNECTION_TIMEOUT)
+ opts, sync_enc_pool,
+ # the following params are relayed "as is" to twisted's
+ # ConnectionPool.
+ "%s.dbapi2" % driver, opts.path, timeout=SQLCIPHER_CONNECTION_TIMEOUT,
+ check_same_thread=False, cp_openfun=openfun)
class U1DBConnection(adbapi.Connection):
@@ -105,8 +106,10 @@ class U1DBConnection(adbapi.Connection):
self._sync_enc_pool = sync_enc_pool
try:
adbapi.Connection.__init__(self, pool)
- except DatabaseError:
- raise DatabaseAccessError('Could not open sqlcipher database')
+ except dbapi2.DatabaseError as e:
+ raise DatabaseAccessError(
+ 'Error initializing connection to sqlcipher database: %s'
+ % str(e))
def reconnect(self):
"""
@@ -165,17 +168,17 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
connectionFactory = U1DBConnection
transactionFactory = U1DBTransaction
- def __init__(self, *args, **kwargs):
+ def __init__(self, opts, sync_enc_pool, *args, **kwargs):
"""
Initialize the connection pool.
"""
- # extract soledad-specific objects from keyword arguments
- self.opts = kwargs.pop("opts")
- self._sync_enc_pool = kwargs.pop("sync_enc_pool")
+ self.opts = opts
+ self._sync_enc_pool = sync_enc_pool
try:
adbapi.ConnectionPool.__init__(self, *args, **kwargs)
- except DatabaseError:
- raise DatabaseAccessError('Could not open sqlcipher database')
+ except dbapi2.DatabaseError as e:
+ raise DatabaseAccessError(
+ 'Error initializing u1db connection pool: %s' % str(e))
# all u1db connections, hashed by thread-id
self._u1dbconnections = {}
@@ -183,10 +186,15 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
# The replica uid, primed by the connections on init.
self.replica_uid = ProxyBase(None)
- conn = self.connectionFactory(
- self, self._sync_enc_pool, init_u1db=True)
- replica_uid = conn._u1db._real_replica_uid
- setProxiedObject(self.replica_uid, replica_uid)
+ try:
+ conn = self.connectionFactory(
+ self, self._sync_enc_pool, init_u1db=True)
+ replica_uid = conn._u1db._real_replica_uid
+ setProxiedObject(self.replica_uid, replica_uid)
+ except DatabaseAccessError as e:
+ self.threadpool.stop()
+ raise DatabaseAccessError(
+ "Error initializing connection factory: %s" % str(e))
def runU1DBQuery(self, meth, *args, **kw):
"""
@@ -204,16 +212,17 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
:rtype: twisted.internet.defer.Deferred
"""
meth = "u1db_%s" % meth
- semaphore = BoundedSemaphore(SQLCIPHER_MAX_RETRIES - 1)
+ semaphore = DeferredSemaphore(SQLCIPHER_MAX_RETRIES)
def _run_interaction():
return self.runInteraction(
self._runU1DBQuery, meth, *args, **kw)
def _errback(failure):
- failure.trap(OperationalError)
+ failure.trap(dbapi2.OperationalError)
if failure.getErrorMessage() == "database is locked":
- should_retry = semaphore.acquire(False)
+ logger.warning("Database operation timed out.")
+ should_retry = semaphore.acquire()
if should_retry:
logger.warning(
"Database operation timed out while waiting for "
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index e657c939..1bfbed8a 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -35,16 +35,11 @@ import ssl
import uuid
import urlparse
-try:
- import cchardet as chardet
-except ImportError:
- import chardet
from itertools import chain
from StringIO import StringIO
from collections import defaultdict
-from u1db.remote import http_client
-from u1db.remote.ssl_match_hostname import match_hostname
+
from twisted.internet.defer import DeferredLock, returnValue, inlineCallbacks
from zope.interface import implements
@@ -54,6 +49,9 @@ from leap.common.plugins import collect_plugins
from leap.soledad.common import SHARED_DB_NAME
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
+from leap.soledad.common.l2db.remote import http_client
+from leap.soledad.common.l2db.remote.ssl_match_hostname import match_hostname
+from leap.soledad.common.errors import DatabaseAccessError
from leap.soledad.client import adbapi
from leap.soledad.client import events as soledad_events
@@ -66,6 +64,13 @@ from leap.soledad.client import encdecpool
logger = logging.getLogger(name=__name__)
+
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
#
# Constants
#
@@ -126,8 +131,7 @@ class Soledad(object):
def __init__(self, uuid, passphrase, secrets_path, local_db_path,
server_url, cert_file, shared_db=None,
- auth_token=None, defer_encryption=False, syncable=True,
- userid=None):
+ auth_token=None, defer_encryption=False, syncable=True):
"""
Initialize configuration, cryptographic keys and dbs.
@@ -181,7 +185,6 @@ class Soledad(object):
"""
# store config params
self._uuid = uuid
- self._userid = userid
self._passphrase = passphrase
self._local_db_path = local_db_path
self._server_url = server_url
@@ -211,10 +214,22 @@ class Soledad(object):
self._init_secrets()
self._crypto = SoledadCrypto(self._secrets.remote_storage_secret)
- self._init_u1db_sqlcipher_backend()
- if syncable:
- self._init_u1db_syncer()
+ try:
+ # initialize database access, trap any problems so we can shut
+ # down smoothly.
+ self._init_u1db_sqlcipher_backend()
+ if syncable:
+ self._init_u1db_syncer()
+ except DatabaseAccessError:
+ # oops! something went wrong with backend initialization. We
+ # have to close any thread-related stuff we have already opened
+ # here, otherwise there might be zombie threads that may clog the
+ # reactor.
+ self._sync_db.close()
+ if hasattr(self, '_dbpool'):
+ self._dbpool.close()
+ raise
#
# initialization/destruction methods
@@ -255,7 +270,7 @@ class Soledad(object):
"""
self._secrets = SoledadSecrets(
self.uuid, self._passphrase, self._secrets_path,
- self.shared_db, userid=self._userid)
+ self.shared_db, userid=self.userid)
self._secrets.bootstrap()
def _init_u1db_sqlcipher_backend(self):
@@ -303,6 +318,17 @@ class Soledad(object):
sync_db=self._sync_db,
sync_enc_pool=self._sync_enc_pool)
+ def sync_stats(self):
+ sync_phase = 0
+ if getattr(self._dbsyncer, 'sync_phase', None):
+ sync_phase = self._dbsyncer.sync_phase[0]
+ sync_exchange_phase = 0
+ if getattr(self._dbsyncer, 'syncer', None):
+ if getattr(self._dbsyncer.syncer, 'sync_exchange_phase', None):
+ _p = self._dbsyncer.syncer.sync_exchange_phase[0]
+ sync_exchange_phase = _p
+ return sync_phase, sync_exchange_phase
+
#
# Closing methods
#
@@ -359,7 +385,6 @@ class Soledad(object):
also be updated.
:rtype: twisted.internet.defer.Deferred
"""
- doc.content = _convert_to_unicode(doc.content)
return self._defer("put_doc", doc)
def delete_doc(self, doc):
@@ -454,8 +479,7 @@ class Soledad(object):
# create_doc (and probably to put_doc too). There are cases (mail
# payloads for example) in which we already have the encoding in the
# headers, so we don't need to guess it.
- return self._defer(
- "create_doc", _convert_to_unicode(content), doc_id=doc_id)
+ return self._defer("create_doc", content, doc_id=doc_id)
def create_doc_from_json(self, json, doc_id=None):
"""
@@ -655,7 +679,7 @@ class Soledad(object):
@property
def userid(self):
- return self._userid
+ return self.uuid
#
# ISyncableStorage
@@ -976,44 +1000,6 @@ class Soledad(object):
return self.create_doc(doc)
-def _convert_to_unicode(content):
- """
- Convert content to unicode (or all the strings in content).
-
- NOTE: Even though this method supports any type, it will
- currently ignore contents of lists, tuple or any other
- iterable than dict. We don't need support for these at the
- moment
-
- :param content: content to convert
- :type content: object
-
- :rtype: object
- """
- # Chardet doesn't guess very well with some smallish payloads.
- # This parameter might need some empirical tweaking.
- CUTOFF_CONFIDENCE = 0.90
-
- if isinstance(content, unicode):
- return content
- elif isinstance(content, str):
- encoding = "utf-8"
- result = chardet.detect(content)
- if result["confidence"] > CUTOFF_CONFIDENCE:
- encoding = result["encoding"]
- try:
- content = content.decode(encoding)
- except UnicodeError as e:
- logger.error("Unicode error: {0!r}. Using 'replace'".format(e))
- content = content.decode(encoding, 'replace')
- return content
- else:
- if isinstance(content, dict):
- for key in content.keys():
- content[key] = _convert_to_unicode(content[key])
- return content
-
-
def create_path_if_not_exists(path):
try:
if not os.path.isdir(path):
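[editor's note] A tiny, self-contained sketch of the cleanup-on-failed-init pattern added to Soledad.__init__ above (all names here are hypothetical): trap the backend error, close whatever was already opened so no zombie threads clog the reactor, then re-raise for the caller.

class DatabaseAccessError(Exception):
    pass

class _Resource(object):
    def close(self):
        pass  # release threads / file handles here

class Service(object):
    def __init__(self, fail=False):
        self._sync_db = _Resource()  # opened before the risky step
        try:
            if fail:
                raise DatabaseAccessError("backend init failed")
            self._dbpool = _Resource()
        except DatabaseAccessError:
            # close what was already opened, then let the error bubble up
            self._sync_db.close()
            if hasattr(self, '_dbpool'):
                self._dbpool.close()
            raise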
diff --git a/client/src/leap/soledad/client/auth.py b/client/src/leap/soledad/client/auth.py
index 6dfabeb4..78e9bf1b 100644
--- a/client/src/leap/soledad/client/auth.py
+++ b/client/src/leap/soledad/client/auth.py
@@ -22,7 +22,7 @@ they can do token-based auth requests to the Soledad server.
"""
import base64
-from u1db import errors
+from leap.soledad.common.l2db import errors
class TokenBasedAuth(object):
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 363d71b9..f7d92372 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -26,7 +26,8 @@ import logging
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends.multibackend import MultiBackend
-from cryptography.hazmat.backends.openssl.backend import Backend as OpenSSLBackend
+from cryptography.hazmat.backends.openssl.backend \
+ import Backend as OpenSSLBackend
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
@@ -38,6 +39,8 @@ logger = logging.getLogger(__name__)
MAC_KEY_LENGTH = 64
+crypto_backend = MultiBackend([OpenSSLBackend()])
+
def encrypt_sym(data, key):
"""
@@ -58,8 +61,7 @@ def encrypt_sym(data, key):
(len(key) * 8))
iv = os.urandom(16)
- backend = MultiBackend([OpenSSLBackend()])
- cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend)
+ cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(data) + encryptor.finalize()
@@ -86,9 +88,8 @@ def decrypt_sym(data, key, iv):
soledad_assert(
len(key) == 32, # 32 x 8 = 256 bits.
'Wrong key size: %s (must be 256 bits long).' % len(key))
- backend = MultiBackend([OpenSSLBackend()])
iv = binascii.a2b_base64(iv)
- cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=backend)
+ cipher = Cipher(algorithms.AES(key), modes.CTR(iv), backend=crypto_backend)
decryptor = cipher.decryptor()
return decryptor.update(data) + decryptor.finalize()
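[editor's note] The crypto.py hunks hoist backend construction to module level so every call reuses one backend object instead of building a new one. A standalone sketch of the same AES-CTR round trip, using the library's default backend for brevity:

import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

_backend = default_backend()  # built once, shared by every call

def encrypt(data, key):
    iv = os.urandom(16)
    encryptor = Cipher(
        algorithms.AES(key), modes.CTR(iv), backend=_backend).encryptor()
    return iv, encryptor.update(data) + encryptor.finalize()

def decrypt(data, key, iv):
    decryptor = Cipher(
        algorithms.AES(key), modes.CTR(iv), backend=_backend).decryptor()
    return decryptor.update(data) + decryptor.finalize()

key = os.urandom(32)  # 32 bytes = 256 bits, as the assertions above require
iv, ciphertext = encrypt(b"some payload", key)
assert decrypt(ciphertext, key, iv) == b"some payload"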
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 34667a1e..a6d49b21 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -22,12 +22,11 @@ during synchronization.
"""
-import multiprocessing
-import Queue
import json
import logging
+from uuid import uuid4
-from twisted.internet import reactor
+from twisted.internet.task import LoopingCall
from twisted.internet import threads
from twisted.internet import defer
from twisted.python import log
@@ -51,9 +50,6 @@ class SyncEncryptDecryptPool(object):
Base class for encrypter/decrypter pools.
"""
- # TODO implement throttling to reduce cpu usage??
- WORKERS = multiprocessing.cpu_count()
-
def __init__(self, crypto, sync_db):
"""
Initialize the pool of encryption-workers.
@@ -66,21 +62,14 @@ class SyncEncryptDecryptPool(object):
"""
self._crypto = crypto
self._sync_db = sync_db
- self._pool = None
self._delayed_call = None
self._started = False
def start(self):
- if self.running:
- return
- self._create_pool()
self._started = True
def stop(self):
- if not self.running:
- return
self._started = False
- self._destroy_pool()
# maybe cancel the next delayed call
if self._delayed_call \
and not self._delayed_call.called:
@@ -90,27 +79,6 @@ class SyncEncryptDecryptPool(object):
def running(self):
return self._started
- def _create_pool(self):
- self._pool = multiprocessing.Pool(self.WORKERS)
-
- def _destroy_pool(self):
- """
- Cleanly close the pool of workers.
- """
- logger.debug("Closing %s" % (self.__class__.__name__,))
- self._pool.close()
- try:
- self._pool.join()
- except Exception:
- pass
-
- def terminate(self):
- """
- Terminate the pool of workers.
- """
- logger.debug("Terminating %s" % (self.__class__.__name__,))
- self._pool.terminate()
-
def _runOperation(self, query, *args):
"""
Run an operation on the sync db.
@@ -180,7 +148,6 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
Initialize the sync encrypter pool.
"""
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self._encr_queue = defer.DeferredQueue()
# TODO delete already synced files from database
def start(self):
@@ -189,73 +156,33 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
"""
SyncEncryptDecryptPool.start(self)
logger.debug("Starting the encryption loop...")
- reactor.callWhenRunning(self._maybe_encrypt_and_recurse)
def stop(self):
"""
Stop the encrypter pool.
"""
- # close the sync queue
- if self._encr_queue:
- q = self._encr_queue
- for d in q.pending:
- d.cancel()
- del q
- self._encr_queue = None
SyncEncryptDecryptPool.stop(self)
- def enqueue_doc_for_encryption(self, doc):
+ def encrypt_doc(self, doc):
"""
- Enqueue a document for encryption.
+ Encrypt a document asynchronously, then insert it into the
+ local staging database.
:param doc: The document to be encrypted.
:type doc: SoledadDocument
"""
- try:
- self._encr_queue.put(doc)
- except Queue.Full:
- # do not asynchronously encrypt this file if the queue is full
- pass
-
- @defer.inlineCallbacks
- def _maybe_encrypt_and_recurse(self):
- """
- Process one document from the encryption queue.
-
- Asynchronously encrypt a document that will then be stored in the sync
- db. Processed documents will be read by the SoledadSyncTarget during
- the sync_exchange.
- """
- try:
- while self.running:
- doc = yield self._encr_queue.get()
- self._encrypt_doc(doc)
- except defer.QueueUnderflow:
- self._delayed_call = reactor.callLater(
- self.ENCRYPT_LOOP_PERIOD,
- self._maybe_encrypt_and_recurse)
-
- def _encrypt_doc(self, doc):
- """
- Symmetrically encrypt a document.
-
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
soledad_assert(self._crypto is not None, "need a crypto object")
docstr = doc.get_json()
key = self._crypto.doc_passphrase(doc.doc_id)
secret = self._crypto.secret
args = doc.doc_id, doc.rev, docstr, key, secret
# encrypt asynchronously
- self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self._encrypt_doc_cb)
+ # TODO use dedicated threadpool / move to ampoule
+ d = threads.deferToThread(
+ encrypt_doc_task, *args)
+ d.addCallback(self._encrypt_doc_cb)
+ return d
def _encrypt_doc_cb(self, result):
"""
@@ -336,8 +263,8 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
:type doc_id: str
:param doc_rev: The document revision.
:type doc_rev: str
- :param content: The encrypted content of the document.
- :type content: str
+ :param content: The encrypted content of the document as JSON dict.
+ :type content: dict
:param gen: The generation corresponding to the modification of that
document.
:type gen: int
@@ -384,7 +311,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
TABLE_NAME = "docs_received"
FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \
- "trans_id, encrypted, idx"
+ "trans_id, encrypted, idx, sync_id"
"""
Period of recurrence of the periodic decrypting task, in seconds.
@@ -414,46 +341,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._docs_to_process = None
self._processed_docs = 0
self._last_inserted_idx = 0
- self._decrypting_docs = []
-
- # a list that holds the asynchronous decryption results so they can be
- # collected when they are ready
- self._async_results = []
- # initialize db and make sure any database operation happens after
- # db initialization
- self._deferred_init = self._init_db()
- self._wait_init_db('_runOperation', '_runQuery')
-
- def _wait_init_db(self, *methods):
- """
- Methods that need to wait for db initialization.
-
- :param methods: methods that need to wait for initialization
- :type methods: tuple(str)
- """
- self._waiting = []
- self._stored = {}
-
- def _restore(_):
- for method in self._stored:
- setattr(self, method, self._stored[method])
- for d in self._waiting:
- d.callback(None)
-
- def _makeWrapper(method):
- def wrapper(*args, **kw):
- d = defer.Deferred()
- d.addCallback(lambda _: self._stored[method](*args, **kw))
- self._waiting.append(d)
- return d
- return wrapper
-
- for method in methods:
- self._stored[method] = getattr(self, method)
- setattr(self, method, _makeWrapper(method))
-
- self._deferred_init.addCallback(_restore)
+ self._loop = LoopingCall(self._decrypt_and_recurse)
def start(self, docs_to_process):
"""
@@ -466,13 +355,39 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type docs_to_process: int
"""
SyncEncryptDecryptPool.start(self)
+ self._decrypted_docs_indexes = set()
+ self._sync_id = uuid4().hex
self._docs_to_process = docs_to_process
self._deferred = defer.Deferred()
- reactor.callWhenRunning(self._launch_decrypt_and_recurse)
+ d = self._init_db()
+ d.addCallback(lambda _: self._loop.start(self.DECRYPT_LOOP_PERIOD))
+ return d
+
+ def stop(self):
+ if self._loop.running:
+ self._loop.stop()
+ self._finish()
+ SyncEncryptDecryptPool.stop(self)
+
+ def _init_db(self):
+ """
+ Ensure the sync_id column is present, then empty the received
+ docs table of the sync database.
- def _launch_decrypt_and_recurse(self):
- d = self._decrypt_and_recurse()
- d.addErrback(self._errback)
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" %
+ self.TABLE_NAME)
+ d = self._runQuery(ensure_sync_id_column)
+
+ def empty_received_docs(_):
+ query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,)
+ return self._runOperation(query, (self._sync_id,))
+
+ d.addCallbacks(empty_received_docs, empty_received_docs)
+ return d
def _errback(self, failure):
log.err(failure)
@@ -491,8 +406,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
def insert_encrypted_received_doc(
self, doc_id, doc_rev, content, gen, trans_id, idx):
"""
- Insert a received message with encrypted content, to be decrypted later
- on.
+ Decrypt a received document and insert it into the local staging
+ area, to be processed later on.
:param doc_id: The document ID.
:type doc_id: str
@@ -507,15 +422,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:param idx: The index of this document in the current sync process.
:type idx: int
- :return: A deferred that will fire when the operation in the database
- has finished.
+ :return: A deferred that will fire after the decrypted document has
+ been inserted in the sync db.
:rtype: twisted.internet.defer.Deferred
"""
- docstr = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
- % self.TABLE_NAME
- return self._runOperation(
- query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))
+ soledad_assert(self._crypto is not None, "need a crypto object")
+
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx
+ # decrypt asynchronously
+ # TODO use dedicated threadpool / move to ampoule
+ d = threads.deferToThread(
+ decrypt_doc_task, *args)
+ # callback will insert it for later processing
+ d.addCallback(self._decrypt_doc_cb)
+ return d
def insert_received_doc(
self, doc_id, doc_rev, content, gen, trans_id, idx):
@@ -543,56 +465,29 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
if not isinstance(content, str):
content = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \
% self.TABLE_NAME
- return self._runOperation(
- query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
+ d = self._runOperation(
+ query, (doc_id, doc_rev, content, gen, trans_id, 0,
+ idx, self._sync_id))
+ d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx))
+ return d
- def _delete_received_doc(self, doc_id):
+ def _delete_received_docs(self, doc_ids):
"""
- Delete a received doc after it was inserted into the local db.
+ Delete a list of received docs after they have been inserted into
+ the local db.
- :param doc_id: Document ID.
- :type doc_id: str
+ :param doc_ids: List of document IDs.
+ :type doc_ids: list
:return: A deferred that will fire when the operation in the database
has finished.
:rtype: twisted.internet.defer.Deferred
"""
- query = "DELETE FROM '%s' WHERE doc_id=?" \
- % self.TABLE_NAME
- return self._runOperation(query, (doc_id,))
-
- def _async_decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx):
- """
- Dispatch an asynchronous document decrypting routine and save the
- result object.
-
- :param doc_id: The ID for the document with contents to be encrypted.
- :type doc: str
- :param rev: The revision of the document.
- :type rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- :param idx: The index of this document in the current sync process.
- :type idx: int
- """
- soledad_assert(self._crypto is not None, "need a crypto object")
-
- content = json.loads(content)
- key = self._crypto.doc_passphrase(doc_id)
- secret = self._crypto.secret
- args = doc_id, rev, content, gen, trans_id, key, secret, idx
- # decrypt asynchronously
- self._async_results.append(
- self._pool.apply_async(
- decrypt_doc_task, args))
+ placeholders = ', '.join('?' for _ in doc_ids)
+ query = "DELETE FROM '%s' WHERE doc_id in (%s)" \
+ % (self.TABLE_NAME, placeholders)
+ return self._runOperation(query, (doc_ids))
def _decrypt_doc_cb(self, result):
"""
@@ -610,11 +505,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
doc_id, rev, content, gen, trans_id, idx = result
logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
% (doc_id, rev, gen, trans_id))
- self._decrypting_docs.remove((doc_id, rev))
return self.insert_received_doc(
doc_id, rev, content, gen, trans_id, idx)
- def _get_docs(self, encrypted=None, order_by='idx', order='ASC'):
+ def _get_docs(self, encrypted=None, sequence=None):
"""
Get documents from the received docs table in the sync db.
@@ -622,9 +516,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
field equal to given parameter.
:type encrypted: bool or None
:param order_by: The name of the field to order results.
- :type order_by: str
- :param order: Whether the order should be ASC or DESC.
- :type order: str
:return: A deferred that will fire with the results of the database
query.
@@ -632,10 +523,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \
"idx FROM %s" % self.TABLE_NAME
- if encrypted is not None:
- query += " WHERE encrypted = %d" % int(encrypted)
- query += " ORDER BY %s %s" % (order_by, order)
- return self._runQuery(query)
+ parameters = []
+ if encrypted or sequence:
+ query += " WHERE sync_id = ? and"
+ parameters += [self._sync_id]
+ if encrypted:
+ query += " encrypted = ?"
+ parameters += [int(encrypted)]
+ if sequence:
+ query += " idx in (" + ', '.join('?' * len(sequence)) + ")"
+ parameters += [int(i) for i in sequence]
+ query += " ORDER BY idx ASC"
+ return self._runQuery(query, parameters)
@defer.inlineCallbacks
def _get_insertable_docs(self):
@@ -646,35 +545,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
documents.
:rtype: twisted.internet.defer.Deferred
"""
- # here, we fetch the list of decrypted documents and compare with the
- # index of the last succesfully processed document.
- decrypted_docs = yield self._get_docs(encrypted=False)
- insertable = []
- last_idx = self._last_inserted_idx
- for doc_id, rev, content, gen, trans_id, encrypted, idx in \
- decrypted_docs:
- if (idx != last_idx + 1):
- break
- insertable.append((doc_id, rev, content, gen, trans_id, idx))
- last_idx += 1
- defer.returnValue(insertable)
-
- @defer.inlineCallbacks
- def _async_decrypt_received_docs(self):
- """
- Get all the encrypted documents from the sync database and dispatch a
- decrypt worker to decrypt each one of them.
-
- :return: A deferred that will fire after all documents have been
- decrypted and inserted back in the sync db.
- :rtype: twisted.internet.defer.Deferred
- """
- docs = yield self._get_docs(encrypted=True)
- for doc_id, rev, content, gen, trans_id, _, idx in docs:
- if (doc_id, rev) not in self._decrypting_docs:
- self._decrypting_docs.append((doc_id, rev))
- self._async_decrypt_doc(
- doc_id, rev, content, gen, trans_id, idx)
+ # Here, check in memory what are the insertable indexes that can
+ # form a sequence starting from the last inserted index
+ sequence = []
+ insertable_docs = []
+ next_index = self._last_inserted_idx + 1
+ while next_index in self._decrypted_docs_indexes:
+ sequence.append(str(next_index))
+ next_index += 1
+ # Then fetch all the ones ready for insertion.
+ if sequence:
+ insertable_docs = yield self._get_docs(encrypted=False,
+ sequence=sequence)
+ defer.returnValue(insertable_docs)
@defer.inlineCallbacks
def _process_decrypted_docs(self):
@@ -687,36 +570,18 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:rtype: twisted.internet.defer.Deferred
"""
insertable = yield self._get_insertable_docs()
+ processed_docs_ids = []
for doc_fields in insertable:
method = self._insert_decrypted_local_doc
# FIXME: This is used only because SQLCipherU1DBSync is synchronous
# When adbapi is used there is no need for an external thread
# Without this the reactor can freeze and fail docs download
yield threads.deferToThread(method, *doc_fields)
- defer.returnValue(insertable)
-
- def _delete_processed_docs(self, inserted):
- """
- Delete from the sync db documents that have been processed.
-
- :param inserted: List of documents inserted in the previous process
- step.
- :type inserted: list
-
- :return: A list of deferreds that will fire when each operation in the
- database has finished.
- :rtype: twisted.internet.defer.DeferredList
- """
- deferreds = []
- for doc_id, doc_rev, _, _, _, _ in inserted:
- deferreds.append(
- self._delete_received_doc(doc_id))
- if not deferreds:
- return defer.succeed(None)
- return defer.gatherResults(deferreds)
+ processed_docs_ids.append(doc_fields[0])
+ yield self._delete_received_docs(processed_docs_ids)
def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
- gen, trans_id, idx):
+ gen, trans_id, encrypted, idx):
"""
Insert the decrypted document into the local replica.
@@ -751,32 +616,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._last_inserted_idx = idx
self._processed_docs += 1
- def _init_db(self):
- """
- Empty the received docs table of the sync database.
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- return self._runOperation(query)
-
- @defer.inlineCallbacks
- def _collect_async_decryption_results(self):
- """
- Collect the results of the asynchronous doc decryptions and re-raise
- any exception raised by a multiprocessing async decryption call.
-
- :raise Exception: Raised if an async call has raised an exception.
- """
- async_results = self._async_results[:]
- for res in async_results:
- if res.ready():
- # XXX: might raise an exception!
- yield self._decrypt_doc_cb(res.get())
- self._async_results.remove(res)
-
@defer.inlineCallbacks
def _decrypt_and_recurse(self):
"""
@@ -792,22 +631,19 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
delete operations have been executed.
:rtype: twisted.internet.defer.Deferred
"""
+ if not self.running:
+ defer.returnValue(None)
processed = self._processed_docs
pending = self._docs_to_process
if processed < pending:
- yield self._async_decrypt_received_docs()
- yield self._collect_async_decryption_results()
- docs = yield self._process_decrypted_docs()
- yield self._delete_processed_docs(docs)
- # recurse
- self._delayed_call = reactor.callLater(
- self.DECRYPT_LOOP_PERIOD,
- self._launch_decrypt_and_recurse)
+ yield self._process_decrypted_docs()
else:
self._finish()
def _finish(self):
self._processed_docs = 0
self._last_inserted_idx = 0
- self._deferred.callback(None)
+ self._decrypted_docs_indexes = set()
+ if not self._deferred.called:
+ self._deferred.callback(None)
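[editor's note] The new _get_insertable_docs above replaces per-document bookkeeping with an in-memory scan for the longest contiguous run of decrypted indexes following the last inserted one, so documents are applied strictly in order. A minimal sketch of that check:

def insertable_sequence(last_inserted_idx, decrypted_indexes):
    # return the indexes that can be inserted next without leaving gaps
    sequence = []
    next_index = last_inserted_idx + 1
    while next_index in decrypted_indexes:
        sequence.append(next_index)
        next_index += 1
    return sequence

assert insertable_sequence(0, {1, 2, 3, 5}) == [1, 2, 3]  # 4 missing: stop
assert insertable_sequence(3, {1, 2, 5}) == []            # 4 not decrypted yet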
diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
index 08775580..4fc91d9d 100644
--- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
+++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times.py
@@ -24,9 +24,9 @@ import hashlib
import os
import sys
-import u1db
from twisted.internet import defer, reactor
+from leap.soledad.common import l2db
from leap.soledad.client import adbapi
from leap.soledad.client.sqlcipher import SQLCipherOptions
@@ -135,7 +135,7 @@ def countDocs(_):
def printResult(r, **kwargs):
if kwargs:
debug(*kwargs.values())
- elif isinstance(r, u1db.Document):
+ elif isinstance(r, l2db.Document):
debug(r.doc_id, r.content['number'])
else:
len_results = len(r[1])
diff --git a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
index 9deba136..38ea18a3 100644
--- a/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
+++ b/client/src/leap/soledad/client/examples/benchmarks/measure_index_times_custom_docid.py
@@ -24,11 +24,11 @@ import hashlib
import os
import sys
-import u1db
from twisted.internet import defer, reactor
from leap.soledad.client import adbapi
from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.common import l2db
folder = os.environ.get("TMPDIR", "tmp")
@@ -135,7 +135,7 @@ def countDocs(_):
def printResult(r, **kwargs):
if kwargs:
debug(*kwargs.values())
- elif isinstance(r, u1db.Document):
+ elif isinstance(r, l2db.Document):
debug(r.doc_id, r.content['number'])
else:
len_results = len(r[1])
diff --git a/client/src/leap/soledad/client/examples/use_adbapi.py b/client/src/leap/soledad/client/examples/use_adbapi.py
index d7bd21f2..a2683836 100644
--- a/client/src/leap/soledad/client/examples/use_adbapi.py
+++ b/client/src/leap/soledad/client/examples/use_adbapi.py
@@ -21,11 +21,11 @@ from __future__ import print_function
import datetime
import os
-import u1db
from twisted.internet import defer, reactor
from leap.soledad.client import adbapi
from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.common import l2db
folder = os.environ.get("TMPDIR", "tmp")
@@ -68,7 +68,7 @@ def countDocs(_):
def printResult(r):
- if isinstance(r, u1db.Document):
+ if isinstance(r, l2db.Document):
debug(r.doc_id, r.content['number'])
else:
len_results = len(r[1])
diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py
index a16531ef..b7e54aa4 100644
--- a/client/src/leap/soledad/client/http_target/__init__.py
+++ b/client/src/leap/soledad/client/http_target/__init__.py
@@ -22,6 +22,7 @@ after receiving.
"""
+import os
import logging
from leap.common.http import HTTPClient
@@ -33,6 +34,12 @@ from leap.soledad.client.http_target.fetch import HTTPDocFetcher
logger = logging.getLogger(__name__)
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
"""
@@ -93,3 +100,6 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher):
# the duplicated syncing bug. This could be reduced to the 30s default
# after implementing Cancellable Sync. See #7382
self._http = HTTPClient(cert_file, timeout=90)
+
+ if DO_STATS:
+ self.sync_exchange_phase = [0]
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
index 94354092..f8de9a15 100644
--- a/client/src/leap/soledad/client/http_target/api.py
+++ b/client/src/leap/soledad/client/http_target/api.py
@@ -14,17 +14,25 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import time
import json
import base64
from uuid import uuid4
-from u1db import SyncTarget
from twisted.web.error import Error
from twisted.internet import defer
-from leap.soledad.common.errors import InvalidAuthTokenError
from leap.soledad.client.http_target.support import readBody
+from leap.soledad.common.errors import InvalidAuthTokenError
+from leap.soledad.common.l2db import SyncTarget
+
+
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
class SyncTargetAPI(SyncTarget):
@@ -187,6 +195,10 @@ class SyncTargetAPI(SyncTarget):
transaction id of the target replica.
:rtype: twisted.internet.defer.Deferred
"""
+ # ---------- phase 1: send docs to server ----------------------------
+ if DO_STATS:
+ self.sync_exchange_phase[0] += 1
+ # --------------------------------------------------------------------
self._ensure_callback = ensure_callback
@@ -203,6 +215,11 @@ class SyncTargetAPI(SyncTarget):
last_known_trans_id,
sync_id)
+ # ---------- phase 2: receive docs -----------------------------------
+ if DO_STATS:
+ self.sync_exchange_phase[0] += 1
+ # --------------------------------------------------------------------
+
cur_target_gen, cur_target_trans_id = yield self._receive_docs(
last_known_generation, last_known_trans_id,
ensure_callback, sync_id,
@@ -214,6 +231,11 @@ class SyncTargetAPI(SyncTarget):
cur_target_gen = gen_after_send
cur_target_trans_id = trans_id_after_send
+ # ---------- phase 3: sync exchange is over --------------------------
+ if DO_STATS:
+ self.sync_exchange_phase[0] += 1
+ # --------------------------------------------------------------------
+
defer.returnValue([cur_target_gen, cur_target_trans_id])
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 9f7a4193..a3f70b02 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -16,15 +16,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import json
-from u1db import errors
-from u1db.remote import utils
+
from twisted.internet import defer
-from leap.soledad.common.document import SoledadDocument
+
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import emit_async
from leap.soledad.client.crypto import is_symmetrically_encrypted
from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.client.http_target.support import RequestBody
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import utils
logger = logging.getLogger(__name__)
@@ -81,9 +83,6 @@ class HTTPDocFetcher(object):
new_generation = ngen
new_transaction_id = ntrans
- if defer_decryption:
- self._sync_decr_pool.start(number_of_changes)
-
# ---------------------------------------------------------------------
# maybe receive the rest of the documents
# ---------------------------------------------------------------------
@@ -151,6 +150,10 @@ class HTTPDocFetcher(object):
new_generation, new_transaction_id, number_of_changes, doc_id, \
rev, content, gen, trans_id = \
self._parse_received_doc_response(response)
+
+ if self._sync_decr_pool and not self._sync_decr_pool.running:
+ self._sync_decr_pool.start(number_of_changes)
+
if doc_id is not None:
# decrypt incoming document and insert into local database
# -------------------------------------------------------------
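[editor's note] The fetch hunk above starts the decrypter pool lazily, on the first response that reveals the total number of changes, rather than up front. A small sketch of that guard (names are illustrative):

class Pool(object):
    def __init__(self):
        self.running = False

    def start(self, number_of_changes):
        self.running = True
        self.pending = number_of_changes

def maybe_start(pool, number_of_changes):
    # start only once, and only if a pool was configured at all
    if pool and not pool.running:
        pool.start(number_of_changes)

pool = Pool()
maybe_start(pool, 42)
maybe_start(pool, 99)      # second call is a no-op
assert pool.pending == 42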
diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py
index 89288779..13218acf 100644
--- a/client/src/leap/soledad/client/http_target/send.py
+++ b/client/src/leap/soledad/client/http_target/send.py
@@ -16,10 +16,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
+
from twisted.internet import defer
+
from leap.soledad.client.events import emit_async
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.http_target.support import RequestBody
+
logger = logging.getLogger(__name__)
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index 2625744c..6ec98ed4 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -16,20 +16,22 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import warnings
import json
-from u1db import errors
-from u1db.remote import http_errors
+
from twisted.internet import defer
from twisted.web.client import _ReadBodyProtocol
from twisted.web.client import PartialDownloadError
from twisted.web._newclient import ResponseDone
from twisted.web._newclient import PotentialDataLoss
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import http_errors
# we want to make sure that HTTP errors will raise appropriate u1db errors,
# that is, fire errbacks with the appropriate failures, in the context of
# twisted. Because of that, we redefine the http body reader used by the HTTP
# client below.
+
class ReadBodyProtocol(_ReadBodyProtocol):
"""
From original Twisted implementation, focused on adding our error
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index e2a5a1d7..3547a711 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -33,7 +33,6 @@ from hashlib import sha256
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
from leap.soledad.common import document
-from leap.soledad.common import errors
from leap.soledad.client import events
from leap.soledad.client.crypto import encrypt_sym, decrypt_sym
@@ -81,6 +80,7 @@ class BootstrapSequenceError(SecretsException):
# Secrets handler
#
+
class SoledadSecrets(object):
"""
@@ -143,6 +143,8 @@ class SoledadSecrets(object):
KDF_LENGTH_KEY = 'kdf_length'
KDF_SCRYPT = 'scrypt'
CIPHER_AES256 = 'aes256'
+ RECOVERY_DOC_VERSION_KEY = 'version'
+ RECOVERY_DOC_VERSION = 1
"""
Keys used to access storage secrets in recovery documents.
"""
@@ -162,17 +164,12 @@ class SoledadSecrets(object):
:param shared_db: The shared database that stores user secrets.
:type shared_db: leap.soledad.client.shared_db.SoledadSharedDatabase
"""
- # XXX removed since not in use
- # We will pick the first secret available.
- # param secret_id: The id of the storage secret to be used.
-
self._uuid = uuid
self._userid = userid
self._passphrase = passphrase
self._secrets_path = secrets_path
self._shared_db = shared_db
self._secrets = {}
-
self._secret_id = None
def bootstrap(self):
@@ -195,49 +192,44 @@ class SoledadSecrets(object):
storage on server sequence has failed for some reason.
"""
# STAGE 1 - verify if secrets exist locally
- if not self._has_secret(): # try to load from local storage.
-
- # STAGE 2 - there are no secrets in local storage, so try to fetch
- # encrypted secrets from server.
- logger.info(
- 'Trying to fetch cryptographic secrets from shared recovery '
- 'database...')
-
- # --- start of atomic operation in shared db ---
+ try:
+ logger.info("Trying to load secrets from local storage...")
+ version = self._load_secrets_from_local_file()
+ # migrate locally and remotely stored documents from the old
+ # format version, if needed
+ if version < self.RECOVERY_DOC_VERSION:
+ self._store_secrets()
+ self._upload_crypto_secrets()
+ logger.info("Found secrets in local storage.")
+ return
- # obtain lock on shared db
- token = timeout = None
- try:
- token, timeout = self._shared_db.lock()
- except errors.AlreadyLockedError:
- raise BootstrapSequenceError('Database is already locked.')
- except errors.LockTimedOutError:
- raise BootstrapSequenceError('Lock operation timed out.')
+ except NoStorageSecret:
+ logger.info("Could not find secrets in local storage.")
- self._get_or_gen_crypto_secrets()
+ # STAGE 2 - there are no secrets in local storage and this is the
+ # first time we are running soledad with the specified
+ # secrets_path. Try to fetch encrypted secrets from
+ # server.
+ try:
+ logger.info('Trying to fetch secrets from remote storage...')
+ version = self._download_crypto_secrets()
+ self._store_secrets()
+ # migrate the remotely stored document from the old format
+ # version, if needed
+ if version < self.RECOVERY_DOC_VERSION:
+ self._upload_crypto_secrets()
+ logger.info('Found secrets in remote storage.')
+ return
+ except NoStorageSecret:
+ logger.info("Could not find secrets in remote storage.")
- # release the lock on shared db
- try:
- self._shared_db.unlock(token)
- self._shared_db.close()
- except errors.NotLockedError:
- # for some reason the lock expired. Despite that, secret
- # loading or generation/storage must have been executed
- # successfully, so we pass.
- pass
- except errors.InvalidTokenError:
- # here, our lock has not only expired but also some other
- # client application has obtained a new lock and is currently
- # doing its thing in the shared database. Using the same
- # reasoning as above, we assume everything went smooth and
- # pass.
- pass
- except Exception as e:
- logger.error("Unhandled exception when unlocking shared "
- "database.")
- logger.exception(e)
-
- # --- end of atomic operation in shared db ---
+ # STAGE 3 - there are no secrets in server also, so we want to
+ # generate the secrets and store them in the remote
+ # db.
+ logger.info("Generating secrets...")
+ self._gen_crypto_secrets()
+ logger.info("Uploading secrets...")
+ self._upload_crypto_secrets()
def _has_secret(self):
"""
@@ -246,21 +238,7 @@ class SoledadSecrets(object):
:return: Whether there's a storage secret for symmetric encryption.
:rtype: bool
"""
- logger.info("Checking if there's a secret in local storage...")
- if (self._secret_id is None or self._secret_id not in self._secrets) \
- and os.path.isfile(self._secrets_path):
- try:
- self._load_secrets() # try to load from disk
- except IOError as e:
- logger.warning(
- 'IOError while loading secrets from disk: %s' % str(e))
-
- if self.storage_secret is not None:
- logger.info("Found a secret in local storage.")
- return True
-
- logger.info("Could not find a secret in local storage.")
- return False
+ return self.storage_secret is not None
def _maybe_set_active_secret(self, active_secret):
"""
@@ -272,73 +250,82 @@ class SoledadSecrets(object):
active_secret = self._secrets.items()[0][0]
self.set_secret_id(active_secret)
- def _load_secrets(self):
+ def _load_secrets_from_local_file(self):
"""
Load storage secrets from local file.
+
+ :return version: The version of the locally stored recovery document.
+
+ :raise NoStorageSecret: Raised if there are no secrets available in
+ local storage.
"""
+ # check if secrets file exists and we can read it
+ if not os.path.isfile(self._secrets_path):
+ raise NoStorageSecret
+
# read storage secrets from file
content = None
with open(self._secrets_path, 'r') as f:
content = json.loads(f.read())
- _, active_secret = self._import_recovery_document(content)
+ _, active_secret, version = self._import_recovery_document(content)
+
self._maybe_set_active_secret(active_secret)
- # enlarge secret if needed
- enlarged = False
- if len(self._secrets[self._secret_id]) < self.GEN_SECRET_LENGTH:
- gen_len = self.GEN_SECRET_LENGTH \
- - len(self._secrets[self._secret_id])
- new_piece = os.urandom(gen_len)
- self._secrets[self._secret_id] += new_piece
- enlarged = True
- # store and save in shared db if needed
- if enlarged:
- self._store_secrets()
- self._put_secrets_in_shared_db()
- def _get_or_gen_crypto_secrets(self):
+ return version
+
+ def _download_crypto_secrets(self):
+ """
+ Download crypto secrets.
+
+ :return version: The version of the remotely stored recovery document.
+
+ :raise NoStorageSecret: Raised if there are no secrets available in
+ remote storage.
+ """
+ doc = None
+ if self._shared_db.syncable:
+ doc = self._get_secrets_from_shared_db()
+
+ if doc is None:
+ raise NoStorageSecret
+
+ _, active_secret, version = self._import_recovery_document(doc.content)
+ self._maybe_set_active_secret(active_secret)
+
+ return version
+
+ def _gen_crypto_secrets(self):
+ """
+ Generate the crypto secrets.
+ """
+ logger.info('No cryptographic secrets found, creating new secrets...')
+ secret_id = self._gen_secret()
+ self.set_secret_id(secret_id)
+
+ def _upload_crypto_secrets(self):
"""
- Retrieves or generates the crypto secrets.
+ Send crypto secrets to shared db.
:raises BootstrapSequenceError: Raised when unable to store secrets in
shared database.
"""
if self._shared_db.syncable:
- doc = self._get_secrets_from_shared_db()
- else:
- doc = None
-
- if doc is not None:
- logger.info(
- 'Found cryptographic secrets in shared recovery '
- 'database.')
- _, active_secret = self._import_recovery_document(doc.content)
- self._maybe_set_active_secret(active_secret)
- self._store_secrets() # save new secrets in local file
- else:
- # STAGE 3 - there are no secrets in server also, so
- # generate a secret and store it in remote db.
- logger.info(
- 'No cryptographic secrets found, creating new '
- ' secrets...')
- self.set_secret_id(self._gen_secret())
-
- if self._shared_db.syncable:
+ try:
+ self._put_secrets_in_shared_db()
+ except Exception as ex:
+ # storing generated secret in shared db failed for
+ # some reason, so we erase the generated secret and
+ # raise.
try:
- self._put_secrets_in_shared_db()
- except Exception as ex:
- # storing generated secret in shared db failed for
- # some reason, so we erase the generated secret and
- # raise.
- try:
- os.unlink(self._secrets_path)
- except OSError as e:
- if e.errno != errno.ENOENT:
- # no such file or directory
- logger.exception(e)
- logger.exception(ex)
- raise BootstrapSequenceError(
- 'Could not store generated secret in the shared '
- 'database, bailing out...')
+ os.unlink(self._secrets_path)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ # no such file or directory
+ logger.exception(e)
+ logger.exception(ex)
+ raise BootstrapSequenceError(
+ 'Could not store generated secret in the shared '
+ 'database, bailing out...')
#
# Shared DB related methods
@@ -360,7 +347,7 @@ class SoledadSecrets(object):
"""
Export the storage secrets.
- A recovery document has the following structure:
+ The current recovery document format has the following structure:
{
'storage_secrets': {
@@ -371,6 +358,7 @@ class SoledadSecrets(object):
},
},
'active_secret': '<secret_id>',
+ 'version': '<recovery document format version>',
}
Note that multiple storage secrets might be stored in one recovery
@@ -388,13 +376,14 @@ class SoledadSecrets(object):
data = {
self.STORAGE_SECRETS_KEY: encrypted_secrets,
self.ACTIVE_SECRET_KEY: self._secret_id,
+ self.RECOVERY_DOC_VERSION_KEY: self.RECOVERY_DOC_VERSION,
}
return data
def _import_recovery_document(self, data):
"""
- Import storage secrets for symmetric encryption and uuid (if present)
- from a recovery document.
+ Import storage secrets for symmetric encryption from a recovery
+ document.
Note that this method does not store the imported data on disk. For
that, use C{self._store_secrets()}.
@@ -402,11 +391,44 @@ class SoledadSecrets(object):
:param data: The recovery document.
:type data: dict
- :return: A tuple containing the number of imported secrets and the
- secret_id of the last active secret.
- :rtype: (int, str)
+ :return: A tuple containing the number of imported secrets, the
+ secret_id of the last active secret, and the recovery
+ document format version.
+ :rtype: (int, str, int)
"""
soledad_assert(self.STORAGE_SECRETS_KEY in data)
+ version = data.get(self.RECOVERY_DOC_VERSION_KEY, 1)
+ meth = getattr(self, '_import_recovery_document_version_%d' % version)
+ secret_count, active_secret = meth(data)
+ return secret_count, active_secret, version
+
+ def _import_recovery_document_version_1(self, data):
+ """
+ Import storage secrets for symmetric encryption from a recovery
+ document with format version 1.
+
+ Version 1 of recovery document has the following structure:
+
+ {
+ 'storage_secrets': {
+ '<storage_secret id>': {
+ 'cipher': 'aes256',
+ 'length': <secret length>,
+ 'secret': '<encrypted storage_secret>',
+ },
+ },
+ 'active_secret': '<secret_id>',
+ 'version': '<recovery document format version>',
+ }
+
+ :param data: The recovery document.
+ :type data: dict
+
+ :return: A tuple containing the number of imported secrets, the
+ secret_id of the last active secret, and the recovery
+ document format version.
+ :rtype: (int, str, int)
+ """
# include secrets in the secret pool.
secret_count = 0
secrets = data[self.STORAGE_SECRETS_KEY].items()
@@ -419,7 +441,8 @@ class SoledadSecrets(object):
if secret_id not in self._secrets:
try:
self._secrets[secret_id] = \
- self._decrypt_storage_secret(encrypted_secret)
+ self._decrypt_storage_secret_version_1(
+ encrypted_secret)
secret_count += 1
except SecretsException as e:
logger.error("Failed to decrypt storage secret: %s"
@@ -478,13 +501,21 @@ class SoledadSecrets(object):
# Management of secret for symmetric encryption.
#
- def _decrypt_storage_secret(self, encrypted_secret_dict):
+ def _decrypt_storage_secret_version_1(self, encrypted_secret_dict):
"""
Decrypt the storage secret.
Storage secret is encrypted before being stored. This method decrypts
and returns the decrypted storage secret.
+ Version 1 of storage secret format has the following structure:
+
+ '<storage_secret id>': {
+ 'cipher': 'aes256',
+ 'length': <secret length>,
+ 'secret': '<encrypted storage_secret>',
+ },
+
:param encrypted_secret_dict: The encrypted storage secret.
:type encrypted_secret_dict: dict
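[editor's note] The secrets.py changes above add a 'version' field to the recovery document and dispatch to a per-version import method via getattr. A compact sketch of that idiom (hypothetical class; real documents carry encrypted secrets):

class RecoveryDocImporter(object):
    def import_doc(self, data):
        # documents written before versioning carry no field: treat as v1
        version = data.get('version', 1)
        meth = getattr(self, '_import_version_%d' % version)
        return meth(data), version

    def _import_version_1(self, data):
        return data['storage_secrets']

doc = {'storage_secrets': {'id1': {'cipher': 'aes256'}}, 'version': 1}
secrets, version = RecoveryDocImporter().import_doc(doc)
assert version == 1 and 'id1' in secrets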
diff --git a/client/src/leap/soledad/client/shared_db.py b/client/src/leap/soledad/client/shared_db.py
index 6abf8ea3..d43db045 100644
--- a/client/src/leap/soledad/client/shared_db.py
+++ b/client/src/leap/soledad/client/shared_db.py
@@ -17,7 +17,7 @@
"""
A shared database for storing/retrieving encrypted key material.
"""
-from u1db.remote import http_database
+from leap.soledad.common.l2db.remote import http_database
from leap.soledad.client.auth import TokenBasedAuth
@@ -151,33 +151,3 @@ class SoledadSharedDatabase(http_database.HTTPDatabase, TokenBasedAuth):
http_database.HTTPDatabase.__init__(self, url, document_factory,
creds)
self._uuid = uuid
-
- def lock(self):
- """
- Obtain a lock on document with id C{doc_id}.
-
- :return: A tuple containing the token to unlock and the timeout until
- lock expiration.
- :rtype: (str, float)
-
- :raise HTTPError: Raised if any HTTP error occurs.
- """
- if self.syncable:
- res, headers = self._request_json(
- 'PUT', ['lock', self._uuid], body={})
- return res['token'], res['timeout']
- else:
- return None, None
-
- def unlock(self, token):
- """
- Release the lock on shared database.
-
- :param token: The token returned by a previous call to lock().
- :type token: str
-
- :raise HTTPError:
- """
- if self.syncable:
- _, _ = self._request_json(
- 'DELETE', ['lock', self._uuid], params={'token': token})
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 22ddc87d..166c0783 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -44,10 +44,6 @@ handled by Soledad should be created by SQLCipher >= 2.0.
import logging
import os
import json
-import u1db
-
-from u1db import errors as u1db_errors
-from u1db.backends import sqlite_backend
from hashlib import sha256
from functools import partial
@@ -58,11 +54,15 @@ from twisted.internet import reactor
from twisted.internet import defer
from twisted.enterprise import adbapi
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common import l2db
+from leap.soledad.common.l2db import errors as u1db_errors
+from leap.soledad.common.l2db.backends import sqlite_backend
+from leap.soledad.common.errors import DatabaseAccessError
+
from leap.soledad.client.http_target import SoledadHTTPSyncTarget
from leap.soledad.client.sync import SoledadSynchronizer
-
from leap.soledad.client import pragmas
-from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
@@ -72,6 +72,12 @@ logger = logging.getLogger(__name__)
sqlite_backend.dbapi2 = sqlcipher_dbapi2
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
"""
Initialize a SQLCipher database.
@@ -278,7 +284,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
if self.defer_encryption:
# TODO move to api?
- self._sync_enc_pool.enqueue_doc_for_encryption(doc)
+ self._sync_enc_pool.encrypt_doc(doc)
return doc_rev
#
@@ -437,21 +443,21 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
# format is the following:
#
# self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
-
self._syncers = {}
-
- # Storage for the documents received during a sync
+ # storage for the documents received during a sync
self.received_docs = []
self.running = False
+ self.shutdownID = None
+ self._db_handle = None
+ # initialize the main db before scheduling a start
+ self._initialize_main_db()
self._reactor = reactor
self._reactor.callWhenRunning(self._start)
- self._db_handle = None
- self._initialize_main_db()
-
- self.shutdownID = None
+ if DO_STATS:
+ self.sync_phase = None
@property
def _replica_uid(self):
@@ -464,11 +470,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self.running = True
def _initialize_main_db(self):
- self._db_handle = initialize_sqlcipher_db(
- self._opts, check_same_thread=False)
- self._real_replica_uid = None
- self._ensure_schema()
- self.set_document_factory(soledad_doc_factory)
+ try:
+ self._db_handle = initialize_sqlcipher_db(
+ self._opts, check_same_thread=False)
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(soledad_doc_factory)
+ except sqlcipher_dbapi2.DatabaseError as e:
+ raise DatabaseAccessError(str(e))
@defer.inlineCallbacks
def sync(self, url, creds=None, defer_decryption=True):
@@ -497,6 +506,10 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
:rtype: Deferred
"""
syncer = self._get_syncer(url, creds=creds)
+ if DO_STATS:
+ self.sync_phase = syncer.sync_phase
+ self.syncer = syncer
+ self.sync_exchange_phase = syncer.sync_exchange_phase
local_gen_before_sync = yield syncer.sync(
defer_decryption=defer_decryption)
self.received_docs = syncer.received_docs
@@ -582,7 +595,7 @@ class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
self._db_handle = conn
self._real_replica_uid = None
self._ensure_schema()
- self._factory = u1db.Document
+ self._factory = l2db.Document
class SoledadSQLCipherWrapper(SQLCipherDatabase):
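[editor's note] _initialize_main_db above now wraps the driver's DatabaseError into the application-level DatabaseAccessError. A minimal sketch of that error-translation pattern (stand-in exception types):

class DriverError(Exception):          # plays the role of dbapi2.DatabaseError
    pass

class DatabaseAccessError(Exception):  # the application's single error type
    pass

def initialize(open_db):
    try:
        return open_db()
    except DriverError as e:
        # callers only ever need to catch DatabaseAccessError
        raise DatabaseAccessError(str(e))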
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 1879031f..2656a150 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -17,18 +17,26 @@
"""
Soledad synchronization utilities.
"""
+import os
+import time
import logging
from twisted.internet import defer
-from u1db import errors
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.sync import Synchronizer
from leap.soledad.common.errors import BackendNotReadyError
-from u1db.sync import Synchronizer
logger = logging.getLogger(__name__)
+# we may want to collect statistics from the sync process
+DO_STATS = False
+if os.environ.get('SOLEDAD_STATS'):
+ DO_STATS = True
+
+
class SoledadSynchronizer(Synchronizer):
"""
Collect the state around synchronizing 2 U1DB replicas.
@@ -42,6 +50,12 @@ class SoledadSynchronizer(Synchronizer):
"""
received_docs = []
+ def __init__(self, *args, **kwargs):
+ Synchronizer.__init__(self, *args, **kwargs)
+ if DO_STATS:
+ self.sync_phase = [0]
+ self.sync_exchange_phase = None
+
@defer.inlineCallbacks
def sync(self, defer_decryption=True):
"""
@@ -64,9 +78,16 @@ class SoledadSynchronizer(Synchronizer):
the local generation before the synchronization was performed.
:rtype: twisted.internet.defer.Deferred
"""
+
sync_target = self.sync_target
self.received_docs = []
+ # ---------- phase 1: get sync info from server ----------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ self.sync_exchange_phase = self.sync_target.sync_exchange_phase
+ # --------------------------------------------------------------------
+
# get target identifier, its current generation,
# and its last-seen database generation for this source
ensure_callback = None
@@ -106,6 +127,11 @@ class SoledadSynchronizer(Synchronizer):
self.source.validate_gen_and_trans_id(
target_my_gen, target_my_trans_id)
+ # ---------- phase 2: what's changed ---------------------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ # --------------------------------------------------------------------
+
# what's changed since that generation and this current gen
my_gen, _, changes = self.source.whats_changed(target_my_gen)
logger.debug("Soledad sync: there are %d documents to send."
@@ -130,6 +156,11 @@ class SoledadSynchronizer(Synchronizer):
raise errors.InvalidTransactionId
defer.returnValue(my_gen)
+ # ---------- phase 3: sync exchange ----------------------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ # --------------------------------------------------------------------
+
# prepare to send all the changed docs
changed_doc_ids = [doc_id for doc_id, _, _ in changes]
docs_to_send = self.source.get_docs(
@@ -162,6 +193,12 @@ class SoledadSynchronizer(Synchronizer):
"my_gen": my_gen
}
self._syncing_info = info
+
+ # ---------- phase 4: complete sync ----------------------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ # --------------------------------------------------------------------
+
yield self.complete_sync()
_, _, changes = self.source.whats_changed(target_my_gen)
@@ -170,6 +207,11 @@ class SoledadSynchronizer(Synchronizer):
just_received = list(set(changed_doc_ids) - set(ids_sent))
self.received_docs = just_received
+ # ---------- phase 5: sync is over -----------------------------------
+ if DO_STATS:
+ self.sync_phase[0] += 1
+ # --------------------------------------------------------------------
+
defer.returnValue(my_gen)
def complete_sync(self):
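[editor's note] The DO_STATS counters threaded through sync.py use a one-element list so other objects can hold a live reference while the synchronizer bumps the value in place. A self-contained sketch of that pattern:

import os

DO_STATS = bool(os.environ.get('SOLEDAD_STATS'))

class Synchronizer(object):
    def __init__(self):
        self.sync_phase = [0] if DO_STATS else None

    def advance_phase(self):
        if DO_STATS:
            self.sync_phase[0] += 1  # mutate in place, references stay live

s = Synchronizer()
watcher = s.sync_phase          # shared reference to the same list
s.advance_phase()
assert watcher is None or watcher[0] == 1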