diff options
-rw-r--r-- | client/src/leap/soledad/client/adbapi.py | 44 | ||||
-rw-r--r-- | client/src/leap/soledad/client/api.py | 72 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 121 |
3 files changed, 164 insertions, 73 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 60d9e195..733fce23 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -24,11 +24,9 @@ import sys from functools import partial -import u1db -from u1db.backends import sqlite_backend - from twisted.enterprise import adbapi from twisted.python import log +from zope.proxy import ProxyBase, setProxiedObject from leap.soledad.client import sqlcipher as soledad_sqlcipher @@ -46,39 +44,9 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): check_same_thread=False, cp_openfun=openfun) -class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): - """ - A very simple wrapper for u1db around sqlcipher backend. - - Instead of initializing the database on the fly, it just uses an existing - connection that is passed to it in the initializer. - """ - - def __init__(self, conn): - self._db_handle = conn - self._real_replica_uid = None - self._ensure_schema() - self._factory = u1db.Document - - -class SoledadSQLCipherWrapper(soledad_sqlcipher.SQLCipherDatabase): - """ - A wrapper for u1db that uses the Soledad-extended sqlcipher backend. - - Instead of initializing the database on the fly, it just uses an existing - connection that is passed to it in the initializer. - """ - def __init__(self, conn): - self._db_handle = conn - self._real_replica_uid = None - self._ensure_schema() - self.set_document_factory(soledad_sqlcipher.soledad_doc_factory) - self._prime_replica_uid() - - class U1DBConnection(adbapi.Connection): - u1db_wrapper = SoledadSQLCipherWrapper + u1db_wrapper = soledad_sqlcipher.SoledadSQLCipherWrapper def __init__(self, pool, init_u1db=False): self.init_u1db = init_u1db @@ -120,6 +88,9 @@ class U1DBConnectionPool(adbapi.ConnectionPool): # all u1db connections, hashed by thread-id self.u1dbconnections = {} + # The replica uid, primed by the connections on init. + self.replica_uid = ProxyBase(None) + def runU1DBQuery(self, meth, *args, **kw): meth = "u1db_%s" % meth return self.runInteraction(self._runU1DBQuery, meth, *args, **kw) @@ -133,6 +104,11 @@ class U1DBConnectionPool(adbapi.ConnectionPool): u1db = self.u1dbconnections.get(tid) conn = self.connectionFactory(self, init_u1db=not bool(u1db)) + if self.replica_uid is None: + replica_uid = conn._u1db._real_replica_uid + setProxiedObject(self.replica_uid, replica_uid) + print "GOT REPLICA UID IN DBPOOL", self.replica_uid + if u1db is None: self.u1dbconnections[tid] = conn._u1db else: diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 493f6c1d..ff6257b2 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -158,10 +158,10 @@ class Soledad(object): # store config params self._uuid = uuid self._passphrase = passphrase - self._secrets_path = secrets_path self._local_db_path = local_db_path self._server_url = server_url self._defer_encryption = defer_encryption + self._secrets_path = None self.shared_db = None @@ -176,6 +176,8 @@ class Soledad(object): self._init_config_with_defaults() self._init_working_dirs() + self._secrets_path = secrets_path + # Initialize shared recovery database self.init_shared_db(server_url, uuid, self._creds) @@ -193,13 +195,12 @@ class Soledad(object): Initialize configuration using default values for missing params. """ soledad_assert_type(self._passphrase, unicode) - initialize = lambda attr, val: attr is None and setattr(attr, val) + initialize = lambda attr, val: getattr( + self, attr, None) is None and setattr(self, attr, val) - # initialize secrets_path - initialize(self._secrets_path, os.path.join( + initialize("_secrets_path", os.path.join( self.default_prefix, self.secrets_file_name)) - # initialize local_db_path - initialize(self._local_db_path, os.path.join( + initialize("_local_db_path", os.path.join( self.default_prefix, self.local_db_file_name)) # initialize server_url soledad_assert(self._server_url is not None, @@ -218,8 +219,8 @@ class Soledad(object): def _init_secrets(self): self._secrets = SoledadSecrets( - self.uuid, self.passphrase, self.secrets_path, - self._shared_db, self._crypto) + self.uuid, self._passphrase, self._secrets_path, + self.shared_db, self._crypto) self._secrets.bootstrap() def _init_u1db_sqlcipher_backend(self): @@ -249,8 +250,11 @@ class Soledad(object): self._dbpool = adbapi.getConnectionPool(opts) def _init_u1db_syncer(self): + replica_uid = self._dbpool.replica_uid + print "replica UID (syncer init)", replica_uid self._dbsyncer = SQLCipherU1DBSync( - self._soledad_opts, self._crypto, self._defer_encryption) + self._soledad_opts, self._crypto, replica_uid, + self._defer_encryption) # # Closing methods @@ -269,6 +273,9 @@ class Soledad(object): # ILocalStorage # + def _defer(self, meth, *args, **kw): + return self._dbpool.runU1DBQuery(meth, *args, **kw) + def put_doc(self, doc): """ ============================== WARNING ============================== @@ -282,58 +289,59 @@ class Soledad(object): # Isn't it better to defend ourselves from the mutability, to avoid # nasty surprises? doc.content = self._convert_to_unicode(doc.content) - return self._dbpool.put_doc(doc) + return self._defer("put_doc", doc) def delete_doc(self, doc): # XXX what does this do when fired??? - return self._dbpool.delete_doc(doc) + return self._defer("delete_doc", doc) def get_doc(self, doc_id, include_deleted=False): - return self._dbpool.get_doc(doc_id, include_deleted=include_deleted) + return self._defer( + "get_doc", doc_id, include_deleted=include_deleted) - def get_docs(self, doc_ids, check_for_conflicts=True, - include_deleted=False): - return self._dbpool.get_docs(doc_ids, - check_for_conflicts=check_for_conflicts, - include_deleted=include_deleted) + def get_docs( + self, doc_ids, check_for_conflicts=True, include_deleted=False): + return self._defer( + "get_docs", doc_ids, check_for_conflicts=check_for_conflicts, + include_deleted=include_deleted) def get_all_docs(self, include_deleted=False): - return self._dbpool.get_all_docs(include_deleted) + return self._defer("get_all_docs", include_deleted) def create_doc(self, content, doc_id=None): - return self._dbpool.create_doc( - _convert_to_unicode(content), doc_id=doc_id) + return self._defer( + "create_doc", _convert_to_unicode(content), doc_id=doc_id) def create_doc_from_json(self, json, doc_id=None): - return self._dbpool.create_doc_from_json(json, doc_id=doc_id) + return self._defer("create_doc_from_json", json, doc_id=doc_id) def create_index(self, index_name, *index_expressions): - return self._dbpool.create_index(index_name, *index_expressions) + return self._defer("create_index", index_name, *index_expressions) def delete_index(self, index_name): - return self._dbpool.delete_index(index_name) + return self._defer("delete_index", index_name) def list_indexes(self): - return self._dbpool.list_indexes() + return self._defer("list_indexes") def get_from_index(self, index_name, *key_values): - return self._dbpool.get_from_index(index_name, *key_values) + return self._defer("get_from_index", index_name, *key_values) def get_count_from_index(self, index_name, *key_values): - return self._dbpool.get_count_from_index(index_name, *key_values) + return self._defer("get_count_from_index", index_name, *key_values) def get_range_from_index(self, index_name, start_value, end_value): - return self._dbpool.get_range_from_index( - index_name, start_value, end_value) + return self._defer( + "get_range_from_index", index_name, start_value, end_value) def get_index_keys(self, index_name): - return self._dbpool.get_index_keys(index_name) + return self._defer("get_index_keys", index_name) def get_doc_conflicts(self, doc_id): - return self._dbpool.get_doc_conflicts(doc_id) + return self._defer("get_doc_conflicts", doc_id) def resolve_doc(self, doc, conflicted_doc_revs): - return self._dbpool.resolve_doc(doc, conflicted_doc_revs) + return self._defer("resolve_doc", doc, conflicted_doc_revs) def _get_local_db_path(self): return self._local_db_path @@ -460,6 +468,8 @@ class Soledad(object): # def init_shared_db(self, server_url, uuid, creds): + # XXX should assert that server_url begins with https + # Otherwise u1db target will fail. shared_db_url = urlparse.urljoin(server_url, SHARED_DB_NAME) self.shared_db = SoledadSharedDatabase.open_database( shared_db_url, diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index c645bb8d..c8e14176 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -55,10 +55,14 @@ from httplib import CannotSendRequest from pysqlcipher import dbapi2 as sqlcipher_dbapi2 from u1db.backends import sqlite_backend from u1db import errors as u1db_errors +import u1db + +from twisted.internet import reactor from twisted.internet.task import LoopingCall from twisted.internet.threads import deferToThreadPool from twisted.python.threadpool import ThreadPool +from twisted.python import log from leap.soledad.client import crypto from leap.soledad.client.target import SoledadSyncTarget @@ -77,7 +81,7 @@ logger = logging.getLogger(__name__) sqlite_backend.dbapi2 = sqlcipher_dbapi2 -def initialize_sqlcipher_db(opts, on_init=None): +def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True): """ Initialize a SQLCipher database. @@ -97,7 +101,7 @@ def initialize_sqlcipher_db(opts, on_init=None): raise u1db_errors.DatabaseDoesNotExist() conn = sqlcipher_dbapi2.connect( - opts.path) + opts.path, check_same_thread=check_same_thread) set_init_pragmas(conn, opts, extra_queries=on_init) return conn @@ -241,7 +245,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ self._real_replica_uid = None self._get_replica_uid() - print "REPLICA UID --->", self._real_replica_uid def _extra_schema_init(self, c): """ @@ -402,7 +405,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self.close() -class SQLCipherU1DBSync(object): +class SQLCipherU1DBSync(SQLCipherDatabase): _sync_loop = None _sync_enc_pool = None @@ -435,7 +438,13 @@ class SQLCipherU1DBSync(object): def __init__(self, opts, soledad_crypto, replica_uid, defer_encryption=False): + self._opts = opts + self._path = opts.path self._crypto = soledad_crypto + self.__replica_uid = replica_uid + + print "REPLICA UID (u1dbsync init)", replica_uid + self._sync_db_key = opts.sync_db_key self._sync_db = None self._sync_db_write_lock = None @@ -453,9 +462,17 @@ class SQLCipherU1DBSync(object): self._sync_db_write_lock = threading.Lock() self.sync_queue = multiprocessing.Queue() + self.running = False self._sync_threadpool = None self._initialize_sync_threadpool() + self._reactor = reactor + self._reactor.callWhenRunning(self._start) + + self.ready = False + self._db_handle = None + self._initialize_syncer_main_db() + if defer_encryption: self._initialize_sync_db() @@ -476,6 +493,40 @@ class SQLCipherU1DBSync(object): self._sync_loop = LoopingCall(self._encrypt_syncing_docs), self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD) + self.shutdownID = None + + @property + def _replica_uid(self): + return str(self.__replica_uid) + + def _start(self): + if not self.running: + self._sync_threadpool.start() + self.shutdownID = self._reactor.addSystemEventTrigger( + 'during', 'shutdown', self.finalClose) + self.running = True + + def _defer_to_sync_threadpool(self, meth, *args, **kwargs): + return deferToThreadPool( + self._reactor, self._sync_threadpool, meth, *args, **kwargs) + + def _initialize_syncer_main_db(self): + + def init_db(): + + # XXX DEBUG --------------------------------------------- + import thread + print "initializing in thread", thread.get_ident() + # XXX DEBUG --------------------------------------------- + + self._db_handle = initialize_sqlcipher_db( + self._opts, check_same_thread=False) + self._real_replica_uid = None + self._ensure_schema() + self.set_document_factory(soledad_doc_factory) + + return self._defer_to_sync_threadpool(init_db) + def _initialize_sync_threadpool(self): """ Initialize a ThreadPool with exactly one thread, that will be used to @@ -556,9 +607,19 @@ class SQLCipherU1DBSync(object): before the synchronisation was performed. :rtype: deferred """ + if not self.ready: + print "not ready yet..." + # XXX --------------------------------------------------------- + # This might happen because the database has not yet been + # initialized (it's deferred to the theadpool). + # A good strategy might involve to return a deferred that will + # callLater this same function after a timeout (deferLater) + # Might want to keep track of retries and cancel too. + # -------------------------------------------------------------- + print "Syncing to...", url kwargs = {'creds': creds, 'autocreate': autocreate, 'defer_decryption': defer_decryption} - return deferToThreadPool(self._sync, url, **kwargs) + return self._defer_to_sync_threadpool(self._sync, url, **kwargs) def _sync(self, url, creds=None, autocreate=True, defer_decryption=True): res = None @@ -568,9 +629,11 @@ class SQLCipherU1DBSync(object): # TODO review, I think this is no longer needed with a 1-thread # threadpool. + log.msg("in _sync") with self._syncer(url, creds=creds) as syncer: # XXX could mark the critical section here... try: + log.msg('syncer sync...') res = syncer.sync(autocreate=autocreate, defer_decryption=defer_decryption) @@ -590,6 +653,9 @@ class SQLCipherU1DBSync(object): """ Interrupt all ongoing syncs. """ + self._defer_to_sync_threadpool(self._stop_sync) + + def _stop_sync(self): for url in self._syncers: _, syncer = self._syncers[url] syncer.stop() @@ -604,13 +670,13 @@ class SQLCipherU1DBSync(object): Because of that, this method blocks until the syncing lock can be acquired. """ - with self.syncing_lock[self.replica_uid]: + with self.syncing_lock[self._path]: syncer = self._get_syncer(url, creds=creds) yield syncer @property def syncing(self): - lock = self.syncing_lock[self.replica_uid] + lock = self.syncing_lock[self._path] acquired_lock = lock.acquire(False) if acquired_lock is False: return True @@ -640,7 +706,8 @@ class SQLCipherU1DBSync(object): syncer = SoledadSynchronizer( self, SoledadSyncTarget(url, - self.replica_uid, + # XXX is the replica_uid ready? + self._replica_uid, creds=creds, crypto=self._crypto, sync_db=self._sync_db, @@ -689,6 +756,14 @@ class SQLCipherU1DBSync(object): # XXX this SHOULD BE a callback return self._get_generation() + def finalClose(self): + """ + This should only be called by the shutdown trigger. + """ + self.shutdownID = None + self._sync_threadpool.stop() + self.running = False + def close(self): """ Close the syncer and syncdb orderly @@ -718,6 +793,36 @@ class SQLCipherU1DBSync(object): self.sync_queue = None +class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): + """ + A very simple wrapper for u1db around sqlcipher backend. + + Instead of initializing the database on the fly, it just uses an existing + connection that is passed to it in the initializer. + """ + + def __init__(self, conn): + self._db_handle = conn + self._real_replica_uid = None + self._ensure_schema() + self._factory = u1db.Document + + +class SoledadSQLCipherWrapper(SQLCipherDatabase): + """ + A wrapper for u1db that uses the Soledad-extended sqlcipher backend. + + Instead of initializing the database on the fly, it just uses an existing + connection that is passed to it in the initializer. + """ + def __init__(self, conn): + self._db_handle = conn + self._real_replica_uid = None + self._ensure_schema() + self.set_document_factory(soledad_doc_factory) + self._prime_replica_uid() + + def _assert_db_is_encrypted(opts): """ Assert that the sqlcipher file contains an encrypted database. |