From 8f4daa13744c049dcc96eb2cb780df1e9ba08738 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 2 Oct 2014 06:17:57 -0500 Subject: Separate soledad interfaces * Separate local storage, syncers and shared_db * Comment out unused need_sync method * Use twisted LoopingCall * Create a threadpool for syncs * Return deferred from sync method * Do not pass crypto to SQLCipherDatabase * Pass replica_uid to u1db_syncer * Rename / reorganize some initialization methods --- client/src/leap/soledad/client/adbapi.py | 28 +- client/src/leap/soledad/client/api.py | 613 ++++++--------------- client/src/leap/soledad/client/examples/use_api.py | 2 +- client/src/leap/soledad/client/interfaces.py | 361 ++++++++++++ client/src/leap/soledad/client/pragmas.py | 13 +- client/src/leap/soledad/client/secrets.py | 15 +- client/src/leap/soledad/client/sqlcipher.py | 326 ++++++----- client/src/leap/soledad/client/sync.py | 7 +- client/src/leap/soledad/client/target.py | 45 +- 9 files changed, 755 insertions(+), 655 deletions(-) create mode 100644 client/src/leap/soledad/client/interfaces.py (limited to 'client/src') diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 3b15509b..60d9e195 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -30,7 +30,7 @@ from u1db.backends import sqlite_backend from twisted.enterprise import adbapi from twisted.python import log -from leap.soledad.client.sqlcipher import set_init_pragmas +from leap.soledad.client import sqlcipher as soledad_sqlcipher DEBUG_SQL = os.environ.get("LEAP_DEBUG_SQL") @@ -40,18 +40,15 @@ if DEBUG_SQL: def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): if openfun is None and driver == "pysqlcipher": - openfun = partial(set_init_pragmas, opts=opts) + openfun = partial(soledad_sqlcipher.set_init_pragmas, opts=opts) return U1DBConnectionPool( "%s.dbapi2" % driver, database=opts.path, check_same_thread=False, cp_openfun=openfun) -# XXX work in progress -------------------------------------------- - - -class U1DBSqliteWrapper(sqlite_backend.SQLitePartialExpandDatabase): +class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): """ - A very simple wrapper around sqlcipher backend. + A very simple wrapper for u1db around sqlcipher backend. Instead of initializing the database on the fly, it just uses an existing connection that is passed to it in the initializer. @@ -64,9 +61,24 @@ class U1DBSqliteWrapper(sqlite_backend.SQLitePartialExpandDatabase): self._factory = u1db.Document +class SoledadSQLCipherWrapper(soledad_sqlcipher.SQLCipherDatabase): + """ + A wrapper for u1db that uses the Soledad-extended sqlcipher backend. + + Instead of initializing the database on the fly, it just uses an existing + connection that is passed to it in the initializer. 
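+
+    Example (editor's illustrative sketch; assumes an already-open
+    pysqlcipher connection ``conn``):
+
+        wrapper = SoledadSQLCipherWrapper(conn)
+        doc = wrapper.get_doc('some-doc-id')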
+ """ + def __init__(self, conn): + self._db_handle = conn + self._real_replica_uid = None + self._ensure_schema() + self.set_document_factory(soledad_sqlcipher.soledad_doc_factory) + self._prime_replica_uid() + + class U1DBConnection(adbapi.Connection): - u1db_wrapper = U1DBSqliteWrapper + u1db_wrapper = SoledadSQLCipherWrapper def __init__(self, pool, init_u1db=False): self.init_u1db = init_u1db diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 703b9516..493f6c1d 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -41,6 +41,9 @@ except ImportError: from u1db.remote import http_client from u1db.remote.ssl_match_hostname import match_hostname +from zope.interface import implements + +from twisted.python import log from leap.common.config import get_path_prefix from leap.soledad.common import SHARED_DB_NAME @@ -49,11 +52,11 @@ from leap.soledad.common import soledad_assert_type from leap.soledad.client import adbapi from leap.soledad.client import events as soledad_events +from leap.soledad.client import interfaces as soledad_interfaces from leap.soledad.client.crypto import SoledadCrypto from leap.soledad.client.secrets import SoledadSecrets from leap.soledad.client.shared_db import SoledadSharedDatabase -from leap.soledad.client.target import SoledadSyncTarget -from leap.soledad.client.sqlcipher import SQLCipherOptions +from leap.soledad.client.sqlcipher import SQLCipherOptions, SQLCipherU1DBSync logger = logging.getLogger(name=__name__) @@ -61,17 +64,13 @@ logger = logging.getLogger(name=__name__) # Constants # -SOLEDAD_CERT = None """ Path to the certificate file used to certify the SSL connection between Soledad client and server. """ +SOLEDAD_CERT = None -# -# Soledad: local encrypted storage and remote encrypted sync. -# - class Soledad(object): """ Soledad provides encrypted data storage and sync. @@ -104,65 +103,57 @@ class Soledad(object): SOLEDAD_DONE_DATA_SYNC: emitted inside C{sync()} method when it has finished synchronizing with remote replica. """ + implements(soledad_interfaces.ILocalStorage, + soledad_interfaces.ISyncableStorage, + soledad_interfaces.ISharedSecretsStorage) - LOCAL_DATABASE_FILE_NAME = 'soledad.u1db' - """ - The name of the local SQLCipher U1DB database file. - """ - - STORAGE_SECRETS_FILE_NAME = "soledad.json" - """ - The name of the file where the storage secrets will be stored. - """ - - DEFAULT_PREFIX = os.path.join(get_path_prefix(), 'leap', 'soledad') - """ - Prefix for default values for path. - """ + local_db_file_name = 'soledad.u1db' + secrets_file_name = "soledad.json" + default_prefix = os.path.join(get_path_prefix(), 'leap', 'soledad') def __init__(self, uuid, passphrase, secrets_path, local_db_path, server_url, cert_file, - auth_token=None, secret_id=None, defer_encryption=False): + auth_token=None, defer_encryption=False): """ Initialize configuration, cryptographic keys and dbs. :param uuid: User's uuid. :type uuid: str - :param passphrase: The passphrase for locking and unlocking encryption - secrets for local and remote storage. + :param passphrase: + The passphrase for locking and unlocking encryption secrets for + local and remote storage. :type passphrase: unicode - :param secrets_path: Path for storing encrypted key used for - symmetric encryption. + :param secrets_path: + Path for storing encrypted key used for symmetric encryption. :type secrets_path: str :param local_db_path: Path for local encrypted storage db. 
:type local_db_path: str - :param server_url: URL for Soledad server. This is used either to sync - with the user's remote db and to interact with the - shared recovery database. + :param server_url: + URL for Soledad server. This is used either to sync with the user's + remote db and to interact with the shared recovery database. :type server_url: str - :param cert_file: Path to the certificate of the ca used - to validate the SSL certificate used by the remote - soledad server. + :param cert_file: + Path to the certificate of the ca used to validate the SSL + certificate used by the remote soledad server. :type cert_file: str - :param auth_token: Authorization token for accessing remote databases. + :param auth_token: + Authorization token for accessing remote databases. :type auth_token: str - :param secret_id: The id of the storage secret to be used. - :type secret_id: str - - :param defer_encryption: Whether to defer encryption/decryption of - documents, or do it inline while syncing. + :param defer_encryption: + Whether to defer encryption/decryption of documents, or do it + inline while syncing. :type defer_encryption: bool - :raise BootstrapSequenceError: Raised when the secret generation and - storage on server sequence has failed - for some reason. + :raise BootstrapSequenceError: + Raised when the secret generation and storage on server sequence + has failed for some reason. """ # store config params self._uuid = uuid @@ -170,30 +161,34 @@ class Soledad(object): self._secrets_path = secrets_path self._local_db_path = local_db_path self._server_url = server_url + self._defer_encryption = defer_encryption + + self.shared_db = None + # configure SSL certificate global SOLEDAD_CERT SOLEDAD_CERT = cert_file - self._set_token(auth_token) - self._defer_encryption = defer_encryption - - self._init_config() - self._init_dirs() # init crypto variables - self._shared_db_instance = None + self._set_token(auth_token) self._crypto = SoledadCrypto(self) - self._secrets = SoledadSecrets( - self._uuid, - self._passphrase, - self._secrets_path, - self._shared_db, - self._crypto, - secret_id=secret_id) - # initiate bootstrap sequence - self._bootstrap() # might raise BootstrapSequenceError() + self._init_config_with_defaults() + self._init_working_dirs() + + # Initialize shared recovery database + self.init_shared_db(server_url, uuid, self._creds) - def _init_config(self): + # The following can raise BootstrapSequenceError, that will be + # propagated upwards. + self._init_secrets() + self._init_u1db_sqlcipher_backend() + self._init_u1db_syncer() + + # + # initialization/destruction methods + # + def _init_config_with_defaults(self): """ Initialize configuration using default values for missing params. """ @@ -202,55 +197,37 @@ class Soledad(object): # initialize secrets_path initialize(self._secrets_path, os.path.join( - self.DEFAULT_PREFIX, self.STORAGE_SECRETS_FILE_NAME)) - + self.default_prefix, self.secrets_file_name)) # initialize local_db_path initialize(self._local_db_path, os.path.join( - self.DEFAULT_PREFIX, self.LOCAL_DATABASE_FILE_NAME)) - + self.default_prefix, self.local_db_file_name)) # initialize server_url soledad_assert(self._server_url is not None, 'Missing URL for Soledad server.') - # - # initialization/destruction methods - # - - def _bootstrap(self): - """ - Bootstrap local Soledad instance. - - :raise BootstrapSequenceError: - Raised when the secret generation and storage on server sequence - has failed for some reason. 
- """ - self._secrets.bootstrap() - self._init_db() - # XXX initialize syncers? - - def _init_dirs(self): + def _init_working_dirs(self): """ Create work directories. :raise OSError: in case file exists and is not a dir. """ - paths = map( - lambda x: os.path.dirname(x), - [self._local_db_path, self._secrets_path]) + paths = map(lambda x: os.path.dirname(x), [ + self._local_db_path, self._secrets_path]) for path in paths: - try: - if not os.path.isdir(path): - logger.info('Creating directory: %s.' % path) - os.makedirs(path) - except OSError as exc: - if exc.errno == errno.EEXIST and os.path.isdir(path): - pass - else: - raise - - def _init_db(self): + create_path_if_not_exists(path) + + def _init_secrets(self): + self._secrets = SoledadSecrets( + self.uuid, self.passphrase, self.secrets_path, + self._shared_db, self._crypto) + self._secrets.bootstrap() + + def _init_u1db_sqlcipher_backend(self): """ - Initialize the U1DB SQLCipher database for local storage. + Initialize the U1DB SQLCipher database for local storage, by + instantiating a modified twisted adbapi that will maintain a threadpool + with a u1db-sqclipher connection for each thread, and will return + deferreds for each u1db query. Currently, Soledad uses the default SQLCipher cipher, i.e. 'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key, @@ -268,8 +245,17 @@ class Soledad(object): defer_encryption=self._defer_encryption, sync_db_key=sync_db_key, ) + self._soledad_opts = opts self._dbpool = adbapi.getConnectionPool(opts) + def _init_u1db_syncer(self): + self._dbsyncer = SQLCipherU1DBSync( + self._soledad_opts, self._crypto, self._defer_encryption) + + # + # Closing methods + # + def close(self): """ Close underlying U1DB database. @@ -279,401 +265,164 @@ class Soledad(object): # TODO close syncers >>>>>> - #if hasattr(self, '_db') and isinstance( - #self._db, - #SQLCipherDatabase): - #self._db.close() -# - # XXX stop syncers - # self._db.stop_sync() - - @property - def _shared_db(self): - """ - Return an instance of the shared recovery database object. - - :return: The shared database. - :rtype: SoledadSharedDatabase - """ - if self._shared_db_instance is None: - self._shared_db_instance = SoledadSharedDatabase.open_database( - urlparse.urljoin(self.server_url, SHARED_DB_NAME), - self._uuid, - False, # db should exist at this point. - creds=self._creds) - return self._shared_db_instance - # - # Document storage, retrieval and sync. + # ILocalStorage # def put_doc(self, doc): - # TODO what happens with this warning during the deferred life cycle? - # Isn't it better to defend ourselves from the mutability, to avoid - # nasty surprises? """ - Update a document in the local encrypted database. - ============================== WARNING ============================== This method converts the document's contents to unicode in-place. This means that after calling `put_doc(doc)`, the contents of the document, i.e. `doc.content`, might be different from before the call. ============================== WARNING ============================== - - :param doc: the document to update - :type doc: SoledadDocument - - :return: - a deferred that will fire with the new revision identifier for - the document - :rtype: Deferred """ + # TODO what happens with this warning during the deferred life cycle? + # Isn't it better to defend ourselves from the mutability, to avoid + # nasty surprises? 
doc.content = self._convert_to_unicode(doc.content) return self._dbpool.put_doc(doc) def delete_doc(self, doc): - """ - Delete a document from the local encrypted database. - - :param doc: the document to delete - :type doc: SoledadDocument - - :return: - a deferred that will fire with ... - :rtype: Deferred - """ # XXX what does this do when fired??? return self._dbpool.delete_doc(doc) def get_doc(self, doc_id, include_deleted=False): - """ - Retrieve a document from the local encrypted database. - - :param doc_id: the unique document identifier - :type doc_id: str - :param include_deleted: - if True, deleted documents will be returned with empty content; - otherwise asking for a deleted document will return None - :type include_deleted: bool - - :return: - A deferred that will fire with the document object, containing a - SoledadDocument, or None if it could not be found - :rtype: Deferred - """ return self._dbpool.get_doc(doc_id, include_deleted=include_deleted) def get_docs(self, doc_ids, check_for_conflicts=True, include_deleted=False): - """ - Get the content for many documents. - - :param doc_ids: a list of document identifiers - :type doc_ids: list - :param check_for_conflicts: if set False, then the conflict check will - be skipped, and 'None' will be returned instead of True/False - :type check_for_conflicts: bool - - :return: - A deferred that will fire with an iterable giving the Document - object for each document id in matching doc_ids order. - :rtype: Deferred - """ - return self._dbpool.get_docs( - doc_ids, check_for_conflicts=check_for_conflicts, - include_deleted=include_deleted) + return self._dbpool.get_docs(doc_ids, + check_for_conflicts=check_for_conflicts, + include_deleted=include_deleted) def get_all_docs(self, include_deleted=False): - """ - Get the JSON content for all documents in the database. - - :param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise deleted - documents will not be included in the results. - :return: - A deferred that will fire with (generation, [Document]): that is, - the current generation of the database, followed by a list of all - the documents in the database. - :rtype: Deferred - """ return self._dbpool.get_all_docs(include_deleted) def create_doc(self, content, doc_id=None): - """ - Create a new document in the local encrypted database. - - :param content: the contents of the new document - :type content: dict - :param doc_id: an optional identifier specifying the document id - :type doc_id: str - - :return: - A deferred tht will fire with the new document (SoledadDocument - instance). - :rtype: Deferred - """ return self._dbpool.create_doc( _convert_to_unicode(content), doc_id=doc_id) def create_doc_from_json(self, json, doc_id=None): - """ - Create a new document. - - You can optionally specify the document identifier, but the document - must not already exist. See 'put_doc' if you want to override an - existing document. - If the database specifies a maximum document size and the document - exceeds it, create will fail and raise a DocumentTooBig exception. - - :param json: The JSON document string - :type json: str - :param doc_id: An optional identifier specifying the document id. 
- :type doc_id: - :return: - A deferred that will fire with the new document (A SoledadDocument - instance) - :rtype: Deferred - """ return self._dbpool.create_doc_from_json(json, doc_id=doc_id) def create_index(self, index_name, *index_expressions): - """ - Create an named index, which can then be queried for future lookups. - Creating an index which already exists is not an error, and is cheap. - Creating an index which does not match the index_expressions of the - existing index is an error. - Creating an index will block until the expressions have been evaluated - and the index generated. - - :param index_name: A unique name which can be used as a key prefix - :type index_name: str - :param index_expressions: - index expressions defining the index information. - :type index_expressions: dict - - Examples: - - "fieldname", or "fieldname.subfieldname" to index alphabetically - sorted on the contents of a field. - - "number(fieldname, width)", "lower(fieldname)" - """ return self._dbpool.create_index(index_name, *index_expressions) def delete_index(self, index_name): - """ - Remove a named index. - - :param index_name: The name of the index we are removing - :type index_name: str - """ return self._dbpool.delete_index(index_name) def list_indexes(self): - """ - List the definitions of all known indexes. - - :return: A list of [('index-name', ['field', 'field2'])] definitions. - :rtype: list - """ return self._dbpool.list_indexes() def get_from_index(self, index_name, *key_values): - """ - Return documents that match the keys supplied. - - You must supply exactly the same number of values as have been defined - in the index. It is possible to do a prefix match by using '*' to - indicate a wildcard match. You can only supply '*' to trailing entries, - (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) - It is also possible to append a '*' to the last supplied value (eg - 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') - - :param index_name: The index to query - :type index_name: str - :param key_values: values to match. eg, if you have - an index with 3 fields then you would have: - get_from_index(index_name, val1, val2, val3) - :type key_values: tuple - :return: List of [Document] - :rtype: list - """ return self._dbpool.get_from_index(index_name, *key_values) def get_count_from_index(self, index_name, *key_values): - """ - Return the count of the documents that match the keys and - values supplied. - - :param index_name: The index to query - :type index_name: str - :param key_values: values to match. eg, if you have - an index with 3 fields then you would have: - get_from_index(index_name, val1, val2, val3) - :type key_values: tuple - :return: count. - :rtype: int - """ return self._dbpool.get_count_from_index(index_name, *key_values) def get_range_from_index(self, index_name, start_value, end_value): - """ - Return documents that fall within the specified range. - - Both ends of the range are inclusive. For both start_value and - end_value, one must supply exactly the same number of values as have - been defined in the index, or pass None. In case of a single column - index, a string is accepted as an alternative for a tuple with a single - value. It is possible to do a prefix match by using '*' to indicate - a wildcard match. You can only supply '*' to trailing entries, (eg - 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) 
It is also - possible to append a '*' to the last supplied value (eg 'val*', '*', - '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') - - :param index_name: The index to query - :type index_name: str - :param start_values: tuples of values that define the lower bound of - the range. eg, if you have an index with 3 fields then you would - have: (val1, val2, val3) - :type start_values: tuple - :param end_values: tuples of values that define the upper bound of the - range. eg, if you have an index with 3 fields then you would have: - (val1, val2, val3) - :type end_values: tuple - :return: A deferred that will fire with a list of [Document] - :rtype: Deferred - """ return self._dbpool.get_range_from_index( index_name, start_value, end_value) def get_index_keys(self, index_name): - """ - Return all keys under which documents are indexed in this index. - - :param index_name: The index to query - :type index_name: str - :return: - A deferred that will fire with a list of tuples of indexed keys. - :rtype: Deferred - """ return self._dbpool.get_index_keys(index_name) def get_doc_conflicts(self, doc_id): - """ - Get the list of conflicts for the given document. - - :param doc_id: the document id - :type doc_id: str - - :return: - A deferred that will fire with a list of the document entries that - are conflicted. - :rtype: Deferred - """ return self._dbpool.get_doc_conflicts(doc_id) def resolve_doc(self, doc, conflicted_doc_revs): - """ - Mark a document as no longer conflicted. - - :param doc: a document with the new content to be inserted. - :type doc: SoledadDocument - :param conflicted_doc_revs: - A deferred that will fire with a list of revisions that the new - content supersedes. - :type conflicted_doc_revs: list - """ return self._dbpool.resolve_doc(doc, conflicted_doc_revs) + def _get_local_db_path(self): + return self._local_db_path + + # XXX Do we really need all this private / property dance? + + local_db_path = property( + _get_local_db_path, + doc='The path for the local database replica.') + + def _get_uuid(self): + return self._uuid + + uuid = property(_get_uuid, doc='The user uuid.') + # - # Sync API + # ISyncableStorage # - # TODO have interfaces, and let it implement it. - def sync(self, defer_decryption=True): - """ - Synchronize the local encrypted replica with a remote replica. - This method blocks until a syncing lock is acquired, so there are no - attempts of concurrent syncs from the same client replica. + # ----------------------------------------------------------------- + # TODO this needs work. + # Should review/write tests to check that this: - :param url: the url of the target replica to sync with - :type url: str + # (1) Defer to the syncer pool -- DONE (on dbsyncer) + # (2) Return the deferred + # (3) Add the callback for signaling the event (executed on reactor + # thread) + # (4) Check that the deferred is called with the local gen. - :param defer_decryption: - Whether to defer the decryption process using the intermediate - database. If False, decryption will be done inline. - :type defer_decryption: bool + # TODO document that this returns a deferred + # ----------------------------------------------------------------- - :return: - A deferred that will fire with the local generation before the - synchronisation was performed. - :rtype: str - """ - # TODO this needs work. 
-        # Should:
-        #  (1) Defer to the syncer pool
-        #  (2) Return a deferred (the deferToThreadpool can be good)
-        #  (3) Add the callback for signaling the event
-        #  (4) Let the local gen be returned from the thread
+        def on_sync_done(local_gen):
+            soledad_events.signal(
+                soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid)
+            return local_gen
+
+        sync_url = urlparse.urljoin(self.server_url, 'user-%s' % self.uuid)
         try:
-            local_gen = self._dbsyncer.sync(
-                urlparse.urljoin(self.server_url, 'user-%s' % self._uuid),
+            d = self._dbsyncer.sync(
+                sync_url,
                 creds=self._creds, autocreate=False,
                 defer_decryption=defer_decryption)
-            soledad_events.signal(
-                soledad_events.SOLEDAD_DONE_DATA_SYNC, self._uuid)
-            return local_gen
+
+            d.addCallbacks(on_sync_done, lambda err: log.err(err))
+            return d
+
+        # TODO catch the exception by adding an Errback
         except Exception as e:
             logger.error("Soledad exception when syncing: %s" % str(e))

     def stop_sync(self):
-        """
-        Stop the current syncing process.
-        """
         self._dbsyncer.stop_sync()

-    def need_sync(self, url):
-        """
-        Return if local db replica differs from remote url's replica.
-
-        :param url: The remote replica to compare with local replica.
-        :type url: str
-
-        :return: Whether remote replica and local replica differ.
-        :rtype: bool
-        """
-        # XXX pass the get_replica_uid ------------------------
-        # From where? initialize with that?
-        replica_uid = self._db._get_replica_uid()
-        target = SoledadSyncTarget(
-            url, replica_uid, creds=self._creds, crypto=self._crypto)
-
-        generation = self._db._get_generation()
+    # FIXME -------------------------------------------------------
+    # review if we really need this. I think that we can let the sync
+    # fail silently if nothing is to be synced.
+    #def need_sync(self, url):
+        # XXX dispatch this method in the dbpool .................
+        #replica_uid = self._dbpool.replica_uid
+        #target = SoledadSyncTarget(
+            #url, replica_uid, creds=self._creds, crypto=self._crypto)
+#
+        # XXX does it matter if we get this from the general dbpool or the
+        # syncer pool?
+        #generation = self._dbpool.get_generation()
+#
         # XXX better unpack it?
-        info = target.get_sync_info(replica_uid)
-
+        #info = target.get_sync_info(replica_uid)
+#
         # compare source generation with target's last known source generation
-        if generation != info[4]:
-            soledad_events.signal(
-                soledad_events.SOLEDAD_NEW_DATA_TO_SYNC, self._uuid)
-            return True
-        return False
+        #if generation != info[4]:
+            #soledad_events.signal(
+                #soledad_events.SOLEDAD_NEW_DATA_TO_SYNC, self.uuid)
+            #return True
+        #return False

     @property
     def syncing(self):
-        """
-        Property, True if the syncer is syncing.
-        """
         return self._dbsyncer.syncing

     def _set_token(self, token):
         """
         Set the authentication token for remote database access.
- Build the credentials dictionary with the following format: + Internally, this builds the credentials dictionary with the following + format: self._{ 'token': { @@ -686,7 +435,7 @@ class Soledad(object): """ self._creds = { 'token': { - 'uuid': self._uuid, + 'uuid': self.uuid, 'token': token, } } @@ -699,25 +448,24 @@ class Soledad(object): token = property(_get_token, _set_token, doc='The authentication Token.') - # - # Setters/getters - # - - def _get_uuid(self): - return self._uuid - - uuid = property(_get_uuid, doc='The user uuid.') + def _get_server_url(self): + return self._server_url - def get_secret_id(self): - return self._secrets.secret_id + server_url = property( + _get_server_url, + doc='The URL of the Soledad server.') - def set_secret_id(self, secret_id): - self._secrets.set_secret_id(secret_id) + # + # ISharedSecretsStorage + # - secret_id = property( - get_secret_id, - set_secret_id, - doc='The active secret id.') + def init_shared_db(self, server_url, uuid, creds): + shared_db_url = urlparse.urljoin(server_url, SHARED_DB_NAME) + self.shared_db = SoledadSharedDatabase.open_database( + shared_db_url, + uuid, + creds=creds, + create=False) # db should exist at this point. def _set_secrets_path(self, secrets_path): self._secrets.secrets_path = secrets_path @@ -730,20 +478,6 @@ class Soledad(object): _set_secrets_path, doc='The path for the file containing the encrypted symmetric secret.') - def _get_local_db_path(self): - return self._local_db_path - - local_db_path = property( - _get_local_db_path, - doc='The path for the local database replica.') - - def _get_server_url(self): - return self._server_url - - server_url = property( - _get_server_url, - doc='The URL of the Soledad server.') - @property def storage_secret(self): """ @@ -762,19 +496,7 @@ class Soledad(object): def secrets(self): return self._secrets - @property - def passphrase(self): - return self._secrets.passphrase - def change_passphrase(self, new_passphrase): - """ - Change the passphrase that encrypts the storage secret. - - :param new_passphrase: The new passphrase. - :type new_passphrase: unicode - - :raise NoStorageSecret: Raised if there's no storage secret available. - """ self._secrets.change_passphrase(new_passphrase) @@ -811,6 +533,17 @@ def _convert_to_unicode(content): return content +def create_path_if_not_exists(path): + try: + if not os.path.isdir(path): + logger.info('Creating directory: %s.' 
% path) + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise + # ---------------------------------------------------------------------------- # Monkey patching u1db to be able to provide a custom SSL cert # ---------------------------------------------------------------------------- diff --git a/client/src/leap/soledad/client/examples/use_api.py b/client/src/leap/soledad/client/examples/use_api.py index fd0a100c..4268fe71 100644 --- a/client/src/leap/soledad/client/examples/use_api.py +++ b/client/src/leap/soledad/client/examples/use_api.py @@ -46,7 +46,7 @@ if os.path.isfile(tmpdb): start_time = datetime.datetime.now() opts = SQLCipherOptions(tmpdb, "secret", create=True) -db = sqlcipher.SQLCipherDatabase(None, opts) +db = sqlcipher.SQLCipherDatabase(opts) def allDone(): diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py new file mode 100644 index 00000000..6bd3f200 --- /dev/null +++ b/client/src/leap/soledad/client/interfaces.py @@ -0,0 +1,361 @@ +# -*- coding: utf-8 -*- +# interfaces.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +Interfaces used by the Soledad Client. +""" +from zope.interface import Interface, Attribute + + +class ILocalStorage(Interface): + """ + I implement core methods for the u1db local storage. + """ + local_db_path = Attribute( + "The path for the local database replica") + local_db_file_name = Attribute( + "The name of the local SQLCipher U1DB database file") + uuid = Attribute("The user uuid") + default_prefix = Attribute( + "Prefix for default values for path") + + def put_doc(self, doc): + """ + Update a document in the local encrypted database. + + :param doc: the document to update + :type doc: SoledadDocument + + :return: + a deferred that will fire with the new revision identifier for + the document + :rtype: Deferred + """ + + def delete_doc(self, doc): + """ + Delete a document from the local encrypted database. + + :param doc: the document to delete + :type doc: SoledadDocument + + :return: + a deferred that will fire with ... + :rtype: Deferred + """ + + def get_doc(self, doc_id, include_deleted=False): + """ + Retrieve a document from the local encrypted database. + + :param doc_id: the unique document identifier + :type doc_id: str + :param include_deleted: + if True, deleted documents will be returned with empty content; + otherwise asking for a deleted document will return None + :type include_deleted: bool + + :return: + A deferred that will fire with the document object, containing a + SoledadDocument, or None if it could not be found + :rtype: Deferred + """ + + def get_docs(self, doc_ids, check_for_conflicts=True, + include_deleted=False): + """ + Get the content for many documents. 
+
+        :param doc_ids: a list of document identifiers
+        :type doc_ids: list
+        :param check_for_conflicts: if set False, then the conflict check will
+            be skipped, and 'None' will be returned instead of True/False
+        :type check_for_conflicts: bool
+
+        :return:
+            A deferred that will fire with an iterable giving the Document
+            object for each document id in matching doc_ids order.
+        :rtype: Deferred
+        """
+
+    def get_all_docs(self, include_deleted=False):
+        """
+        Get the JSON content for all documents in the database.
+
+        :param include_deleted: If set to True, deleted documents will be
+            returned with empty content. Otherwise deleted
+            documents will not be included in the results.
+        :return:
+            A deferred that will fire with (generation, [Document]): that is,
+            the current generation of the database, followed by a list of all
+            the documents in the database.
+        :rtype: Deferred
+        """
+
+    def create_doc(self, content, doc_id=None):
+        """
+        Create a new document in the local encrypted database.
+
+        :param content: the contents of the new document
+        :type content: dict
+        :param doc_id: an optional identifier specifying the document id
+        :type doc_id: str
+
+        :return:
+            A deferred that will fire with the new document (a SoledadDocument
+            instance).
+        :rtype: Deferred
+        """
+
+    def create_doc_from_json(self, json, doc_id=None):
+        """
+        Create a new document.
+
+        You can optionally specify the document identifier, but the document
+        must not already exist. See 'put_doc' if you want to override an
+        existing document.
+        If the database specifies a maximum document size and the document
+        exceeds it, create will fail and raise a DocumentTooBig exception.
+
+        :param json: The JSON document string
+        :type json: str
+        :param doc_id: An optional identifier specifying the document id.
+        :type doc_id: str
+        :return:
+            A deferred that will fire with the new document (a SoledadDocument
+            instance)
+        :rtype: Deferred
+        """
+
+    def create_index(self, index_name, *index_expressions):
+        """
+        Create a named index, which can then be queried for future lookups.
+        Creating an index which already exists is not an error, and is cheap.
+        Creating an index which does not match the index_expressions of the
+        existing index is an error.
+        Creating an index will block until the expressions have been evaluated
+        and the index generated.
+
+        :param index_name: A unique name which can be used as a key prefix
+        :type index_name: str
+        :param index_expressions:
+            index expressions defining the index information.
+        :type index_expressions: dict
+
+        Examples:
+
+        - "fieldname", or "fieldname.subfieldname" to index alphabetically
+          sorted on the contents of a field.
+        - "number(fieldname, width)", "lower(fieldname)"
+        """
+
+    def delete_index(self, index_name):
+        """
+        Remove a named index.
+
+        :param index_name: The name of the index we are removing
+        :type index_name: str
+        """
+
+    def list_indexes(self):
+        """
+        List the definitions of all known indexes.
+
+        :return: A list of [('index-name', ['field', 'field2'])] definitions.
+        :rtype: Deferred
+        """
+
+    def get_from_index(self, index_name, *key_values):
+        """
+        Return documents that match the keys supplied.
+
+        You must supply exactly the same number of values as have been defined
+        in the index. It is possible to do a prefix match by using '*' to
+        indicate a wildcard match. You can only supply '*' to trailing entries,
+        (eg 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.)
+ It is also possible to append a '*' to the last supplied value (eg + 'val*', '*', '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') + + :param index_name: The index to query + :type index_name: str + :param key_values: values to match. eg, if you have + an index with 3 fields then you would have: + get_from_index(index_name, val1, val2, val3) + :type key_values: tuple + :return: List of [Document] + :rtype: list + """ + + def get_count_from_index(self, index_name, *key_values): + """ + Return the count of the documents that match the keys and + values supplied. + + :param index_name: The index to query + :type index_name: str + :param key_values: values to match. eg, if you have + an index with 3 fields then you would have: + get_from_index(index_name, val1, val2, val3) + :type key_values: tuple + :return: count. + :rtype: int + """ + + def get_range_from_index(self, index_name, start_value, end_value): + """ + Return documents that fall within the specified range. + + Both ends of the range are inclusive. For both start_value and + end_value, one must supply exactly the same number of values as have + been defined in the index, or pass None. In case of a single column + index, a string is accepted as an alternative for a tuple with a single + value. It is possible to do a prefix match by using '*' to indicate + a wildcard match. You can only supply '*' to trailing entries, (eg + 'val', '*', '*' is allowed, but '*', 'val', 'val' is not.) It is also + possible to append a '*' to the last supplied value (eg 'val*', '*', + '*' or 'val', 'val*', '*', but not 'val*', 'val', '*') + + :param index_name: The index to query + :type index_name: str + :param start_values: tuples of values that define the lower bound of + the range. eg, if you have an index with 3 fields then you would + have: (val1, val2, val3) + :type start_values: tuple + :param end_values: tuples of values that define the upper bound of the + range. eg, if you have an index with 3 fields then you would have: + (val1, val2, val3) + :type end_values: tuple + :return: A deferred that will fire with a list of [Document] + :rtype: Deferred + """ + + def get_index_keys(self, index_name): + """ + Return all keys under which documents are indexed in this index. + + :param index_name: The index to query + :type index_name: str + :return: + A deferred that will fire with a list of tuples of indexed keys. + :rtype: Deferred + """ + + def get_doc_conflicts(self, doc_id): + """ + Get the list of conflicts for the given document. + + :param doc_id: the document id + :type doc_id: str + + :return: + A deferred that will fire with a list of the document entries that + are conflicted. + :rtype: Deferred + """ + + def resolve_doc(self, doc, conflicted_doc_revs): + """ + Mark a document as no longer conflicted. + + :param doc: a document with the new content to be inserted. + :type doc: SoledadDocument + :param conflicted_doc_revs: + A deferred that will fire with a list of revisions that the new + content supersedes. + :type conflicted_doc_revs: list + """ + + +class ISyncableStorage(Interface): + """ + I implement methods to synchronize with a remote replica. + """ + replica_uid = Attribute("The uid of the local replica") + server_url = Attribute("The URL of the Soledad server.") + syncing = Attribute( + "Property, True if the syncer is syncing.") + token = Attribute("The authentication Token.") + + def sync(self, defer_decryption=True): + """ + Synchronize the local encrypted replica with a remote replica. 
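+
+        Example (editor's illustrative sketch; ``storage`` stands for any
+        object providing this interface):
+
+            d = storage.sync()
+            d.addCallback(lambda local_gen: ...)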
+
+        This method blocks until a syncing lock is acquired, so there are no
+        attempts of concurrent syncs from the same client replica.
+
+        :param url: the url of the target replica to sync with
+        :type url: str
+
+        :param defer_decryption:
+            Whether to defer the decryption process using the intermediate
+            database. If False, decryption will be done inline.
+        :type defer_decryption: bool
+
+        :return:
+            A deferred that will fire with the local generation before the
+            synchronisation was performed.
+        :rtype: Deferred
+        """
+
+    def stop_sync(self):
+        """
+        Stop the current syncing process.
+        """
+
+
+class ISharedSecretsStorage(Interface):
+    """
+    I implement methods needed for the Shared Recovery Database.
+    """
+    secrets_path = Attribute(
+        "Path for storing encrypted key used for symmetric encryption.")
+    secrets_file_name = Attribute(
+        "The name of the file where the storage secrets will be stored")
+
+    storage_secret = Attribute("")
+    remote_storage_secret = Attribute("")
+    shared_db = Attribute("The shared db object")
+
+    # XXX this is used internally from secrets, so it might be good to
+    # preserve it as a public boundary with other components.
+    secrets = Attribute("")
+
+    def init_shared_db(self, server_url, uuid, creds):
+        """
+        Initialize the shared recovery database.
+
+        :param server_url:
+        :type server_url:
+        :param uuid:
+        :type uuid:
+        :param creds:
+        :type creds:
+        """
+
+    def change_passphrase(self, new_passphrase):
+        """
+        Change the passphrase that encrypts the storage secret.
+
+        :param new_passphrase: The new passphrase.
+        :type new_passphrase: unicode
+
+        :raise NoStorageSecret: Raised if there's no storage secret available.
+        """
+
+    # XXX not in use. Uncomment if we ever decide to allow
+    # multiple secrets.
+    # secret_id = Attribute("The id of the storage secret to be used")
diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py
index 7a13a694..2e9c53a3 100644
--- a/client/src/leap/soledad/client/pragmas.py
+++ b/client/src/leap/soledad/client/pragmas.py
@@ -43,7 +43,7 @@ def set_crypto_pragmas(db_handle, sqlcipher_opts):

 def _set_key(db_handle, key, is_raw_key):
     """
-    Set the C{key} for use with the database.
+    Set the ``key`` for use with the database.

     The process of creating a new, encrypted database is called 'keying'
     the database. SQLCipher uses just-in-time key derivation at the point
@@ -60,9 +60,9 @@
     :param key: The key for use with the database.
     :type key: str
-    :param is_raw_key: Whether C{key} is a raw 64-char hex string or a
-                       passphrase that should be hashed to obtain the
-                       encryption key.
+    :param is_raw_key:
+        Whether C{key} is a raw 64-char hex string or a passphrase that should
+        be hashed to obtain the encryption key.
     :type is_raw_key: bool
     """
     if is_raw_key:
@@ -321,14 +321,11 @@ def set_write_ahead_logging(db_handle):
     """
     logger.debug("SQLCIPHER: SETTING WRITE-AHEAD LOGGING")
     db_handle.cursor().execute('PRAGMA journal_mode=WAL')
+
     # The optimum value can still use a little bit of tuning, but we favor
     # small sizes of the WAL file to get fast reads, since we assume that
     # the writes will be quick enough to not block too much.
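+    # (Editor's illustrative note: a manual WAL checkpoint, if checkpointing
+    # were later moved off the main thread as the removed TODO below
+    # suggests, would be issued with:
+    #
+    #     db_handle.cursor().execute('PRAGMA wal_checkpoint')
+    # )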
- # TODO - # As a further improvement, we might want to set autocheckpoint to 0 - # here and do the checkpoints manually in a separate thread, to avoid - # any blocks in the main thread (we should run a loopingcall from here) db_handle.cursor().execute('PRAGMA wal_autocheckpoint=50') diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index 970ac82f..93f8c25d 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -144,8 +144,7 @@ class SoledadSecrets(object): Keys used to access storage secrets in recovery documents. """ - def __init__(self, uuid, passphrase, secrets_path, shared_db, crypto, - secret_id=None): + def __init__(self, uuid, passphrase, secrets_path, shared_db, crypto): """ Initialize the secrets manager. @@ -161,17 +160,20 @@ class SoledadSecrets(object): :type shared_db: leap.soledad.client.shared_db.SoledadSharedDatabase :param crypto: A soledad crypto object. :type crypto: SoledadCrypto - :param secret_id: The id of the storage secret to be used. - :type secret_id: str """ + # XXX removed since not in use + # We will pick the first secret available. + # param secret_id: The id of the storage secret to be used. + self._uuid = uuid self._passphrase = passphrase self._secrets_path = secrets_path self._shared_db = shared_db self._crypto = crypto - self._secret_id = secret_id self._secrets = {} + self._secret_id = None + def bootstrap(self): """ Bootstrap secrets. @@ -247,7 +249,8 @@ class SoledadSecrets(object): try: self._load_secrets() # try to load from disk except IOError as e: - logger.warning('IOError while loading secrets from disk: %s' % str(e)) + logger.warning( + 'IOError while loading secrets from disk: %s' % str(e)) return False return self.storage_secret is not None diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index c9e69c73..a7e9e0fe 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -45,7 +45,6 @@ import logging import multiprocessing import os import threading -# import time --- needed for the win initialization hack import json from hashlib import sha256 @@ -56,7 +55,10 @@ from httplib import CannotSendRequest from pysqlcipher import dbapi2 as sqlcipher_dbapi2 from u1db.backends import sqlite_backend from u1db import errors as u1db_errors -from taskthread import TimerTask + +from twisted.internet.task import LoopingCall +from twisted.internet.threads import deferToThreadPool +from twisted.python.threadpool import ThreadPool from leap.soledad.client import crypto from leap.soledad.client.target import SoledadSyncTarget @@ -64,7 +66,6 @@ from leap.soledad.client.target import PendingReceivedDocsSyncError from leap.soledad.client.sync import SoledadSynchronizer # TODO use adbapi too -from leap.soledad.client.mp_safe_db_TOREMOVE import MPSafeSQLiteDB from leap.soledad.client import pragmas from leap.soledad.common import soledad_assert from leap.soledad.common.document import SoledadDocument @@ -75,16 +76,6 @@ logger = logging.getLogger(__name__) # Monkey-patch u1db.backends.sqlite_backend with pysqlcipher.dbapi2 sqlite_backend.dbapi2 = sqlcipher_dbapi2 -# It seems that, as long as we are not using old sqlite versions, serialized -# mode is enabled by default at compile time. So accessing db connections from -# different threads should be safe, as long as no attempt is made to use them -# from multiple threads with no locking. 
-# See https://sqlite.org/threadsafe.html -# and http://bugs.python.org/issue16509 - -# TODO this no longer needed ------------- -#SQLITE_CHECK_SAME_THREAD = False - def initialize_sqlcipher_db(opts, on_init=None): """ @@ -96,12 +87,17 @@ def initialize_sqlcipher_db(opts, on_init=None): :type on_init: tuple :return: a SQLCipher connection """ - conn = sqlcipher_dbapi2.connect( - opts.path) + # Note: There seemed to be a bug in sqlite 3.5.9 (with python2.6) + # where without re-opening the database on Windows, it + # doesn't see the transaction that was just committed + # Removing from here now, look at the pysqlite implementation if the + # bug shows up in windows. - # XXX not needed -- check - #check_same_thread=SQLITE_CHECK_SAME_THREAD) + if not os.path.isfile(opts.path) and not opts.create: + raise u1db_errors.DatabaseDoesNotExist() + conn = sqlcipher_dbapi2.connect( + opts.path) set_init_pragmas(conn, opts, extra_queries=on_init) return conn @@ -196,10 +192,11 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ defer_encryption = False - # XXX not used afaik: - # _index_storage_value = 'expand referenced encrypted' + # The attribute _index_storage_value will be used as the lookup key. + # Here we extend it with `encrypted` + _index_storage_value = 'expand referenced encrypted' - def __init__(self, soledad_crypto, opts): + def __init__(self, opts): """ Connect to an existing SQLCipher database, creating a new sqlcipher database file if needed. @@ -217,18 +214,34 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :param opts: :type opts: SQLCipherOptions """ - # TODO ------ we don't need any soledad crypto in here - # ensure the db is encrypted if the file already exists if os.path.isfile(opts.path): - self.assert_db_is_encrypted(opts) + _assert_db_is_encrypted(opts) # connect to the sqlcipher database self._db_handle = initialize_sqlcipher_db(opts) - self._real_replica_uid = None - self._ensure_schema() + # TODO --------------------------------------------------- + # Everything else in this initialization has to be factored + # out, so it can be used from U1DBSqlcipherWrapper __init__ + # too. + # --------------------------------------------------------- + + self._ensure_schema() self.set_document_factory(soledad_doc_factory) + self._prime_replica_uid() + + def _prime_replica_uid(self): + """ + In the u1db implementation, _replica_uid is a property + that returns the value in _real_replica_uid, and does + a db query if no value found. + Here we prime the replica uid during initialization so + that we don't have to wait for the query afterwards. + """ + self._real_replica_uid = None + self._get_replica_uid() + print "REPLICA UID --->", self._real_replica_uid def _extra_schema_init(self, c): """ @@ -241,7 +254,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :param c: The cursor for querying the database. :type c: dbapi2.cursor """ - print "CALLING EXTRA SCHEMA INIT...." 
        c.execute(
            'ALTER TABLE document '
            'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')
@@ -263,7 +275,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         """
         doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)
-        # XXX move to API
+        # TODO XXX move to API XXX
         if self.defer_encryption:
             self.sync_queue.put_nowait(doc)
         return doc_rev
@@ -272,37 +284,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
     # SQLCipher API methods
     #

-    # TODO this doesn't need to be an instance method
-    def assert_db_is_encrypted(self, opts):
-        """
-        Assert that the sqlcipher file contains an encrypted database.
-
-        When opening an existing database, PRAGMA key will not immediately
-        throw an error if the key provided is incorrect. To test that the
-        database can be successfully opened with the provided key, it is
-        necessary to perform some operation on the database (i.e. read from
-        it) and confirm it is success.
-
-        The easiest way to do this is select off the sqlite_master table,
-        which will attempt to read the first page of the database and will
-        parse the schema.
-
-        :param opts:
-        """
-        # We try to open an encrypted database with the regular u1db
-        # backend should raise a DatabaseError exception.
-        # If the regular backend succeeds, then we need to stop because
-        # the database was not properly initialized.
-        try:
-            sqlite_backend.SQLitePartialExpandDatabase(opts.path)
-        except sqlcipher_dbapi2.DatabaseError:
-            # assert that we can access it using SQLCipher with the given
-            # key
-            dummy_query = ('SELECT count(*) FROM sqlite_master',)
-            initialize_sqlcipher_db(opts, on_init=dummy_query)
-        else:
-            raise DatabaseIsNotEncrypted()
-
     # Extra query methods: extensions to the base u1db sqlite implementation.

     def get_count_from_index(self, index_name, *key_values):
@@ -420,65 +401,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
         """
         self.close()

-    # TODO ---- rescue the fix for the windows case from here...
-    # @classmethod
-    # def _open_database(cls, sqlcipher_file, password, document_factory=None,
-    #                    crypto=None, raw_key=False, cipher='aes-256-cbc',
-    #                    kdf_iter=4000, cipher_page_size=1024,
-    #                    defer_encryption=False, sync_db_key=None):
-    #     """
-    #     Open a SQLCipher database.
-#
-    #     :return: The database object.
-    #     :rtype: SQLCipherDatabase
-    #     """
-    #     cls.defer_encryption = defer_encryption
-    #     if not os.path.isfile(sqlcipher_file):
-    #         raise u1db_errors.DatabaseDoesNotExist()
-#
-    #     tries = 2
    # Note: There seems to be a bug in sqlite 3.5.9 (with python2.6)
    #       where without re-opening the database on Windows, it
    #       doesn't see the transaction that was just committed
-    # while True:
-    #     with cls.k_lock:
-    #         db_handle = dbapi2.connect(
-    #             sqlcipher_file,
-    #             check_same_thread=SQLITE_CHECK_SAME_THREAD)
-#
-    #     try:
-            # set cryptographic params
-#
-            # XXX pass only a CryptoOptions object around
            #pragmas.set_crypto_pragmas(
                #db_handle, password, raw_key, cipher, kdf_iter,
                #cipher_page_size)
            #c = db_handle.cursor()
            # XXX if we use it here, it should be public
            #v, err = cls._which_index_storage(c)
        #except Exception as exc:
            #logger.warning("ERROR OPENING DATABASE!")
            #logger.debug("error was: %r" % exc)
            #v, err = None, exc
        #finally:
            #db_handle.close()
        #if v is not None:
            #break
        # possibly another process is initializing it, wait for it to be
        # done
        #if tries == 0:
            #raise err  # go for the richest error?
        #tries -= 1
        #time.sleep(cls.WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL)
    #return SQLCipherDatabase._sqlite_registry[v](
        #sqlcipher_file, password, document_factory=document_factory,
        #crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter,
        #cipher_page_size=cipher_page_size, sync_db_key=sync_db_key)


class SQLCipherU1DBSync(object):

-    _sync_watcher = None
+    _sync_loop = None
    _sync_enc_pool = None

    """
    A dictionary that hold locks which avoid multiple sync attempts from the
    same database replica.
    """
    encrypting_lock = threading.Lock()

    """
-    Period or recurrence of the periodic encrypting task, in seconds.
+    Period or recurrence of the Looping Call that will do the encryption to
+    the syncdb (in seconds).
    """
-    # XXX use LoopingCall.
-    # Just use fucking deferreds, do not waste time looping.
-    ENCRYPT_TASK_PERIOD = 1
+    ENCRYPT_LOOP_PERIOD = 1

    """
    A dictionary that hold locks which avoid multiple sync attempts from the
    same database replica.
    """
    syncing_lock = defaultdict(threading.Lock)

-    def _init_sync(self, opts, soledad_crypto, defer_encryption=False):
+    def __init__(self, opts, soledad_crypto, replica_uid=None,
+                 defer_encryption=False):
        self._crypto = soledad_crypto
-
-        # TODO ----- have to decide what to do with syncer
+        self.replica_uid = replica_uid
        self._sync_db_key = opts.sync_db_key
        self._sync_db = None
        self._sync_db_write_lock = None
        self._sync_enc_pool = None
        self.sync_queue = None

-        if self.defer_encryption:
-            # initialize sync db
-            self._init_sync_db()
-            # initialize syncing queue encryption pool
-            self._sync_enc_pool = crypto.SyncEncrypterPool(
-                self._crypto, self._sync_db, self._sync_db_write_lock)
-            self._sync_watcher = TimerTask(self._encrypt_syncing_docs,
-                                           self.ENCRYPT_TASK_PERIOD)
-            self._sync_watcher.start()
-
-        # TODO move to class attribute?
        # we store syncers in a dictionary indexed by the target URL. We also
        # store a hash of the auth info in case auth info expires and we need
        # to rebuild the syncer for that target. The final self._syncers
        # format is the following::
        #
        #  self._syncers = {'<url>': ('<auth_hash>', syncer), ...}
+
        self._syncers = {}
        self._sync_db_write_lock = threading.Lock()
        self.sync_queue = multiprocessing.Queue()

+        self._sync_threadpool = None
+        self._initialize_sync_threadpool()
+
+        if defer_encryption:
+            self._initialize_sync_db(opts)
+
+            # initialize syncing queue encryption pool
+            self._sync_enc_pool = crypto.SyncEncrypterPool(
+                self._crypto, self._sync_db, self._sync_db_write_lock)
+
+            # ------------------------------------------------------------
+            # From the documentation: If f returns a deferred, rescheduling
+            # will not take place until the deferred has fired. The result
+            # value is ignored.
+
+            # TODO use this to avoid multiple sync attempts if the sync has
+            # not finished!
+            # ------------------------------------------------------------
+
+            # XXX this was called sync_watcher --- trace any remnants
+            self._sync_loop = LoopingCall(self._encrypt_syncing_docs)
+            self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)
+ """ + self._sync_threadpool = ThreadPool(0, 1) + + def _initialize_sync_db(self, opts): """ Initialize the Symmetrically-Encrypted document to be synced database, and the queue to communicate with subprocess workers. @@ -554,29 +502,32 @@ class SQLCipherU1DBSync(object): else: sync_db_path = ":memory:" - # XXX use initialize_sqlcipher_db here too - # TODO pass on_init queries to initialize_sqlcipher_db - self._sync_db = MPSafeSQLiteDB(sync_db_path) - pragmas.set_crypto_pragmas(self._sync_db, opts) + # --------------------------------------------------------- + # TODO use a separate adbapi for this (sqlcipher only, no u1db) + # We could control that it only has 1 or 2 threads. - # create sync tables - self._create_sync_db_tables() + opts.path = sync_db_path - def _create_sync_db_tables(self): + self._sync_db = initialize_sqlcipher_db( + opts, on_init=self._sync_db_extra_init) + # --------------------------------------------------------- + + @property + def _sync_db_extra_init(self): """ - Create tables for the local sync documents db if needed. + Queries for creating tables for the local sync documents db if needed. + They are passed as extra initialization to initialize_sqlciphjer_db + + :rtype: tuple of strings """ - # TODO use adbapi --------------------------------- + maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" encr = crypto.SyncEncrypterPool decr = crypto.SyncDecrypterPool - sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + sql_encr_table_query = (maybe_create % ( encr.TABLE_NAME, encr.FIELD_NAMES)) - sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + sql_decr_table_query = (maybe_create % ( decr.TABLE_NAME, decr.FIELD_NAMES)) - - with self._sync_db_write_lock: - self._sync_db.execute(sql_encr) - self._sync_db.execute(sql_decr) + return (sql_encr_table_query, sql_decr_table_query) def sync(self, url, creds=None, autocreate=True, defer_decryption=True): """ @@ -599,15 +550,24 @@ class SQLCipherU1DBSync(object): database. If False, decryption will be done inline. :type defer_decryption: bool - :return: The local generation before the synchronisation was performed. - :rtype: int + :return: + A Deferred, that will fire with the local generation (type `int`) + before the synchronisation was performed. + :rtype: deferred """ + kwargs = {'creds': creds, 'autocreate': autocreate, + 'defer_decryption': defer_decryption} + return deferToThreadPool(self._sync, url, **kwargs) + + def _sync(self, url, creds=None, autocreate=True, defer_decryption=True): res = None + # the following context manager blocks until the syncing lock can be # acquired. - if defer_decryption: - self._init_sync_db() - with self.syncer(url, creds=creds) as syncer: + # TODO review, I think this is no longer needed with a 1-thread + # threadpool. + + with self._syncer(url, creds=creds) as syncer: # XXX could mark the critical section here... try: res = syncer.sync(autocreate=autocreate, @@ -634,7 +594,7 @@ class SQLCipherU1DBSync(object): syncer.stop() @contextmanager - def syncer(self, url, creds=None): + def _syncer(self, url, creds=None): """ Accesor for synchronizer. @@ -643,13 +603,13 @@ class SQLCipherU1DBSync(object): Because of that, this method blocks until the syncing lock can be acquired. 
""" - with self.syncing_lock[self._get_replica_uid()]: + with self.syncing_lock[self.replica_uid]: syncer = self._get_syncer(url, creds=creds) yield syncer @property def syncing(self): - lock = self.syncing_lock[self._get_replica_uid()] + lock = self.syncing_lock[self.replica_uid] acquired_lock = lock.acquire(False) if acquired_lock is False: return True @@ -679,7 +639,7 @@ class SQLCipherU1DBSync(object): syncer = SoledadSynchronizer( self, SoledadSyncTarget(url, - self._replica_uid, + self.replica_uid, creds=creds, crypto=self._crypto, sync_db=self._sync_db, @@ -701,8 +661,11 @@ class SQLCipherU1DBSync(object): to be encrypted in the sync db. They will be read by the SoledadSyncTarget during the sync_exchange. - Called periodical from the TimerTask self._sync_watcher. + Called periodically from the LoopingCall self._sync_loop. """ + # TODO should return a deferred that would firewhen the encryption is + # done. See note on __init__ + lock = self.encrypting_lock # optional wait flag used to avoid blocking if not lock.acquire(False): @@ -720,19 +683,19 @@ class SQLCipherU1DBSync(object): finally: lock.release() - @property - def replica_uid(self): - return self._get_replica_uid() + def get_generation(self): + # FIXME + # XXX this SHOULD BE a callback + return self._get_generation() def close(self): """ Close the syncer and syncdb orderly """ - # stop the sync watcher for deferred encryption - if self._sync_watcher is not None: - self._sync_watcher.stop() - self._sync_watcher.shutdown() - self._sync_watcher = None + # stop the sync loop for deferred encryption + if self._sync_loop is not None: + self._sync_loop.stop() + self._sync_loop = None # close all open syncers for url in self._syncers: _, syncer = self._syncers[url] @@ -753,6 +716,37 @@ class SQLCipherU1DBSync(object): del self.sync_queue self.sync_queue = None + +def _assert_db_is_encrypted(opts): + """ + Assert that the sqlcipher file contains an encrypted database. + + When opening an existing database, PRAGMA key will not immediately + throw an error if the key provided is incorrect. To test that the + database can be successfully opened with the provided key, it is + necessary to perform some operation on the database (i.e. read from + it) and confirm it is success. + + The easiest way to do this is select off the sqlite_master table, + which will attempt to read the first page of the database and will + parse the schema. + + :param opts: + """ + # We try to open an encrypted database with the regular u1db + # backend should raise a DatabaseError exception. + # If the regular backend succeeds, then we need to stop because + # the database was not properly initialized. + try: + sqlite_backend.SQLitePartialExpandDatabase(opts.path) + except sqlcipher_dbapi2.DatabaseError: + # assert that we can access it using SQLCipher with the given + # key + dummy_query = ('SELECT count(*) FROM sqlite_master',) + initialize_sqlcipher_db(opts, on_init=dummy_query) + else: + raise DatabaseIsNotEncrypted() + # # Exceptions # diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index a47afbb6..aa19ddab 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -17,10 +17,9 @@ """ Soledad synchronization utilities. - Extend u1db Synchronizer with the ability to: - * Defer the update of the known replica uid until all the decryption of + * Postpone the update of the known replica uid until all the decryption of the incoming messages has been processed. 
    * Be interrupted and recovered.
@@ -48,6 +47,8 @@ class SoledadSynchronizer(Synchronizer):
 
     Also modified to allow for interrupting the synchronization process.
     """
+    # TODO can delegate the syncing to the api object, living in the reactor
+    # thread, and use a simple flag.
     syncing_lock = Lock()
 
     def stop(self):
@@ -232,6 +233,8 @@ class SoledadSynchronizer(Synchronizer):
         # release if something in the syncdb-decrypt goes wrong. we could keep
         # track of the release date and cleanup unrealistic sync entries after
         # some time.
+
+        # TODO use cancellable deferreds instead
         locked = self.syncing_lock.locked()
         return locked
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 651d3ee5..9b546402 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -14,14 +14,10 @@
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
 """
 A U1DB backend for encrypting data before sending to server and decrypting
 after receiving.
 """
-
-
 import cStringIO
 import gzip
 import logging
@@ -34,7 +30,7 @@ from time import sleep
 from uuid import uuid4
 
 import simplejson as json
-from taskthread import TimerTask
+
 from u1db import errors
 from u1db.remote import utils, http_errors
 from u1db.remote.http_target import HTTPSyncTarget
@@ -42,6 +38,8 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
 from zope.proxy import ProxyBase
 from zope.proxy import sameProxiedObjects, setProxiedObject
 
+from twisted.internet.task import LoopingCall
+
 from leap.soledad.common.document import SoledadDocument
 from leap.soledad.client.auth import TokenBasedAuth
 from leap.soledad.client.crypto import is_symmetrically_encrypted
@@ -755,7 +753,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
     """
     Period of recurrence of the periodic decrypting task, in seconds.
     """
-    DECRYPT_TASK_PERIOD = 0.5
+    DECRYPT_LOOP_PERIOD = 0.5
 
     #
     # Modified HTTPSyncTarget methods.
     #
@@ -802,7 +800,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
         self._sync_db_write_lock = None
         self._decryption_callback = None
         self._sync_decr_pool = None
-        self._sync_watcher = None
+        self._sync_loop = None
         if sync_db and sync_db_write_lock is not None:
             self._sync_db = sync_db
             self._sync_db_write_lock = sync_db_write_lock
@@ -828,23 +826,22 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
             self._sync_decr_pool.close()
             self._sync_decr_pool = None
 
-    def _setup_sync_watcher(self):
+    def _setup_sync_loop(self):
         """
-        Set up the sync watcher for deferred decryption.
+        Set up the sync loop for deferred decryption.
         """
-        if self._sync_watcher is None:
-            self._sync_watcher = TimerTask(
-                self._decrypt_syncing_received_docs,
-                delay=self.DECRYPT_TASK_PERIOD)
+        if self._sync_loop is None:
+            self._sync_loop = LoopingCall(
+                self._decrypt_syncing_received_docs)
+            self._sync_loop.start(self.DECRYPT_LOOP_PERIOD)
 
-    def _teardown_sync_watcher(self):
+    def _teardown_sync_loop(self):
         """
-        Tear down the sync watcher.
+        Tear down the sync loop.
""" - if self._sync_watcher is not None: - self._sync_watcher.stop() - self._sync_watcher.shutdown() - self._sync_watcher = None + if self._sync_loop is not None: + self._sync_loop.stop() + self._sync_loop = None def _get_replica_uid(self, url): """ @@ -1131,7 +1128,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if defer_decryption and self._sync_db is not None: self._sync_exchange_lock.acquire() self._setup_sync_decr_pool() - self._setup_sync_watcher() + self._setup_sync_loop() self._defer_decryption = True else: # fall back @@ -1292,10 +1289,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # decrypt docs in case of deferred decryption if defer_decryption: - self._sync_watcher.start() + self._sync_loop.start() while self.clear_to_sync() is False: - sleep(self.DECRYPT_TASK_PERIOD) - self._teardown_sync_watcher() + sleep(self.DECRYPT_LOOP_PERIOD) + self._teardown_sync_loop() self._teardown_sync_decr_pool() self._sync_exchange_lock.release() @@ -1460,7 +1457,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): Decrypt the documents received from remote replica and insert them into the local one. - Called periodically from TimerTask self._sync_watcher. + Called periodically from LoopingCall self._sync_loop. """ if sameProxiedObjects( self._insert_doc_cb.get(self.source_replica_uid), -- cgit v1.2.3