diff options
author | Kali Kaneko <kali@leap.se> | 2014-10-09 01:55:58 +0200 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2015-02-11 14:03:17 -0400 |
commit | 71d0ba384b16e5a1d9cfd4ee2b046ff6957f9b4e (patch) | |
tree | ffe50302751853a0865c0a00b7929bb3d763e836 | |
parent | 1ae8f27c622034dc9524dab4b971bf0828966dd1 (diff) |
working sync-threadpool
* Completed mapping of async dbpool
* Fixed shared db initialization.
Still to be fixed:
[ ] All inserts have to be done from the sync threadpool.
Right now we're reusing the connection from multiple
threads in the syncer. I'm assuming that writes automatically
lock the file at the SQLite level, so this shouldn't pose a
problem.
[ ] Correctly handle the multiprocessing pool, and the callback
execution.
-rw-r--r-- | client/src/leap/soledad/client/adbapi.py | 44 | ||||
-rw-r--r-- | client/src/leap/soledad/client/api.py | 72 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 121 |
3 files changed, 164 insertions, 73 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 60d9e195..733fce23 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -24,11 +24,9 @@ import sys from functools import partial -import u1db -from u1db.backends import sqlite_backend - from twisted.enterprise import adbapi from twisted.python import log +from zope.proxy import ProxyBase, setProxiedObject from leap.soledad.client import sqlcipher as soledad_sqlcipher @@ -46,39 +44,9 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): check_same_thread=False, cp_openfun=openfun) -class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): - """ - A very simple wrapper for u1db around sqlcipher backend. - - Instead of initializing the database on the fly, it just uses an existing - connection that is passed to it in the initializer. - """ - - def __init__(self, conn): - self._db_handle = conn - self._real_replica_uid = None - self._ensure_schema() - self._factory = u1db.Document - - -class SoledadSQLCipherWrapper(soledad_sqlcipher.SQLCipherDatabase): - """ - A wrapper for u1db that uses the Soledad-extended sqlcipher backend. - - Instead of initializing the database on the fly, it just uses an existing - connection that is passed to it in the initializer. - """ - def __init__(self, conn): - self._db_handle = conn - self._real_replica_uid = None - self._ensure_schema() - self.set_document_factory(soledad_sqlcipher.soledad_doc_factory) - self._prime_replica_uid() - - class U1DBConnection(adbapi.Connection): - u1db_wrapper = SoledadSQLCipherWrapper + u1db_wrapper = soledad_sqlcipher.SoledadSQLCipherWrapper def __init__(self, pool, init_u1db=False): self.init_u1db = init_u1db @@ -120,6 +88,9 @@ class U1DBConnectionPool(adbapi.ConnectionPool): # all u1db connections, hashed by thread-id self.u1dbconnections = {} + # The replica uid, primed by the connections on init. 
+ self.replica_uid = ProxyBase(None) + def runU1DBQuery(self, meth, *args, **kw): meth = "u1db_%s" % meth return self.runInteraction(self._runU1DBQuery, meth, *args, **kw) @@ -133,6 +104,11 @@ class U1DBConnectionPool(adbapi.ConnectionPool): u1db = self.u1dbconnections.get(tid) conn = self.connectionFactory(self, init_u1db=not bool(u1db)) + if self.replica_uid is None: + replica_uid = conn._u1db._real_replica_uid + setProxiedObject(self.replica_uid, replica_uid) + print "GOT REPLICA UID IN DBPOOL", self.replica_uid + if u1db is None: self.u1dbconnections[tid] = conn._u1db else: diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 493f6c1d..ff6257b2 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -158,10 +158,10 @@ class Soledad(object): # store config params self._uuid = uuid self._passphrase = passphrase - self._secrets_path = secrets_path self._local_db_path = local_db_path self._server_url = server_url self._defer_encryption = defer_encryption + self._secrets_path = None self.shared_db = None @@ -176,6 +176,8 @@ class Soledad(object): self._init_config_with_defaults() self._init_working_dirs() + self._secrets_path = secrets_path + # Initialize shared recovery database self.init_shared_db(server_url, uuid, self._creds) @@ -193,13 +195,12 @@ class Soledad(object): Initialize configuration using default values for missing params. 
""" soledad_assert_type(self._passphrase, unicode) - initialize = lambda attr, val: attr is None and setattr(attr, val) + initialize = lambda attr, val: getattr( + self, attr, None) is None and setattr(self, attr, val) - # initialize secrets_path - initialize(self._secrets_path, os.path.join( + initialize("_secrets_path", os.path.join( self.default_prefix, self.secrets_file_name)) - # initialize local_db_path - initialize(self._local_db_path, os.path.join( + initialize("_local_db_path", os.path.join( self.default_prefix, self.local_db_file_name)) # initialize server_url soledad_assert(self._server_url is not None, @@ -218,8 +219,8 @@ class Soledad(object): def _init_secrets(self): self._secrets = SoledadSecrets( - self.uuid, self.passphrase, self.secrets_path, - self._shared_db, self._crypto) + self.uuid, self._passphrase, self._secrets_path, + self.shared_db, self._crypto) self._secrets.bootstrap() def _init_u1db_sqlcipher_backend(self): @@ -249,8 +250,11 @@ class Soledad(object): self._dbpool = adbapi.getConnectionPool(opts) def _init_u1db_syncer(self): + replica_uid = self._dbpool.replica_uid + print "replica UID (syncer init)", replica_uid self._dbsyncer = SQLCipherU1DBSync( - self._soledad_opts, self._crypto, self._defer_encryption) + self._soledad_opts, self._crypto, replica_uid, + self._defer_encryption) # # Closing methods @@ -269,6 +273,9 @@ class Soledad(object): # ILocalStorage # + def _defer(self, meth, *args, **kw): + return self._dbpool.runU1DBQuery(meth, *args, **kw) + def put_doc(self, doc): """ ============================== WARNING ============================== @@ -282,58 +289,59 @@ class Soledad(object): # Isn't it better to defend ourselves from the mutability, to avoid # nasty surprises? doc.content = self._convert_to_unicode(doc.content) - return self._dbpool.put_doc(doc) + return self._defer("put_doc", doc) def delete_doc(self, doc): # XXX what does this do when fired??? 
- return self._dbpool.delete_doc(doc) + return self._defer("delete_doc", doc) def get_doc(self, doc_id, include_deleted=False): - return self._dbpool.get_doc(doc_id, include_deleted=include_deleted) + return self._defer( + "get_doc", doc_id, include_deleted=include_deleted) - def get_docs(self, doc_ids, check_for_conflicts=True, - include_deleted=False): - return self._dbpool.get_docs(doc_ids, - check_for_conflicts=check_for_conflicts, - include_deleted=include_deleted) + def get_docs( + self, doc_ids, check_for_conflicts=True, include_deleted=False): + return self._defer( + "get_docs", doc_ids, check_for_conflicts=check_for_conflicts, + include_deleted=include_deleted) def get_all_docs(self, include_deleted=False): - return self._dbpool.get_all_docs(include_deleted) + return self._defer("get_all_docs", include_deleted) def create_doc(self, content, doc_id=None): - return self._dbpool.create_doc( - _convert_to_unicode(content), doc_id=doc_id) + return self._defer( + "create_doc", _convert_to_unicode(content), doc_id=doc_id) def create_doc_from_json(self, json, doc_id=None): - return self._dbpool.create_doc_from_json(json, doc_id=doc_id) + return self._defer("create_doc_from_json", json, doc_id=doc_id) def create_index(self, index_name, *index_expressions): - return self._dbpool.create_index(index_name, *index_expressions) + return self._defer("create_index", index_name, *index_expressions) def delete_index(self, index_name): - return self._dbpool.delete_index(index_name) + return self._defer("delete_index", index_name) def list_indexes(self): - return self._dbpool.list_indexes() + return self._defer("list_indexes") def get_from_index(self, index_name, *key_values): - return self._dbpool.get_from_index(index_name, *key_values) + return self._defer("get_from_index", index_name, *key_values) def get_count_from_index(self, index_name, *key_values): - return self._dbpool.get_count_from_index(index_name, *key_values) + return self._defer("get_count_from_index", 
index_name, *key_values) def get_range_from_index(self, index_name, start_value, end_value): - return self._dbpool.get_range_from_index( - index_name, start_value, end_value) + return self._defer( + "get_range_from_index", index_name, start_value, end_value) def get_index_keys(self, index_name): - return self._dbpool.get_index_keys(index_name) + return self._defer("get_index_keys", index_name) def get_doc_conflicts(self, doc_id): - return self._dbpool.get_doc_conflicts(doc_id) + return self._defer("get_doc_conflicts", doc_id) def resolve_doc(self, doc, conflicted_doc_revs): - return self._dbpool.resolve_doc(doc, conflicted_doc_revs) + return self._defer("resolve_doc", doc, conflicted_doc_revs) def _get_local_db_path(self): return self._local_db_path @@ -460,6 +468,8 @@ class Soledad(object): # def init_shared_db(self, server_url, uuid, creds): + # XXX should assert that server_url begins with https + # Otherwise u1db target will fail. shared_db_url = urlparse.urljoin(server_url, SHARED_DB_NAME) self.shared_db = SoledadSharedDatabase.open_database( shared_db_url, diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index c645bb8d..c8e14176 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -55,10 +55,14 @@ from httplib import CannotSendRequest from pysqlcipher import dbapi2 as sqlcipher_dbapi2 from u1db.backends import sqlite_backend from u1db import errors as u1db_errors +import u1db + +from twisted.internet import reactor from twisted.internet.task import LoopingCall from twisted.internet.threads import deferToThreadPool from twisted.python.threadpool import ThreadPool +from twisted.python import log from leap.soledad.client import crypto from leap.soledad.client.target import SoledadSyncTarget @@ -77,7 +81,7 @@ logger = logging.getLogger(__name__) sqlite_backend.dbapi2 = sqlcipher_dbapi2 -def initialize_sqlcipher_db(opts, on_init=None): +def 
initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True): """ Initialize a SQLCipher database. @@ -97,7 +101,7 @@ def initialize_sqlcipher_db(opts, on_init=None): raise u1db_errors.DatabaseDoesNotExist() conn = sqlcipher_dbapi2.connect( - opts.path) + opts.path, check_same_thread=check_same_thread) set_init_pragmas(conn, opts, extra_queries=on_init) return conn @@ -241,7 +245,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ self._real_replica_uid = None self._get_replica_uid() - print "REPLICA UID --->", self._real_replica_uid def _extra_schema_init(self, c): """ @@ -402,7 +405,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self.close() -class SQLCipherU1DBSync(object): +class SQLCipherU1DBSync(SQLCipherDatabase): _sync_loop = None _sync_enc_pool = None @@ -435,7 +438,13 @@ class SQLCipherU1DBSync(object): def __init__(self, opts, soledad_crypto, replica_uid, defer_encryption=False): + self._opts = opts + self._path = opts.path self._crypto = soledad_crypto + self.__replica_uid = replica_uid + + print "REPLICA UID (u1dbsync init)", replica_uid + self._sync_db_key = opts.sync_db_key self._sync_db = None self._sync_db_write_lock = None @@ -453,9 +462,17 @@ class SQLCipherU1DBSync(object): self._sync_db_write_lock = threading.Lock() self.sync_queue = multiprocessing.Queue() + self.running = False self._sync_threadpool = None self._initialize_sync_threadpool() + self._reactor = reactor + self._reactor.callWhenRunning(self._start) + + self.ready = False + self._db_handle = None + self._initialize_syncer_main_db() + if defer_encryption: self._initialize_sync_db() @@ -476,6 +493,40 @@ class SQLCipherU1DBSync(object): self._sync_loop = LoopingCall(self._encrypt_syncing_docs), self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD) + self.shutdownID = None + + @property + def _replica_uid(self): + return str(self.__replica_uid) + + def _start(self): + if not self.running: + self._sync_threadpool.start() + 
self.shutdownID = self._reactor.addSystemEventTrigger( + 'during', 'shutdown', self.finalClose) + self.running = True + + def _defer_to_sync_threadpool(self, meth, *args, **kwargs): + return deferToThreadPool( + self._reactor, self._sync_threadpool, meth, *args, **kwargs) + + def _initialize_syncer_main_db(self): + + def init_db(): + + # XXX DEBUG --------------------------------------------- + import thread + print "initializing in thread", thread.get_ident() + # XXX DEBUG --------------------------------------------- + + self._db_handle = initialize_sqlcipher_db( + self._opts, check_same_thread=False) + self._real_replica_uid = None + self._ensure_schema() + self.set_document_factory(soledad_doc_factory) + + return self._defer_to_sync_threadpool(init_db) + def _initialize_sync_threadpool(self): """ Initialize a ThreadPool with exactly one thread, that will be used to @@ -556,9 +607,19 @@ class SQLCipherU1DBSync(object): before the synchronisation was performed. :rtype: deferred """ + if not self.ready: + print "not ready yet..." + # XXX --------------------------------------------------------- + # This might happen because the database has not yet been + # initialized (it's deferred to the theadpool). + # A good strategy might involve to return a deferred that will + # callLater this same function after a timeout (deferLater) + # Might want to keep track of retries and cancel too. + # -------------------------------------------------------------- + print "Syncing to...", url kwargs = {'creds': creds, 'autocreate': autocreate, 'defer_decryption': defer_decryption} - return deferToThreadPool(self._sync, url, **kwargs) + return self._defer_to_sync_threadpool(self._sync, url, **kwargs) def _sync(self, url, creds=None, autocreate=True, defer_decryption=True): res = None @@ -568,9 +629,11 @@ class SQLCipherU1DBSync(object): # TODO review, I think this is no longer needed with a 1-thread # threadpool. 
+ log.msg("in _sync") with self._syncer(url, creds=creds) as syncer: # XXX could mark the critical section here... try: + log.msg('syncer sync...') res = syncer.sync(autocreate=autocreate, defer_decryption=defer_decryption) @@ -590,6 +653,9 @@ class SQLCipherU1DBSync(object): """ Interrupt all ongoing syncs. """ + self._defer_to_sync_threadpool(self._stop_sync) + + def _stop_sync(self): for url in self._syncers: _, syncer = self._syncers[url] syncer.stop() @@ -604,13 +670,13 @@ class SQLCipherU1DBSync(object): Because of that, this method blocks until the syncing lock can be acquired. """ - with self.syncing_lock[self.replica_uid]: + with self.syncing_lock[self._path]: syncer = self._get_syncer(url, creds=creds) yield syncer @property def syncing(self): - lock = self.syncing_lock[self.replica_uid] + lock = self.syncing_lock[self._path] acquired_lock = lock.acquire(False) if acquired_lock is False: return True @@ -640,7 +706,8 @@ class SQLCipherU1DBSync(object): syncer = SoledadSynchronizer( self, SoledadSyncTarget(url, - self.replica_uid, + # XXX is the replica_uid ready? + self._replica_uid, creds=creds, crypto=self._crypto, sync_db=self._sync_db, @@ -689,6 +756,14 @@ class SQLCipherU1DBSync(object): # XXX this SHOULD BE a callback return self._get_generation() + def finalClose(self): + """ + This should only be called by the shutdown trigger. + """ + self.shutdownID = None + self._sync_threadpool.stop() + self.running = False + def close(self): """ Close the syncer and syncdb orderly @@ -718,6 +793,36 @@ class SQLCipherU1DBSync(object): self.sync_queue = None +class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase): + """ + A very simple wrapper for u1db around sqlcipher backend. + + Instead of initializing the database on the fly, it just uses an existing + connection that is passed to it in the initializer. 
+ """ + + def __init__(self, conn): + self._db_handle = conn + self._real_replica_uid = None + self._ensure_schema() + self._factory = u1db.Document + + +class SoledadSQLCipherWrapper(SQLCipherDatabase): + """ + A wrapper for u1db that uses the Soledad-extended sqlcipher backend. + + Instead of initializing the database on the fly, it just uses an existing + connection that is passed to it in the initializer. + """ + def __init__(self, conn): + self._db_handle = conn + self._real_replica_uid = None + self._ensure_schema() + self.set_document_factory(soledad_doc_factory) + self._prime_replica_uid() + + def _assert_db_is_encrypted(opts): """ Assert that the sqlcipher file contains an encrypted database. |