summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-10-02 06:17:57 -0500
committerKali Kaneko <kali@leap.se>2015-02-11 14:03:17 -0400
commit8f4daa13744c049dcc96eb2cb780df1e9ba08738 (patch)
treed089b48932ac99b2c2f83fd541522a6fe8056227 /client/src/leap/soledad
parente0f70a342deccbb53a6ea7215b3322388bb18461 (diff)
Separate soledad interfaces
* Separate local storage, syncers and shared_db * Comment out unused need_sync method * Use twisted LoopingCall * Create a threadpool for syncs * Return deferred from sync method * Do not pass crypto to SQLCipherDatabase * Pass replica_uid to u1db_syncer * Rename / reorganize some initialization methods
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r--client/src/leap/soledad/client/adbapi.py28
-rw-r--r--client/src/leap/soledad/client/api.py613
-rw-r--r--client/src/leap/soledad/client/examples/use_api.py2
-rw-r--r--client/src/leap/soledad/client/interfaces.py361
-rw-r--r--client/src/leap/soledad/client/pragmas.py13
-rw-r--r--client/src/leap/soledad/client/secrets.py15
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py326
-rw-r--r--client/src/leap/soledad/client/sync.py7
-rw-r--r--client/src/leap/soledad/client/target.py45
9 files changed, 755 insertions, 655 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 3b15509b..60d9e195 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -30,7 +30,7 @@ from u1db.backends import sqlite_backend
from twisted.enterprise import adbapi
from twisted.python import log
-from leap.soledad.client.sqlcipher import set_init_pragmas
+from leap.soledad.client import sqlcipher as soledad_sqlcipher
DEBUG_SQL = os.environ.get("LEAP_DEBUG_SQL")
@@ -40,18 +40,15 @@ if DEBUG_SQL:
def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
if openfun is None and driver == "pysqlcipher":
- openfun = partial(set_init_pragmas, opts=opts)
+ openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts)
return U1DBConnectionPool(
"%s.dbapi2" % driver, database=opts.path,
check_same_thread=False, cp_openfun=openfun)
-# XXX work in progress --------------------------------------------
-
-
-class U1DBSqliteWrapper(sqlite_backend.SQLitePartialExpandDatabase):
+class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
"""
- A very simple wrapper around sqlcipher backend.
+ A very simple wrapper for u1db around sqlcipher backend.
Instead of initializing the database on the fly, it just uses an existing
connection that is passed to it in the initializer.
@@ -64,9 +61,24 @@ class U1DBSqliteWrapper(sqlite_backend.SQLitePartialExpandDatabase):
self._factory = u1db.Document
+class SoledadSQLCipherWrapper(soledad_sqlcipher.SQLCipherDatabase):
+ """
+ A wrapper for u1db that uses the Soledad-extended sqlcipher backend.
+
+ Instead of initializing the database on the fly, it just uses an existing
+ connection that is passed to it in the initializer.
+ """
+ def __init__(self, conn):
+ self._db_handle = conn
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(soledad_sqlcipher.soledad_doc_factory)
+ self._prime_replica_uid()
+
+
class U1DBConnection(adbapi.Connection):
- u1db_wrapper = U1DBSqliteWrapper
+ u1db_wrapper = SoledadSQLCipherWrapper
def __init__(self, pool, init_u1db=False):
self.init_u1db = init_u1db
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index 703b9516..493f6c1d 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -41,6 +41,9 @@ except ImportError:
from u1db.remote import http_client
from u1db.remote.ssl_match_hostname import match_hostname
+from zope.interface import implements
+
+from twisted.python import log
from leap.common.config import get_path_prefix
from leap.soledad.common import SHARED_DB_NAME
@@ -49,11 +52,11 @@ from leap.soledad.common import soledad_assert_type
from leap.soledad.client import adbapi
from leap.soledad.client import events as soledad_events
+from leap.soledad.client import interfaces as soledad_interfaces
from leap.soledad.client.crypto import SoledadCrypto
from leap.soledad.client.secrets import SoledadSecrets
from leap.soledad.client.shared_db import SoledadSharedDatabase
-from leap.soledad.client.target import SoledadSyncTarget
-from leap.soledad.client.sqlcipher import SQLCipherOptions
+from leap.soledad.client.sqlcipher import SQLCipherOptions, SQLCipherU1DBSync
logger = logging.getLogger(name=__name__)
@@ -61,17 +64,13 @@ logger = logging.getLogger(name=__name__)
# Constants
#
-SOLEDAD_CERT = None
"""
Path to the certificate file used to certify the SSL connection between
Soledad client and server.
"""
+SOLEDAD_CERT = None
-#
-# Soledad: local encrypted storage and remote encrypted sync.
-#
-
class Soledad(object):
"""
Soledad provides encrypted data storage and sync.
@@ -104,65 +103,57 @@ class Soledad(object):
SOLEDAD_DONE_DATA_SYNC: emitted inside C{sync()} method when it has
finished synchronizing with remote replica.
"""
+ implements(soledad_interfaces.ILocalStorage,
+ soledad_interfaces.ISyncableStorage,
+ soledad_interfaces.ISharedSecretsStorage)
- LOCAL_DATABASE_FILE_NAME = 'soledad.u1db'
- """
- The name of the local SQLCipher U1DB database file.
- """
-
- STORAGE_SECRETS_FILE_NAME = "soledad.json"
- """
- The name of the file where the storage secrets will be stored.
- """
-
- DEFAULT_PREFIX = os.path.join(get_path_prefix(), 'leap', 'soledad')
- """
- Prefix for default values for path.
- """
+ local_db_file_name = 'soledad.u1db'
+ secrets_file_name = "soledad.json"
+ default_prefix = os.path.join(get_path_prefix(), 'leap', 'soledad')
def __init__(self, uuid, passphrase, secrets_path, local_db_path,
server_url, cert_file,
- auth_token=None, secret_id=None, defer_encryption=False):
+ auth_token=None, defer_encryption=False):
"""
Initialize configuration, cryptographic keys and dbs.
:param uuid: User's uuid.
:type uuid: str
- :param passphrase: The passphrase for locking and unlocking encryption
- secrets for local and remote storage.
+ :param passphrase:
+ The passphrase for locking and unlocking encryption secrets for
+ local and remote storage.
:type passphrase: unicode
- :param secrets_path: Path for storing encrypted key used for
- symmetric encryption.
+ :param secrets_path:
+ Path for storing encrypted key used for symmetric encryption.
:type secrets_path: str
:param local_db_path: Path for local encrypted storage db.
:type local_db_path: str
- :param server_url: URL for Soledad server. This is used either to sync
- with the user's remote db and to interact with the
- shared recovery database.
+ :param server_url:
+ URL for Soledad server. This is used both to sync with the user's
+ remote db and to interact with the shared recovery database.
:type server_url: str
- :param cert_file: Path to the certificate of the ca used
- to validate the SSL certificate used by the remote
- soledad server.
+ :param cert_file:
+ Path to the certificate of the ca used to validate the SSL
+ certificate used by the remote soledad server.
:type cert_file: str
- :param auth_token: Authorization token for accessing remote databases.
+ :param auth_token:
+ Authorization token for accessing remote databases.
:type auth_token: str
- :param secret_id: The id of the storage secret to be used.
- :type secret_id: str
-
- :param defer_encryption: Whether to defer encryption/decryption of
- documents, or do it inline while syncing.
+ :param defer_encryption:
+ Whether to defer encryption/decryption of documents, or do it
+ inline while syncing.
:type defer_encryption: bool
- :raise BootstrapSequenceError: Raised when the secret generation and
- storage on server sequence has failed
- for some reason.
+ :raise BootstrapSequenceError:
+ Raised when the secret generation and storage on server sequence
+ has failed for some reason.
"""
# store config params
self._uuid = uuid
@@ -170,30 +161,34 @@ class Soledad(object):
self._secrets_path = secrets_path
self._local_db_path = local_db_path
self._server_url = server_url
+ self._defer_encryption = defer_encryption
+
+ self.shared_db = None
+
# configure SSL certificate
global SOLEDAD_CERT
SOLEDAD_CERT = cert_file
- self._set_token(auth_token)
- self._defer_encryption = defer_encryption
-
- self._init_config()
- self._init_dirs()
# init crypto variables
- self._shared_db_instance = None
+ self._set_token(auth_token)
self._crypto = SoledadCrypto(self)
- self._secrets = SoledadSecrets(
- self._uuid,
- self._passphrase,
- self._secrets_path,
- self._shared_db,
- self._crypto,
- secret_id=secret_id)
- # initiate bootstrap sequence
- self._bootstrap() # might raise BootstrapSequenceError()
+ self._init_config_with_defaults()
+ self._init_working_dirs()
+
+ # Initialize shared recovery database
+ self.init_shared_db(server_url, uuid, self._creds)
- def _init_config(self):
+ # The following can raise BootstrapSequenceError, that will be
+ # propagated upwards.
+ self._init_secrets()
+ self._init_u1db_sqlcipher_backend()
+ self._init_u1db_syncer()
+
+ #
+ # initialization/destruction methods
+ #
+ def _init_config_with_defaults(self):
"""
Initialize configuration using default values for missing params.
"""
@@ -202,55 +197,37 @@ class Soledad(object):
# initialize secrets_path
initialize(self._secrets_path, os.path.join(
- self.DEFAULT_PREFIX, self.STORAGE_SECRETS_FILE_NAME))
-
+ self.default_prefix, self.secrets_file_name))
# initialize local_db_path
initialize(self._local_db_path, os.path.join(
- self.DEFAULT_PREFIX, self.LOCAL_DATABASE_FILE_NAME))
-
+ self.default_prefix, self.local_db_file_name))
# initialize server_url
soledad_assert(self._server_url is not None,
'Missing URL for Soledad server.')
- #
- # initialization/destruction methods
- #
-
- def _bootstrap(self):
- """
- Bootstrap local Soledad instance.
-
- :raise BootstrapSequenceError:
- Raised when the secret generation and storage on server sequence
- has failed for some reason.
- """
- self._secrets.bootstrap()
- self._init_db()
- # XXX initialize syncers?
-
- def _init_dirs(self):
+ def _init_working_dirs(self):
"""
Create work directories.
:raise OSError: in case file exists and is not a dir.
"""
- paths = map(
- lambda x: os.path.dirname(x),
- [self._local_db_path, self._secrets_path])
+ paths = map(lambda x: os.path.dirname(x), [
+ self._local_db_path, self._secrets_path])
for path in paths:
- try:
- if not os.path.isdir(path):
- logger.info('Creating directory: %s.' % path)
- os.makedirs(path)
- except OSError as exc:
- if exc.errno == errno.EEXIST and os.path.isdir(path):
- pass
- else:
- raise
-
- def _init_db(self):
+ create_path_if_not_exists(path)
+
+ def _init_secrets(self):
+ self._secrets = SoledadSecrets(
+ self.uuid, self.passphrase, self.secrets_path,
+ self._shared_db, self._crypto)
+ self._secrets.bootstrap()
+
+ def _init_u1db_sqlcipher_backend(self):
"""
- Initialize the U1DB SQLCipher database for local storage.
+ Initialize the U1DB SQLCipher database for local storage, by
+ instantiating a modified twisted adbapi that will maintain a threadpool
+ with a u1db-sqlcipher connection for each thread, and will return
+ deferreds for each u1db query.
Currently, Soledad uses the default SQLCipher cipher, i.e.
'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key,
@@ -268,8 +245,17 @@ class Soledad(object):
defer_encryption=self._defer_encryption,
sync_db_key=sync_db_key,
)
+ self._soledad_opts = opts
self._dbpool = adbapi.getConnectionPool(opts)
+ def _init_u1db_syncer(self):
+ self._dbsyncer = SQLCipherU1DBSync(
+ self._soledad_opts, self._crypto, self._defer_encryption)
+
+ #
+ # Closing methods
+ #
+
def close(self):
"""
Close underlying U1DB database.
@@ -279,401 +265,164 @@ class Soledad(object):
# TODO close syncers >>>>>>
- #if hasattr(self, '_db') and isinstance(
- #self._db,
- #SQLCipherDatabase):
- #self._db.close()
-#
- # XXX stop syncers
- # self._db.stop_sync()
-
- @property
- def _shared_db(self):
- """
- Return an instance of the shared recovery database object.
-
- :return: The shared database.
- :rtype: SoledadSharedDatabase
- """
- if self._shared_db_instance is None:
- self._shared_db_instance = SoledadSharedDatabase.open_database(
- urlparse.urljoin(self.server_url, SHARED_DB_NAME),
- self._uuid,
- False, # db should exist at this point.
- creds=self._creds)
- return self._shared_db_instance
-
#
- # Document storage, retrieval and sync.
+ # ILocalStorage
#
def put_doc(self, doc):
- # TODO what happens with this warning during the deferred life cycle?
- # Isn't it better to defend ourselves from the mutability, to avoid
- # nasty surprises?
"""
- Update a document in the local encrypted database.
-
============================== WARNING ==============================
This method converts the document's contents to unicode in-place. This
means that after calling `put_doc(doc)`, the contents of the
document, i.e. `doc.content`, might be different from before the
call.
============================== WARNING ==============================
-
- :param doc: the document to update
- :type doc: SoledadDocument
-
- :return:
- a deferred that will fire with the new revision identifier for
- the document
- :rtype: Deferred
"""
+ # TODO what happens with this warning during the deferred life cycle?
+ # Isn't it better to defend ourselves from the mutability, to avoid
+ # nasty surprises?
doc.content = self._convert_to_unicode(doc.content)
return self._dbpool.put_doc(doc)
def delete_doc(self, doc):
- """
- Delete a document from the local encrypted database.
-
- :param doc: the document to delete
- :type doc: SoledadDocument
-
- :return:
- a deferred that will fire with ...
- :rtype: Deferred
- """
# XXX what does this do when fired???
return self._dbpool.delete_doc(doc)
def get_doc(self, doc_id, include_deleted=False):
- """
- Retrieve a document from the local encrypted database.
-
- :param doc_id: the unique document identifier
- :type doc_id: str
- :param include_deleted:
- if True, deleted documents will be returned with empty content;
- otherwise asking for a deleted document will return None
- :type include_deleted: bool
-
- :return:
- A deferred that will fire with the document object, containing a
- SoledadDocument, or None if it could not be found
- :rtype: Deferred
- """
return self._dbpool.get_doc(doc_id, include_deleted=include_deleted)
def get_docs(self, doc_ids, check_for_conflicts=True,
include_deleted=False):
- """
- Get the content for many documents.
-
- :param doc_ids: a list of document identifiers
- :type doc_ids: list
- :param check_for_conflicts: if set False, then the conflict check will
- be skipped, and 'None' will be returned instead of True/False
- :type check_for_conflicts: bool
-
- :return:
- A deferred that will fire with an iterable giving the Document
- object for each document id in matching doc_ids order.
- :rtype: Deferred
- """
- return self._dbpool.get_docs(
- doc_ids, check_for_conflicts=check_for_conflicts,
- include_deleted=include_deleted)
+ return self._dbpool.get_docs(doc_ids,
+ check_for_conflicts=check_for_conflicts,
+ include_deleted=include_deleted)
def get_all_docs(self, include_deleted=False):
- """
- Get the JSON content for all documents in the database.
-
- :param include_deleted: If set to True, deleted documents will be
- returned with empty content. Otherwise deleted
- documents will not be included in the results.
- :return:
- A deferred that will fire with (generation, [Document]): that is,
- the current generation of the database, followed by a list of all
- the documents in the database.
- :rtype: Deferred
- """
return self._dbpool.get_all_docs(include_deleted)
def create_doc(self, content, doc_id=None):
- """
- Create a new document in the local encrypted database.
-
- :param content: the contents of the new document
- :type content: dict
- :param doc_id: an optional identifier specifying the document id
- :type doc_id: str
-
- :return:
- A deferred tht will fire with the new document (SoledadDocument
- instance).
- :rtype: Deferred
- """
return self._dbpool.create_doc(
_convert_to_unicode(content), doc_id=doc_id)
def create_doc_from_json(self, json, doc_id=None):
- """
- Create a new document.
-
- You can optionally specify the document identifier, but the document
- must not already exist. See 'put_doc' if you want to override an
- existing document.
- If the database specifies a maximum document size and the document
- exceeds it, create will fail and raise a DocumentTooBig exception.
-
- :param json: The JSON document string
- :type json: str
- :param doc_id: An optional identifier specifying the document id.
- :type doc_id:
- :return:
- A deferred that will fire with the new document (A SoledadDocument
- instance)
- :rtype: Deferred
- """
return self._dbpool.create_doc_from_json(json, doc_id=doc_id)
def create_index(self, index_name, *index_expressions):
- """
- Create an named index, which can then be queried for future lookups.
- Creating an index which already exists is not an error, and is cheap.
- Creating an index which does not match the index_expressions of the
- existing index is an error.
- Creating an index will block until the expressions have been evaluated
- and the index generated.
-
- :param index_name: A unique name which can be used as a key prefix
- :type index_name: str
- :param index_expressions:
- index expressions defining the index information.
- :type index_expressions: dict
-
- Examples:
-
- "fieldname", or "fieldname.subfieldname" to index alphabetically
- sorted on the contents of a field.
-
- "number(fieldname, width)", "lower(fieldname)"
- """
return self._dbpool.create_index(index_name, *index_expressions)
def delete_index(self, index_name):
- """
- Remove a named index.
-
- :param index_name: The name of the index we are removing
- :type index_name: str
- """
return self._dbpool.delete_index(index_name)
def list_indexes(self):
- """
- List the definitions of all known indexes.
-
- :return: A list of [('index-name', ['field', 'field2'])] definitions.
- :rtype: list
- """
return self._dbpool.list_indexes()
def get_from_index(self, index_name, *key_values):
- """
- Return documents that match the keys supplied.
-
- You must supply exactly the same number of values as have been defined
- in the index. It is possible to do a prefix match by using '*' to
- indicate a wildcard match. You can only supply '*' to trailing entries,
- (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.)
- It is also possible to append a '*' to the last supplied value (eg
- 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
-
- :param index_name: The index to query
- :type index_name: str
- :param key_values: values to match. eg, if you have
- an index with 3 fields then you would have:
- get_from_index(index_name, val1, val2, val3)
- :type key_values: tuple
- :return: List of [Document]
- :rtype: list
- """
return self._dbpool.get_from_index(index_name, *key_values)
def get_count_from_index(self, index_name, *key_values):
- """
- Return the count of the documents that match the keys and
- values supplied.
-
- :param index_name: The index to query
- :type index_name: str
- :param key_values: values to match. eg, if you have
- an index with 3 fields then you would have:
- get_from_index(index_name, val1, val2, val3)
- :type key_values: tuple
- :return: count.
- :rtype: int
- """
return self._dbpool.get_count_from_index(index_name, *key_values)
def get_range_from_index(self, index_name, start_value, end_value):
- """
- Return documents that fall within the specified range.
-
- Both ends of the range are inclusive. For both start_value and
- end_value, one must supply exactly the same number of values as have
- been defined in the index, or pass None. In case of a single column
- index, a string is accepted as an alternative for a tuple with a single
- value. It is possible to do a prefix match by using '*' to indicate
- a wildcard match. You can only supply '*' to trailing entries, (eg
- 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also
- possible to append a '*' to the last supplied value (eg 'val*', '*',
- '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
-
- :param index_name: The index to query
- :type index_name: str
- :param start_values: tuples of values that define the lower bound of
- the range. eg, if you have an index with 3 fields then you would
- have: (val1, val2, val3)
- :type start_values: tuple
- :param end_values: tuples of values that define the upper bound of the
- range. eg, if you have an index with 3 fields then you would have:
- (val1, val2, val3)
- :type end_values: tuple
- :return: A deferred that will fire with a list of [Document]
- :rtype: Deferred
- """
return self._dbpool.get_range_from_index(
index_name, start_value, end_value)
def get_index_keys(self, index_name):
- """
- Return all keys under which documents are indexed in this index.
-
- :param index_name: The index to query
- :type index_name: str
- :return:
- A deferred that will fire with a list of tuples of indexed keys.
- :rtype: Deferred
- """
return self._dbpool.get_index_keys(index_name)
def get_doc_conflicts(self, doc_id):
- """
- Get the list of conflicts for the given document.
-
- :param doc_id: the document id
- :type doc_id: str
-
- :return:
- A deferred that will fire with a list of the document entries that
- are conflicted.
- :rtype: Deferred
- """
return self._dbpool.get_doc_conflicts(doc_id)
def resolve_doc(self, doc, conflicted_doc_revs):
- """
- Mark a document as no longer conflicted.
-
- :param doc: a document with the new content to be inserted.
- :type doc: SoledadDocument
- :param conflicted_doc_revs:
- A deferred that will fire with a list of revisions that the new
- content supersedes.
- :type conflicted_doc_revs: list
- """
return self._dbpool.resolve_doc(doc, conflicted_doc_revs)
+ def _get_local_db_path(self):
+ return self._local_db_path
+
+ # XXX Do we really need all this private / property dance?
+
+ local_db_path = property(
+ _get_local_db_path,
+ doc='The path for the local database replica.')
+
+ def _get_uuid(self):
+ return self._uuid
+
+ uuid = property(_get_uuid, doc='The user uuid.')
+
#
- # Sync API
+ # ISyncableStorage
#
- # TODO have interfaces, and let it implement it.
-
def sync(self, defer_decryption=True):
- """
- Synchronize the local encrypted replica with a remote replica.
- This method blocks until a syncing lock is acquired, so there are no
- attempts of concurrent syncs from the same client replica.
+ # -----------------------------------------------------------------
+ # TODO this needs work.
+ # Should review/write tests to check that this:
- :param url: the url of the target replica to sync with
- :type url: str
+ # (1) Defer to the syncer pool -- DONE (on dbsyncer)
+ # (2) Return the deferred
+ # (3) Add the callback for signaling the event (executed on reactor
+ # thread)
+ # (4) Check that the deferred is called with the local gen.
- :param defer_decryption:
- Whether to defer the decryption process using the intermediate
- database. If False, decryption will be done inline.
- :type defer_decryption: bool
+ # TODO document that this returns a deferred
+ # -----------------------------------------------------------------
- :return:
- A deferred that will fire with the local generation before the
- synchronisation was performed.
- :rtype: str
- """
- # TODO this needs work.
- # Should:
- # (1) Defer to the syncer pool
- # (2) Return a deferred (the deferToThreadpool can be good)
- # (3) Add the callback for signaling the event
- # (4) Let the local gen be returned from the thread
+ def on_sync_done(local_gen):
+ soledad_events.signal(
+ soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid)
+ return local_gen
+
+ sync_url = urlparse.urljoin(self.server_url, 'user-%s' % self.uuid)
try:
- local_gen = self._dbsyncer.sync(
- urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
+ d = self._dbsyncer.sync(
+ sync_url,
creds=self._creds, autocreate=False,
defer_decryption=defer_decryption)
- soledad_events.signal(
- soledad_events.SOLEDAD_DONE_DATA_SYNC, self._uuid)
- return local_gen
+
+ d.addCallbacks(on_sync_done, lambda err: log.err(err))
+ return d
+
+ # TODO catch the exception by adding an Errback
except Exception as e:
logger.error("Soledad exception when syncing: %s" % str(e))
def stop_sync(self):
- """
- Stop the current syncing process.
- """
self._dbsyncer.stop_sync()
- def need_sync(self, url):
- """
- Return if local db replica differs from remote url's replica.
-
- :param url: The remote replica to compare with local replica.
- :type url: str
-
- :return: Whether remote replica and local replica differ.
- :rtype: bool
- """
- # XXX pass the get_replica_uid ------------------------
- # From where? initialize with that?
- replica_uid = self._db._get_replica_uid()
- target = SoledadSyncTarget(
- url, replica_uid, creds=self._creds, crypto=self._crypto)
-
- generation = self._db._get_generation()
+ # FIXME -------------------------------------------------------
+ # review if we really need this. I think that we can let the sync
+ # fail silently if nothing is to be synced.
+ #def need_sync(self, url):
+ # XXX dispatch this method in the dbpool .................
+ #replica_uid = self._dbpool.replica_uid
+ #target = SoledadSyncTarget(
+ #url, replica_uid, creds=self._creds, crypto=self._crypto)
+#
+ # XXX does it matter if we get this from the general dbpool or the
+ # syncer pool?
+ #generation = self._dbpool.get_generation()
+#
# XXX better unpack it?
- info = target.get_sync_info(replica_uid)
-
+ #info = target.get_sync_info(replica_uid)
+#
# compare source generation with target's last known source generation
- if generation != info[4]:
- soledad_events.signal(
- soledad_events.SOLEDAD_NEW_DATA_TO_SYNC, self._uuid)
- return True
- return False
+ #if generation != info[4]:
+ #soledad_events.signal(
+ #soledad_events.SOLEDAD_NEW_DATA_TO_SYNC, self.uuid)
+ #return True
+ #return False
@property
def syncing(self):
- """
- Property, True if the syncer is syncing.
- """
return self._dbsyncer.syncing
def _set_token(self, token):
"""
Set the authentication token for remote database access.
- Build the credentials dictionary with the following format:
+ Internally, this builds the credentials dictionary with the following
+ format:
self._{
'token': {
@@ -686,7 +435,7 @@ class Soledad(object):
"""
self._creds = {
'token': {
- 'uuid': self._uuid,
+ 'uuid': self.uuid,
'token': token,
}
}
@@ -699,25 +448,24 @@ class Soledad(object):
token = property(_get_token, _set_token, doc='The authentication Token.')
- #
- # Setters/getters
- #
-
- def _get_uuid(self):
- return self._uuid
-
- uuid = property(_get_uuid, doc='The user uuid.')
+ def _get_server_url(self):
+ return self._server_url
- def get_secret_id(self):
- return self._secrets.secret_id
+ server_url = property(
+ _get_server_url,
+ doc='The URL of the Soledad server.')
- def set_secret_id(self, secret_id):
- self._secrets.set_secret_id(secret_id)
+ #
+ # ISharedSecretsStorage
+ #
- secret_id = property(
- get_secret_id,
- set_secret_id,
- doc='The active secret id.')
+ def init_shared_db(self, server_url, uuid, creds):
+ shared_db_url = urlparse.urljoin(server_url, SHARED_DB_NAME)
+ self.shared_db = SoledadSharedDatabase.open_database(
+ shared_db_url,
+ uuid,
+ creds=creds,
+ create=False) # db should exist at this point.
def _set_secrets_path(self, secrets_path):
self._secrets.secrets_path = secrets_path
@@ -730,20 +478,6 @@ class Soledad(object):
_set_secrets_path,
doc='The path for the file containing the encrypted symmetric secret.')
- def _get_local_db_path(self):
- return self._local_db_path
-
- local_db_path = property(
- _get_local_db_path,
- doc='The path for the local database replica.')
-
- def _get_server_url(self):
- return self._server_url
-
- server_url = property(
- _get_server_url,
- doc='The URL of the Soledad server.')
-
@property
def storage_secret(self):
"""
@@ -762,19 +496,7 @@ class Soledad(object):
def secrets(self):
return self._secrets
- @property
- def passphrase(self):
- return self._secrets.passphrase
-
def change_passphrase(self, new_passphrase):
- """
- Change the passphrase that encrypts the storage secret.
-
- :param new_passphrase: The new passphrase.
- :type new_passphrase: unicode
-
- :raise NoStorageSecret: Raised if there's no storage secret available.
- """
self._secrets.change_passphrase(new_passphrase)
@@ -811,6 +533,17 @@ def _convert_to_unicode(content):
return content
+def create_path_if_not_exists(path):
+ try:
+ if not os.path.isdir(path):
+ logger.info('Creating directory: %s.' % path)
+ os.makedirs(path)
+ except OSError as exc:
+ if exc.errno == errno.EEXIST and os.path.isdir(path):
+ pass
+ else:
+ raise
+
# ----------------------------------------------------------------------------
# Monkey patching u1db to be able to provide a custom SSL cert
# ----------------------------------------------------------------------------
diff --git a/client/src/leap/soledad/client/examples/use_api.py b/client/src/leap/soledad/client/examples/use_api.py
index fd0a100c..4268fe71 100644
--- a/client/src/leap/soledad/client/examples/use_api.py
+++ b/client/src/leap/soledad/client/examples/use_api.py
@@ -46,7 +46,7 @@ if os.path.isfile(tmpdb):
start_time = datetime.datetime.now()
opts = SQLCipherOptions(tmpdb, "secret", create=True)
-db = sqlcipher.SQLCipherDatabase(None, opts)
+db = sqlcipher.SQLCipherDatabase(opts)
def allDone():
diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py
new file mode 100644
index 00000000..6bd3f200
--- /dev/null
+++ b/client/src/leap/soledad/client/interfaces.py
@@ -0,0 +1,361 @@
+# -*- coding: utf-8 -*-
+# interfaces.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Interfaces used by the Soledad Client.
+"""
+from zope.interface import Interface, Attribute
+
+
+class ILocalStorage(Interface):
+ """
+ I implement core methods for the u1db local storage.
+ """
+ local_db_path = Attribute(
+ "The path for the local database replica")
+ local_db_file_name = Attribute(
+ "The name of the local SQLCipher U1DB database file")
+ uuid = Attribute("The user uuid")
+ default_prefix = Attribute(
+ "Prefix for default values for path")
+
+ def put_doc(self, doc):
+ """
+ Update a document in the local encrypted database.
+
+ :param doc: the document to update
+ :type doc: SoledadDocument
+
+ :return:
+ a deferred that will fire with the new revision identifier for
+ the document
+ :rtype: Deferred
+ """
+
+ def delete_doc(self, doc):
+ """
+ Delete a document from the local encrypted database.
+
+ :param doc: the document to delete
+ :type doc: SoledadDocument
+
+ :return:
+ a deferred that will fire with ...
+ :rtype: Deferred
+ """
+
+ def get_doc(self, doc_id, include_deleted=False):
+ """
+ Retrieve a document from the local encrypted database.
+
+ :param doc_id: the unique document identifier
+ :type doc_id: str
+ :param include_deleted:
+ if True, deleted documents will be returned with empty content;
+ otherwise asking for a deleted document will return None
+ :type include_deleted: bool
+
+ :return:
+ A deferred that will fire with the document object, containing a
+ SoledadDocument, or None if it could not be found
+ :rtype: Deferred
+ """
+
+ def get_docs(self, doc_ids, check_for_conflicts=True,
+ include_deleted=False):
+ """
+ Get the content for many documents.
+
+ :param doc_ids: a list of document identifiers
+ :type doc_ids: list
+ :param check_for_conflicts: if set False, then the conflict check will
+ be skipped, and 'None' will be returned instead of True/False
+ :type check_for_conflicts: bool
+
+ :return:
+ A deferred that will fire with an iterable giving the Document
+ object for each document id in matching doc_ids order.
+ :rtype: Deferred
+ """
+
+ def get_all_docs(self, include_deleted=False):
+ """
+ Get the JSON content for all documents in the database.
+
+ :param include_deleted: If set to True, deleted documents will be
+ returned with empty content. Otherwise deleted
+ documents will not be included in the results.
+ :return:
+ A deferred that will fire with (generation, [Document]): that is,
+ the current generation of the database, followed by a list of all
+ the documents in the database.
+ :rtype: Deferred
+ """
+
+ def create_doc(self, content, doc_id=None):
+ """
+ Create a new document in the local encrypted database.
+
+ :param content: the contents of the new document
+ :type content: dict
+ :param doc_id: an optional identifier specifying the document id
+ :type doc_id: str
+
+ :return:
+            A deferred that will fire with the new document (SoledadDocument
+ instance).
+ :rtype: Deferred
+ """
+
+ def create_doc_from_json(self, json, doc_id=None):
+ """
+ Create a new document.
+
+ You can optionally specify the document identifier, but the document
+ must not already exist. See 'put_doc' if you want to override an
+ existing document.
+ If the database specifies a maximum document size and the document
+ exceeds it, create will fail and raise a DocumentTooBig exception.
+
+ :param json: The JSON document string
+ :type json: str
+ :param doc_id: An optional identifier specifying the document id.
+ :type doc_id:
+ :return:
+ A deferred that will fire with the new document (A SoledadDocument
+ instance)
+ :rtype: Deferred
+ """
+
+ def create_index(self, index_name, *index_expressions):
+ """
+        Create a named index, which can then be queried for future lookups.
+ Creating an index which already exists is not an error, and is cheap.
+ Creating an index which does not match the index_expressions of the
+ existing index is an error.
+ Creating an index will block until the expressions have been evaluated
+ and the index generated.
+
+ :param index_name: A unique name which can be used as a key prefix
+ :type index_name: str
+ :param index_expressions:
+ index expressions defining the index information.
+ :type index_expressions: dict
+
+ Examples:
+
+ "fieldname", or "fieldname.subfieldname" to index alphabetically
+ sorted on the contents of a field.
+
+ "number(fieldname, width)", "lower(fieldname)"
+ """
+
+ def delete_index(self, index_name):
+ """
+ Remove a named index.
+
+ :param index_name: The name of the index we are removing
+ :type index_name: str
+ """
+
+ def list_indexes(self):
+ """
+ List the definitions of all known indexes.
+
+ :return: A list of [('index-name', ['field', 'field2'])] definitions.
+ :rtype: Deferred
+ """
+
+ def get_from_index(self, index_name, *key_values):
+ """
+ Return documents that match the keys supplied.
+
+ You must supply exactly the same number of values as have been defined
+ in the index. It is possible to do a prefix match by using '*' to
+ indicate a wildcard match. You can only supply '*' to trailing entries,
+ (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.)
+ It is also possible to append a '*' to the last supplied value (eg
+ 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param key_values: values to match. eg, if you have
+ an index with 3 fields then you would have:
+ get_from_index(index_name, val1, val2, val3)
+ :type key_values: tuple
+ :return: List of [Document]
+ :rtype: list
+ """
+
+ def get_count_from_index(self, index_name, *key_values):
+ """
+ Return the count of the documents that match the keys and
+ values supplied.
+
+ :param index_name: The index to query
+ :type index_name: str
+ :param key_values: values to match. eg, if you have
+ an index with 3 fields then you would have:
+                           get_count_from_index(index_name, val1, val2, val3)
+ :type key_values: tuple
+ :return: count.
+ :rtype: int
+ """
+
+ def get_range_from_index(self, index_name, start_value, end_value):
+ """
+ Return documents that fall within the specified range.
+
+ Both ends of the range are inclusive. For both start_value and
+ end_value, one must supply exactly the same number of values as have
+ been defined in the index, or pass None. In case of a single column
+ index, a string is accepted as an alternative for a tuple with a single
+ value. It is possible to do a prefix match by using '*' to indicate
+ a wildcard match. You can only supply '*' to trailing entries, (eg
+ 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also
+ possible to append a '*' to the last supplied value (eg 'val*', '*',
+ '*' or 'val', 'val*', '*', but not 'val*', 'val', '*')
+
+ :param index_name: The index to query
+ :type index_name: str
+        :param start_value: tuples of values that define the lower bound of
+            the range. eg, if you have an index with 3 fields then you would
+            have: (val1, val2, val3)
+        :type start_value: tuple
+        :param end_value: tuples of values that define the upper bound of the
+            range. eg, if you have an index with 3 fields then you would have:
+            (val1, val2, val3)
+        :type end_value: tuple
+ :return: A deferred that will fire with a list of [Document]
+ :rtype: Deferred
+ """
+
+ def get_index_keys(self, index_name):
+ """
+ Return all keys under which documents are indexed in this index.
+
+ :param index_name: The index to query
+ :type index_name: str
+ :return:
+ A deferred that will fire with a list of tuples of indexed keys.
+ :rtype: Deferred
+ """
+
+ def get_doc_conflicts(self, doc_id):
+ """
+ Get the list of conflicts for the given document.
+
+ :param doc_id: the document id
+ :type doc_id: str
+
+ :return:
+ A deferred that will fire with a list of the document entries that
+ are conflicted.
+ :rtype: Deferred
+ """
+
+ def resolve_doc(self, doc, conflicted_doc_revs):
+ """
+ Mark a document as no longer conflicted.
+
+ :param doc: a document with the new content to be inserted.
+ :type doc: SoledadDocument
+ :param conflicted_doc_revs:
+            A list of revisions that the new content
+            supersedes.
+ :type conflicted_doc_revs: list
+ """
+
+
+class ISyncableStorage(Interface):
+ """
+ I implement methods to synchronize with a remote replica.
+ """
+ replica_uid = Attribute("The uid of the local replica")
+ server_url = Attribute("The URL of the Soledad server.")
+ syncing = Attribute(
+ "Property, True if the syncer is syncing.")
+ token = Attribute("The authentication Token.")
+
+ def sync(self, defer_decryption=True):
+ """
+ Synchronize the local encrypted replica with a remote replica.
+
+ This method blocks until a syncing lock is acquired, so there are no
+ attempts of concurrent syncs from the same client replica.
+
+ :param url: the url of the target replica to sync with
+ :type url: str
+
+ :param defer_decryption:
+ Whether to defer the decryption process using the intermediate
+ database. If False, decryption will be done inline.
+ :type defer_decryption: bool
+
+ :return:
+ A deferred that will fire with the local generation before the
+ synchronisation was performed.
+        :rtype: Deferred
+ """
+
+ def stop_sync(self):
+ """
+ Stop the current syncing process.
+ """
+
+
+class ISharedSecretsStorage(Interface):
+ """
+ I implement methods needed for the Shared Recovery Database.
+ """
+ secrets_path = Attribute(
+ "Path for storing encrypted key used for symmetric encryption.")
+ secrets_file_name = Attribute(
+ "The name of the file where the storage secrets will be stored")
+
+ storage_secret = Attribute("")
+ remote_storage_secret = Attribute("")
+ shared_db = Attribute("The shared db object")
+
+    # XXX this is used internally from secrets, so it might be good to preserve
+    # it as a public boundary with other components.
+ secrets = Attribute("")
+
+ def init_shared_db(self, server_url, uuid, creds):
+ """
+ Initialize the shared recovery database.
+
+ :param server_url:
+ :type server_url:
+ :param uuid:
+ :type uuid:
+ :param creds:
+ :type creds:
+ """
+
+ def change_passphrase(self, new_passphrase):
+ """
+ Change the passphrase that encrypts the storage secret.
+
+ :param new_passphrase: The new passphrase.
+ :type new_passphrase: unicode
+
+ :raise NoStorageSecret: Raised if there's no storage secret available.
+ """
+
+ # XXX not in use. Uncomment if we ever decide to allow
+ # multiple secrets.
+ # secret_id = Attribute("The id of the storage secret to be used")
diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py
index 7a13a694..2e9c53a3 100644
--- a/client/src/leap/soledad/client/pragmas.py
+++ b/client/src/leap/soledad/client/pragmas.py
@@ -43,7 +43,7 @@ def set_crypto_pragmas(db_handle, sqlcipher_opts):
def _set_key(db_handle, key, is_raw_key):
"""
- Set the C{key} for use with the database.
+ Set the ``key`` for use with the database.
The process of creating a new, encrypted database is called 'keying'
the database. SQLCipher uses just-in-time key derivation at the point
@@ -60,9 +60,9 @@ def _set_key(db_handle, key, is_raw_key):
:param key: The key for use with the database.
:type key: str
- :param is_raw_key: Whether C{key} is a raw 64-char hex string or a
- passphrase that should be hashed to obtain the
- encyrption key.
+ :param is_raw_key:
+        Whether ``key`` is a raw 64-char hex string or a passphrase that should
+        be hashed to obtain the encryption key.
:type is_raw_key: bool
"""
if is_raw_key:
@@ -321,14 +321,11 @@ def set_write_ahead_logging(db_handle):
"""
logger.debug("SQLCIPHER: SETTING WRITE-AHEAD LOGGING")
db_handle.cursor().execute('PRAGMA journal_mode=WAL')
+
# The optimum value can still use a little bit of tuning, but we favor
# small sizes of the WAL file to get fast reads, since we assume that
# the writes will be quick enough to not block too much.
- # TODO
- # As a further improvement, we might want to set autocheckpoint to 0
- # here and do the checkpoints manually in a separate thread, to avoid
- # any blocks in the main thread (we should run a loopingcall from here)
db_handle.cursor().execute('PRAGMA wal_autocheckpoint=50')
diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py
index 970ac82f..93f8c25d 100644
--- a/client/src/leap/soledad/client/secrets.py
+++ b/client/src/leap/soledad/client/secrets.py
@@ -144,8 +144,7 @@ class SoledadSecrets(object):
Keys used to access storage secrets in recovery documents.
"""
- def __init__(self, uuid, passphrase, secrets_path, shared_db, crypto,
- secret_id=None):
+ def __init__(self, uuid, passphrase, secrets_path, shared_db, crypto):
"""
Initialize the secrets manager.
@@ -161,17 +160,20 @@ class SoledadSecrets(object):
:type shared_db: leap.soledad.client.shared_db.SoledadSharedDatabase
:param crypto: A soledad crypto object.
:type crypto: SoledadCrypto
- :param secret_id: The id of the storage secret to be used.
- :type secret_id: str
"""
+ # XXX removed since not in use
+ # We will pick the first secret available.
+ # param secret_id: The id of the storage secret to be used.
+
self._uuid = uuid
self._passphrase = passphrase
self._secrets_path = secrets_path
self._shared_db = shared_db
self._crypto = crypto
- self._secret_id = secret_id
self._secrets = {}
+ self._secret_id = None
+
def bootstrap(self):
"""
Bootstrap secrets.
@@ -247,7 +249,8 @@ class SoledadSecrets(object):
try:
self._load_secrets() # try to load from disk
except IOError as e:
- logger.warning('IOError while loading secrets from disk: %s' % str(e))
+ logger.warning(
+ 'IOError while loading secrets from disk: %s' % str(e))
return False
return self.storage_secret is not None
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index c9e69c73..a7e9e0fe 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -45,7 +45,6 @@ import logging
import multiprocessing
import os
import threading
-# import time --- needed for the win initialization hack
import json
from hashlib import sha256
@@ -56,7 +55,10 @@ from httplib import CannotSendRequest
from pysqlcipher import dbapi2 as sqlcipher_dbapi2
from u1db.backends import sqlite_backend
from u1db import errors as u1db_errors
-from taskthread import TimerTask
+
+from twisted.internet.task import LoopingCall
+from twisted.internet.threads import deferToThreadPool
+from twisted.python.threadpool import ThreadPool
from leap.soledad.client import crypto
from leap.soledad.client.target import SoledadSyncTarget
@@ -64,7 +66,6 @@ from leap.soledad.client.target import PendingReceivedDocsSyncError
from leap.soledad.client.sync import SoledadSynchronizer
# TODO use adbapi too
-from leap.soledad.client.mp_safe_db_TOREMOVE import MPSafeSQLiteDB
from leap.soledad.client import pragmas
from leap.soledad.common import soledad_assert
from leap.soledad.common.document import SoledadDocument
@@ -75,16 +76,6 @@ logger = logging.getLogger(__name__)
# Monkey-patch u1db.backends.sqlite_backend with pysqlcipher.dbapi2
sqlite_backend.dbapi2 = sqlcipher_dbapi2
-# It seems that, as long as we are not using old sqlite versions, serialized
-# mode is enabled by default at compile time. So accessing db connections from
-# different threads should be safe, as long as no attempt is made to use them
-# from multiple threads with no locking.
-# See https://sqlite.org/threadsafe.html
-# and http://bugs.python.org/issue16509
-
-# TODO this no longer needed -------------
-#SQLITE_CHECK_SAME_THREAD = False
-
def initialize_sqlcipher_db(opts, on_init=None):
"""
@@ -96,12 +87,17 @@ def initialize_sqlcipher_db(opts, on_init=None):
:type on_init: tuple
:return: a SQLCipher connection
"""
- conn = sqlcipher_dbapi2.connect(
- opts.path)
+ # Note: There seemed to be a bug in sqlite 3.5.9 (with python2.6)
+ # where without re-opening the database on Windows, it
+ # doesn't see the transaction that was just committed
+ # Removing from here now, look at the pysqlite implementation if the
+ # bug shows up in windows.
- # XXX not needed -- check
- #check_same_thread=SQLITE_CHECK_SAME_THREAD)
+ if not os.path.isfile(opts.path) and not opts.create:
+ raise u1db_errors.DatabaseDoesNotExist()
+ conn = sqlcipher_dbapi2.connect(
+ opts.path)
set_init_pragmas(conn, opts, extra_queries=on_init)
return conn
@@ -196,10 +192,11 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
defer_encryption = False
- # XXX not used afaik:
- # _index_storage_value = 'expand referenced encrypted'
+ # The attribute _index_storage_value will be used as the lookup key.
+ # Here we extend it with `encrypted`
+ _index_storage_value = 'expand referenced encrypted'
- def __init__(self, soledad_crypto, opts):
+ def __init__(self, opts):
"""
Connect to an existing SQLCipher database, creating a new sqlcipher
database file if needed.
@@ -217,18 +214,34 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:param opts:
:type opts: SQLCipherOptions
"""
- # TODO ------ we don't need any soledad crypto in here
-
# ensure the db is encrypted if the file already exists
if os.path.isfile(opts.path):
- self.assert_db_is_encrypted(opts)
+ _assert_db_is_encrypted(opts)
# connect to the sqlcipher database
self._db_handle = initialize_sqlcipher_db(opts)
- self._real_replica_uid = None
- self._ensure_schema()
+ # TODO ---------------------------------------------------
+ # Everything else in this initialization has to be factored
+ # out, so it can be used from U1DBSqlcipherWrapper __init__
+ # too.
+ # ---------------------------------------------------------
+
+ self._ensure_schema()
self.set_document_factory(soledad_doc_factory)
+ self._prime_replica_uid()
+
+ def _prime_replica_uid(self):
+ """
+ In the u1db implementation, _replica_uid is a property
+ that returns the value in _real_replica_uid, and does
+ a db query if no value found.
+ Here we prime the replica uid during initialization so
+ that we don't have to wait for the query afterwards.
+ """
+ self._real_replica_uid = None
+ self._get_replica_uid()
+ print "REPLICA UID --->", self._real_replica_uid
def _extra_schema_init(self, c):
"""
@@ -241,7 +254,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:param c: The cursor for querying the database.
:type c: dbapi2.cursor
"""
- print "CALLING EXTRA SCHEMA INIT...."
c.execute(
'ALTER TABLE document '
'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
@@ -263,7 +275,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
- # XXX move to API
+ # TODO XXX move to API XXX
if self.defer_encryption:
self.sync_queue.put_nowait(doc)
return doc_rev
@@ -272,37 +284,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
# SQLCipher API methods
#
- # TODO this doesn't need to be an instance method
- def assert_db_is_encrypted(self, opts):
- """
- Assert that the sqlcipher file contains an encrypted database.
-
- When opening an existing database, PRAGMA key will not immediately
- throw an error if the key provided is incorrect. To test that the
- database can be successfully opened with the provided key, it is
- necessary to perform some operation on the database (i.e. read from
- it) and confirm it is success.
-
- The easiest way to do this is select off the sqlite_master table,
- which will attempt to read the first page of the database and will
- parse the schema.
-
- :param opts:
- """
- # We try to open an encrypted database with the regular u1db
- # backend should raise a DatabaseError exception.
- # If the regular backend succeeds, then we need to stop because
- # the database was not properly initialized.
- try:
- sqlite_backend.SQLitePartialExpandDatabase(opts.path)
- except sqlcipher_dbapi2.DatabaseError:
- # assert that we can access it using SQLCipher with the given
- # key
- dummy_query = ('SELECT count(*) FROM sqlite_master',)
- initialize_sqlcipher_db(opts, on_init=dummy_query)
- else:
- raise DatabaseIsNotEncrypted()
-
# Extra query methods: extensions to the base u1db sqlite implmentation.
def get_count_from_index(self, index_name, *key_values):
@@ -420,65 +401,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
self.close()
- # TODO ---- rescue the fix for the windows case from here...
- # @classmethod
- # def _open_database(cls, sqlcipher_file, password, document_factory=None,
- # crypto=None, raw_key=False, cipher='aes-256-cbc',
- # kdf_iter=4000, cipher_page_size=1024,
- # defer_encryption=False, sync_db_key=None):
- # """
- # Open a SQLCipher database.
-#
- # :return: The database object.
- # :rtype: SQLCipherDatabase
- # """
- # cls.defer_encryption = defer_encryption
- # if not os.path.isfile(sqlcipher_file):
- # raise u1db_errors.DatabaseDoesNotExist()
-#
- # tries = 2
- # Note: There seems to be a bug in sqlite 3.5.9 (with python2.6)
- # where without re-opening the database on Windows, it
- # doesn't see the transaction that was just committed
- # while True:
- # with cls.k_lock:
- # db_handle = dbapi2.connect(
- # sqlcipher_file,
- # check_same_thread=SQLITE_CHECK_SAME_THREAD)
-#
- # try:
- # set cryptographic params
-#
- # XXX pass only a CryptoOptions object around
- #pragmas.set_crypto_pragmas(
- #db_handle, password, raw_key, cipher, kdf_iter,
- #cipher_page_size)
- #c = db_handle.cursor()
- # XXX if we use it here, it should be public
- #v, err = cls._which_index_storage(c)
- #except Exception as exc:
- #logger.warning("ERROR OPENING DATABASE!")
- #logger.debug("error was: %r" % exc)
- #v, err = None, exc
- #finally:
- #db_handle.close()
- #if v is not None:
- #break
- # possibly another process is initializing it, wait for it to be
- # done
- #if tries == 0:
- #raise err # go for the richest error?
- #tries -= 1
- #time.sleep(cls.WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL)
- #return SQLCipherDatabase._sqlite_registry[v](
- #sqlcipher_file, password, document_factory=document_factory,
- #crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter,
- #cipher_page_size=cipher_page_size, sync_db_key=sync_db_key)
-
class SQLCipherU1DBSync(object):
- _sync_watcher = None
+ _sync_loop = None
_sync_enc_pool = None
"""
@@ -495,11 +421,10 @@ class SQLCipherU1DBSync(object):
encrypting_lock = threading.Lock()
"""
- Period or recurrence of the periodic encrypting task, in seconds.
+ Period or recurrence of the Looping Call that will do the encryption to the
+ syncdb (in seconds).
"""
- # XXX use LoopingCall.
- # Just use fucking deferreds, do not waste time looping.
- ENCRYPT_TASK_PERIOD = 1
+ ENCRYPT_LOOP_PERIOD = 1
"""
A dictionary that hold locks which avoid multiple sync attempts from the
@@ -507,39 +432,62 @@ class SQLCipherU1DBSync(object):
"""
syncing_lock = defaultdict(threading.Lock)
- def _init_sync(self, opts, soledad_crypto, defer_encryption=False):
+ def __init__(self, opts, soledad_crypto, replica_uid,
+ defer_encryption=False):
self._crypto = soledad_crypto
-
- # TODO ----- have to decide what to do with syncer
self._sync_db_key = opts.sync_db_key
self._sync_db = None
self._sync_db_write_lock = None
self._sync_enc_pool = None
self.sync_queue = None
- if self.defer_encryption:
- # initialize sync db
- self._init_sync_db()
- # initialize syncing queue encryption pool
- self._sync_enc_pool = crypto.SyncEncrypterPool(
- self._crypto, self._sync_db, self._sync_db_write_lock)
- self._sync_watcher = TimerTask(self._encrypt_syncing_docs,
- self.ENCRYPT_TASK_PERIOD)
- self._sync_watcher.start()
-
- # TODO move to class attribute?
# we store syncers in a dictionary indexed by the target URL. We also
# store a hash of the auth info in case auth info expires and we need
# to rebuild the syncer for that target. The final self._syncers
# format is the following::
#
# self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
+
self._syncers = {}
self._sync_db_write_lock = threading.Lock()
self.sync_queue = multiprocessing.Queue()
- def _init_sync_db(self, opts):
+ self._sync_threadpool = None
+ self._initialize_sync_threadpool()
+
+ if defer_encryption:
+ self._initialize_sync_db()
+
+ # initialize syncing queue encryption pool
+ self._sync_enc_pool = crypto.SyncEncrypterPool(
+ self._crypto, self._sync_db, self._sync_db_write_lock)
+
+ # ------------------------------------------------------------------
+ # From the documentation: If f returns a deferred, rescheduling
+ # will not take place until the deferred has fired. The result
+ # value is ignored.
+
+ # TODO use this to avoid multiple sync attempts if the sync has not
+ # finished!
+ # ------------------------------------------------------------------
+
+ # XXX this was called sync_watcher --- trace any remnants
+ self._sync_loop = LoopingCall(self._encrypt_syncing_docs),
+ self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)
+
+ def _initialize_sync_threadpool(self):
+ """
+ Initialize a ThreadPool with exactly one thread, that will be used to
+ run all the network blocking calls for syncing on a separate thread.
+
+ TODO this needs to be ported away from urllib and into twisted async
+ calls, and then we can ditch this syncing thread and reintegrate into
+ the main reactor.
+ """
+ self._sync_threadpool = ThreadPool(0, 1)
+
+ def _initialize_sync_db(self, opts):
"""
Initialize the Symmetrically-Encrypted document to be synced database,
and the queue to communicate with subprocess workers.
@@ -554,29 +502,32 @@ class SQLCipherU1DBSync(object):
else:
sync_db_path = ":memory:"
- # XXX use initialize_sqlcipher_db here too
- # TODO pass on_init queries to initialize_sqlcipher_db
- self._sync_db = MPSafeSQLiteDB(sync_db_path)
- pragmas.set_crypto_pragmas(self._sync_db, opts)
+ # ---------------------------------------------------------
+ # TODO use a separate adbapi for this (sqlcipher only, no u1db)
+ # We could control that it only has 1 or 2 threads.
- # create sync tables
- self._create_sync_db_tables()
+ opts.path = sync_db_path
- def _create_sync_db_tables(self):
+ self._sync_db = initialize_sqlcipher_db(
+ opts, on_init=self._sync_db_extra_init)
+ # ---------------------------------------------------------
+
+ @property
+ def _sync_db_extra_init(self):
"""
- Create tables for the local sync documents db if needed.
+ Queries for creating tables for the local sync documents db if needed.
+        They are passed as extra initialization to initialize_sqlcipher_db
+
+ :rtype: tuple of strings
"""
- # TODO use adbapi ---------------------------------
+ maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"
encr = crypto.SyncEncrypterPool
decr = crypto.SyncDecrypterPool
- sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
+ sql_encr_table_query = (maybe_create % (
encr.TABLE_NAME, encr.FIELD_NAMES))
- sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % (
+ sql_decr_table_query = (maybe_create % (
decr.TABLE_NAME, decr.FIELD_NAMES))
-
- with self._sync_db_write_lock:
- self._sync_db.execute(sql_encr)
- self._sync_db.execute(sql_decr)
+ return (sql_encr_table_query, sql_decr_table_query)
def sync(self, url, creds=None, autocreate=True, defer_decryption=True):
"""
@@ -599,15 +550,24 @@ class SQLCipherU1DBSync(object):
database. If False, decryption will be done inline.
:type defer_decryption: bool
- :return: The local generation before the synchronisation was performed.
- :rtype: int
+ :return:
+ A Deferred, that will fire with the local generation (type `int`)
+ before the synchronisation was performed.
+ :rtype: deferred
"""
+ kwargs = {'creds': creds, 'autocreate': autocreate,
+ 'defer_decryption': defer_decryption}
+ return deferToThreadPool(self._sync, url, **kwargs)
+
+ def _sync(self, url, creds=None, autocreate=True, defer_decryption=True):
res = None
+
# the following context manager blocks until the syncing lock can be
# acquired.
- if defer_decryption:
- self._init_sync_db()
- with self.syncer(url, creds=creds) as syncer:
+ # TODO review, I think this is no longer needed with a 1-thread
+ # threadpool.
+
+ with self._syncer(url, creds=creds) as syncer:
# XXX could mark the critical section here...
try:
res = syncer.sync(autocreate=autocreate,
@@ -634,7 +594,7 @@ class SQLCipherU1DBSync(object):
syncer.stop()
@contextmanager
- def syncer(self, url, creds=None):
+ def _syncer(self, url, creds=None):
"""
Accesor for synchronizer.
@@ -643,13 +603,13 @@ class SQLCipherU1DBSync(object):
Because of that, this method blocks until the syncing lock can be
acquired.
"""
- with self.syncing_lock[self._get_replica_uid()]:
+ with self.syncing_lock[self.replica_uid]:
syncer = self._get_syncer(url, creds=creds)
yield syncer
@property
def syncing(self):
- lock = self.syncing_lock[self._get_replica_uid()]
+ lock = self.syncing_lock[self.replica_uid]
acquired_lock = lock.acquire(False)
if acquired_lock is False:
return True
@@ -679,7 +639,7 @@ class SQLCipherU1DBSync(object):
syncer = SoledadSynchronizer(
self,
SoledadSyncTarget(url,
- self._replica_uid,
+ self.replica_uid,
creds=creds,
crypto=self._crypto,
sync_db=self._sync_db,
@@ -701,8 +661,11 @@ class SQLCipherU1DBSync(object):
to be encrypted in the sync db. They will be read by the
SoledadSyncTarget during the sync_exchange.
- Called periodical from the TimerTask self._sync_watcher.
+ Called periodically from the LoopingCall self._sync_loop.
"""
+        # TODO should return a deferred that would fire when the encryption is
+ # done. See note on __init__
+
lock = self.encrypting_lock
# optional wait flag used to avoid blocking
if not lock.acquire(False):
@@ -720,19 +683,19 @@ class SQLCipherU1DBSync(object):
finally:
lock.release()
- @property
- def replica_uid(self):
- return self._get_replica_uid()
+ def get_generation(self):
+ # FIXME
+ # XXX this SHOULD BE a callback
+ return self._get_generation()
def close(self):
"""
Close the syncer and syncdb orderly
"""
- # stop the sync watcher for deferred encryption
- if self._sync_watcher is not None:
- self._sync_watcher.stop()
- self._sync_watcher.shutdown()
- self._sync_watcher = None
+ # stop the sync loop for deferred encryption
+ if self._sync_loop is not None:
+ self._sync_loop.stop()
+ self._sync_loop = None
# close all open syncers
for url in self._syncers:
_, syncer = self._syncers[url]
@@ -753,6 +716,37 @@ class SQLCipherU1DBSync(object):
del self.sync_queue
self.sync_queue = None
+
+def _assert_db_is_encrypted(opts):
+ """
+ Assert that the sqlcipher file contains an encrypted database.
+
+ When opening an existing database, PRAGMA key will not immediately
+ throw an error if the key provided is incorrect. To test that the
+ database can be successfully opened with the provided key, it is
+ necessary to perform some operation on the database (i.e. read from
+    it) and confirm that it succeeds.
+
+ The easiest way to do this is select off the sqlite_master table,
+ which will attempt to read the first page of the database and will
+ parse the schema.
+
+ :param opts:
+ """
+    # Trying to open an encrypted database with the regular u1db
+ # backend should raise a DatabaseError exception.
+ # If the regular backend succeeds, then we need to stop because
+ # the database was not properly initialized.
+ try:
+ sqlite_backend.SQLitePartialExpandDatabase(opts.path)
+ except sqlcipher_dbapi2.DatabaseError:
+ # assert that we can access it using SQLCipher with the given
+ # key
+ dummy_query = ('SELECT count(*) FROM sqlite_master',)
+ initialize_sqlcipher_db(opts, on_init=dummy_query)
+ else:
+ raise DatabaseIsNotEncrypted()
+
#
# Exceptions
#
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index a47afbb6..aa19ddab 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -17,10 +17,9 @@
"""
Soledad synchronization utilities.
-
Extend u1db Synchronizer with the ability to:
- * Defer the update of the known replica uid until all the decryption of
+ * Postpone the update of the known replica uid until all the decryption of
the incoming messages has been processed.
* Be interrupted and recovered.
@@ -48,6 +47,8 @@ class SoledadSynchronizer(Synchronizer):
Also modified to allow for interrupting the synchronization process.
"""
+ # TODO can delegate the syncing to the api object, living in the reactor
+ # thread, and use a simple flag.
syncing_lock = Lock()
def stop(self):
@@ -232,6 +233,8 @@ class SoledadSynchronizer(Synchronizer):
# release if something in the syncdb-decrypt goes wrong. we could keep
# track of the release date and cleanup unrealistic sync entries after
# some time.
+
+ # TODO use cancellable deferreds instead
locked = self.syncing_lock.locked()
return locked
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 651d3ee5..9b546402 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -14,14 +14,10 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
A U1DB backend for encrypting data before sending to server and decrypting
after receiving.
"""
-
-
import cStringIO
import gzip
import logging
@@ -34,7 +30,7 @@ from time import sleep
from uuid import uuid4
import simplejson as json
-from taskthread import TimerTask
+
from u1db import errors
from u1db.remote import utils, http_errors
from u1db.remote.http_target import HTTPSyncTarget
@@ -42,6 +38,8 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
from zope.proxy import ProxyBase
from zope.proxy import sameProxiedObjects, setProxiedObject
+from twisted.internet.task import LoopingCall
+
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted
@@ -755,7 +753,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
Period of recurrence of the periodic decrypting task, in seconds.
"""
- DECRYPT_TASK_PERIOD = 0.5
+ DECRYPT_LOOP_PERIOD = 0.5
#
# Modified HTTPSyncTarget methods.
@@ -802,7 +800,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_db_write_lock = None
self._decryption_callback = None
self._sync_decr_pool = None
- self._sync_watcher = None
+ self._sync_loop = None
if sync_db and sync_db_write_lock is not None:
self._sync_db = sync_db
self._sync_db_write_lock = sync_db_write_lock
@@ -828,23 +826,22 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._sync_decr_pool.close()
self._sync_decr_pool = None
- def _setup_sync_watcher(self):
+ def _setup_sync_loop(self):
"""
- Set up the sync watcher for deferred decryption.
+ Set up the sync loop for deferred decryption.
"""
- if self._sync_watcher is None:
- self._sync_watcher = TimerTask(
- self._decrypt_syncing_received_docs,
- delay=self.DECRYPT_TASK_PERIOD)
+ if self._sync_loop is None:
+ self._sync_loop = LoopingCall(
+ self._decrypt_syncing_received_docs)
+ self._sync_loop.start(self.DECRYPT_LOOP_PERIOD)
- def _teardown_sync_watcher(self):
+ def _teardown_sync_loop(self):
"""
- Tear down the sync watcher.
+ Tear down the sync loop.
"""
- if self._sync_watcher is not None:
- self._sync_watcher.stop()
- self._sync_watcher.shutdown()
- self._sync_watcher = None
+ if self._sync_loop is not None:
+ self._sync_loop.stop()
+ self._sync_loop = None
def _get_replica_uid(self, url):
"""
@@ -1131,7 +1128,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if defer_decryption and self._sync_db is not None:
self._sync_exchange_lock.acquire()
self._setup_sync_decr_pool()
- self._setup_sync_watcher()
+ self._setup_sync_loop()
self._defer_decryption = True
else:
# fall back
@@ -1292,10 +1289,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# decrypt docs in case of deferred decryption
if defer_decryption:
- self._sync_watcher.start()
+ self._sync_loop.start()
while self.clear_to_sync() is False:
- sleep(self.DECRYPT_TASK_PERIOD)
- self._teardown_sync_watcher()
+ sleep(self.DECRYPT_LOOP_PERIOD)
+ self._teardown_sync_loop()
self._teardown_sync_decr_pool()
self._sync_exchange_lock.release()
@@ -1460,7 +1457,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
Decrypt the documents received from remote replica and insert them
into the local one.
- Called periodically from TimerTask self._sync_watcher.
+ Called periodically from LoopingCall self._sync_loop.
"""
if sameProxiedObjects(
self._insert_doc_cb.get(self.source_replica_uid),