summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Kali Kaneko <kali@leap.se> 2014-10-09 01:55:58 +0200
committer Kali Kaneko <kali@leap.se> 2015-02-11 14:03:17 -0400
commit 71d0ba384b16e5a1d9cfd4ee2b046ff6957f9b4e (patch)
tree ffe50302751853a0865c0a00b7929bb3d763e836
parent 1ae8f27c622034dc9524dab4b971bf0828966dd1 (diff)
working sync-threadpool
* Completed mapping of async dbpool
* Fixed shared db initialization.

Stuff To Be Fixed yet:

[ ] All inserts have to be done from the sync threadpool. Right now we're reusing the connection from multiple threads in the syncer. I'm assuming the writes are automatically locking the file at the sqlite level, so this shouldn't pose a problem.
[ ] Correctly handle the multiprocessing pool, and the callback execution.
-rw-r--r-- client/src/leap/soledad/client/adbapi.py | 44
-rw-r--r-- client/src/leap/soledad/client/api.py | 72
-rw-r--r-- client/src/leap/soledad/client/sqlcipher.py | 121
3 files changed, 164 insertions, 73 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 60d9e195..733fce23 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -24,11 +24,9 @@ import sys
from functools import partial
-import u1db
-from u1db.backends import sqlite_backend
-
from twisted.enterprise import adbapi
from twisted.python import log
+from zope.proxy import ProxyBase, setProxiedObject
from leap.soledad.client import sqlcipher as soledad_sqlcipher
@@ -46,39 +44,9 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
check_same_thread=False, cp_openfun=openfun)
-class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
- """
- A very simple wrapper for u1db around sqlcipher backend.
-
- Instead of initializing the database on the fly, it just uses an existing
- connection that is passed to it in the initializer.
- """
-
- def __init__(self, conn):
- self._db_handle = conn
- self._real_replica_uid = None
- self._ensure_schema()
- self._factory = u1db.Document
-
-
-class SoledadSQLCipherWrapper(soledad_sqlcipher.SQLCipherDatabase):
- """
- A wrapper for u1db that uses the Soledad-extended sqlcipher backend.
-
- Instead of initializing the database on the fly, it just uses an existing
- connection that is passed to it in the initializer.
- """
- def __init__(self, conn):
- self._db_handle = conn
- self._real_replica_uid = None
- self._ensure_schema()
- self.set_document_factory(soledad_sqlcipher.soledad_doc_factory)
- self._prime_replica_uid()
-
-
class U1DBConnection(adbapi.Connection):
- u1db_wrapper = SoledadSQLCipherWrapper
+ u1db_wrapper = soledad_sqlcipher.SoledadSQLCipherWrapper
def __init__(self, pool, init_u1db=False):
self.init_u1db = init_u1db
@@ -120,6 +88,9 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
# all u1db connections, hashed by thread-id
self.u1dbconnections = {}
+ # The replica uid, primed by the connections on init.
+ self.replica_uid = ProxyBase(None)
+
def runU1DBQuery(self, meth, *args, **kw):
meth = "u1db_%s" % meth
return self.runInteraction(self._runU1DBQuery, meth, *args, **kw)
@@ -133,6 +104,11 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
u1db = self.u1dbconnections.get(tid)
conn = self.connectionFactory(self, init_u1db=not bool(u1db))
+ if self.replica_uid is None:
+ replica_uid = conn._u1db._real_replica_uid
+ setProxiedObject(self.replica_uid, replica_uid)
+ print "GOT REPLICA UID IN DBPOOL", self.replica_uid
+
if u1db is None:
self.u1dbconnections[tid] = conn._u1db
else:
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index 493f6c1d..ff6257b2 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -158,10 +158,10 @@ class Soledad(object):
# store config params
self._uuid = uuid
self._passphrase = passphrase
- self._secrets_path = secrets_path
self._local_db_path = local_db_path
self._server_url = server_url
self._defer_encryption = defer_encryption
+ self._secrets_path = None
self.shared_db = None
@@ -176,6 +176,8 @@ class Soledad(object):
self._init_config_with_defaults()
self._init_working_dirs()
+ self._secrets_path = secrets_path
+
# Initialize shared recovery database
self.init_shared_db(server_url, uuid, self._creds)
@@ -193,13 +195,12 @@ class Soledad(object):
Initialize configuration using default values for missing params.
"""
soledad_assert_type(self._passphrase, unicode)
- initialize = lambda attr, val: attr is None and setattr(attr, val)
+ initialize = lambda attr, val: getattr(
+ self, attr, None) is None and setattr(self, attr, val)
- # initialize secrets_path
- initialize(self._secrets_path, os.path.join(
+ initialize("_secrets_path", os.path.join(
self.default_prefix, self.secrets_file_name))
- # initialize local_db_path
- initialize(self._local_db_path, os.path.join(
+ initialize("_local_db_path", os.path.join(
self.default_prefix, self.local_db_file_name))
# initialize server_url
soledad_assert(self._server_url is not None,
@@ -218,8 +219,8 @@ class Soledad(object):
def _init_secrets(self):
self._secrets = SoledadSecrets(
- self.uuid, self.passphrase, self.secrets_path,
- self._shared_db, self._crypto)
+ self.uuid, self._passphrase, self._secrets_path,
+ self.shared_db, self._crypto)
self._secrets.bootstrap()
def _init_u1db_sqlcipher_backend(self):
@@ -249,8 +250,11 @@ class Soledad(object):
self._dbpool = adbapi.getConnectionPool(opts)
def _init_u1db_syncer(self):
+ replica_uid = self._dbpool.replica_uid
+ print "replica UID (syncer init)", replica_uid
self._dbsyncer = SQLCipherU1DBSync(
- self._soledad_opts, self._crypto, self._defer_encryption)
+ self._soledad_opts, self._crypto, replica_uid,
+ self._defer_encryption)
#
# Closing methods
@@ -269,6 +273,9 @@ class Soledad(object):
# ILocalStorage
#
+ def _defer(self, meth, *args, **kw):
+ return self._dbpool.runU1DBQuery(meth, *args, **kw)
+
def put_doc(self, doc):
"""
============================== WARNING ==============================
@@ -282,58 +289,59 @@ class Soledad(object):
# Isn't it better to defend ourselves from the mutability, to avoid
# nasty surprises?
doc.content = self._convert_to_unicode(doc.content)
- return self._dbpool.put_doc(doc)
+ return self._defer("put_doc", doc)
def delete_doc(self, doc):
# XXX what does this do when fired???
- return self._dbpool.delete_doc(doc)
+ return self._defer("delete_doc", doc)
def get_doc(self, doc_id, include_deleted=False):
- return self._dbpool.get_doc(doc_id, include_deleted=include_deleted)
+ return self._defer(
+ "get_doc", doc_id, include_deleted=include_deleted)
- def get_docs(self, doc_ids, check_for_conflicts=True,
- include_deleted=False):
- return self._dbpool.get_docs(doc_ids,
- check_for_conflicts=check_for_conflicts,
- include_deleted=include_deleted)
+ def get_docs(
+ self, doc_ids, check_for_conflicts=True, include_deleted=False):
+ return self._defer(
+ "get_docs", doc_ids, check_for_conflicts=check_for_conflicts,
+ include_deleted=include_deleted)
def get_all_docs(self, include_deleted=False):
- return self._dbpool.get_all_docs(include_deleted)
+ return self._defer("get_all_docs", include_deleted)
def create_doc(self, content, doc_id=None):
- return self._dbpool.create_doc(
- _convert_to_unicode(content), doc_id=doc_id)
+ return self._defer(
+ "create_doc", _convert_to_unicode(content), doc_id=doc_id)
def create_doc_from_json(self, json, doc_id=None):
- return self._dbpool.create_doc_from_json(json, doc_id=doc_id)
+ return self._defer("create_doc_from_json", json, doc_id=doc_id)
def create_index(self, index_name, *index_expressions):
- return self._dbpool.create_index(index_name, *index_expressions)
+ return self._defer("create_index", index_name, *index_expressions)
def delete_index(self, index_name):
- return self._dbpool.delete_index(index_name)
+ return self._defer("delete_index", index_name)
def list_indexes(self):
- return self._dbpool.list_indexes()
+ return self._defer("list_indexes")
def get_from_index(self, index_name, *key_values):
- return self._dbpool.get_from_index(index_name, *key_values)
+ return self._defer("get_from_index", index_name, *key_values)
def get_count_from_index(self, index_name, *key_values):
- return self._dbpool.get_count_from_index(index_name, *key_values)
+ return self._defer("get_count_from_index", index_name, *key_values)
def get_range_from_index(self, index_name, start_value, end_value):
- return self._dbpool.get_range_from_index(
- index_name, start_value, end_value)
+ return self._defer(
+ "get_range_from_index", index_name, start_value, end_value)
def get_index_keys(self, index_name):
- return self._dbpool.get_index_keys(index_name)
+ return self._defer("get_index_keys", index_name)
def get_doc_conflicts(self, doc_id):
- return self._dbpool.get_doc_conflicts(doc_id)
+ return self._defer("get_doc_conflicts", doc_id)
def resolve_doc(self, doc, conflicted_doc_revs):
- return self._dbpool.resolve_doc(doc, conflicted_doc_revs)
+ return self._defer("resolve_doc", doc, conflicted_doc_revs)
def _get_local_db_path(self):
return self._local_db_path
@@ -460,6 +468,8 @@ class Soledad(object):
#
def init_shared_db(self, server_url, uuid, creds):
+ # XXX should assert that server_url begins with https
+ # Otherwise u1db target will fail.
shared_db_url = urlparse.urljoin(server_url, SHARED_DB_NAME)
self.shared_db = SoledadSharedDatabase.open_database(
shared_db_url,
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index c645bb8d..c8e14176 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -55,10 +55,14 @@ from httplib import CannotSendRequest
from pysqlcipher import dbapi2 as sqlcipher_dbapi2
from u1db.backends import sqlite_backend
from u1db import errors as u1db_errors
+import u1db
+
+from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
+from twisted.python import log
from leap.soledad.client import crypto
from leap.soledad.client.target import SoledadSyncTarget
@@ -77,7 +81,7 @@ logger = logging.getLogger(__name__)
sqlite_backend.dbapi2 = sqlcipher_dbapi2
-def initialize_sqlcipher_db(opts, on_init=None):
+def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
"""
Initialize a SQLCipher database.
@@ -97,7 +101,7 @@ def initialize_sqlcipher_db(opts, on_init=None):
raise u1db_errors.DatabaseDoesNotExist()
conn = sqlcipher_dbapi2.connect(
- opts.path)
+ opts.path, check_same_thread=check_same_thread)
set_init_pragmas(conn, opts, extra_queries=on_init)
return conn
@@ -241,7 +245,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
self._real_replica_uid = None
self._get_replica_uid()
- print "REPLICA UID --->", self._real_replica_uid
def _extra_schema_init(self, c):
"""
@@ -402,7 +405,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self.close()
-class SQLCipherU1DBSync(object):
+class SQLCipherU1DBSync(SQLCipherDatabase):
_sync_loop = None
_sync_enc_pool = None
@@ -435,7 +438,13 @@ class SQLCipherU1DBSync(object):
def __init__(self, opts, soledad_crypto, replica_uid,
defer_encryption=False):
+ self._opts = opts
+ self._path = opts.path
self._crypto = soledad_crypto
+ self.__replica_uid = replica_uid
+
+ print "REPLICA UID (u1dbsync init)", replica_uid
+
self._sync_db_key = opts.sync_db_key
self._sync_db = None
self._sync_db_write_lock = None
@@ -453,9 +462,17 @@ class SQLCipherU1DBSync(object):
self._sync_db_write_lock = threading.Lock()
self.sync_queue = multiprocessing.Queue()
+ self.running = False
self._sync_threadpool = None
self._initialize_sync_threadpool()
+ self._reactor = reactor
+ self._reactor.callWhenRunning(self._start)
+
+ self.ready = False
+ self._db_handle = None
+ self._initialize_syncer_main_db()
+
if defer_encryption:
self._initialize_sync_db()
@@ -476,6 +493,40 @@ class SQLCipherU1DBSync(object):
self._sync_loop = LoopingCall(self._encrypt_syncing_docs),
self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)
+ self.shutdownID = None
+
+ @property
+ def _replica_uid(self):
+ return str(self.__replica_uid)
+
+ def _start(self):
+ if not self.running:
+ self._sync_threadpool.start()
+ self.shutdownID = self._reactor.addSystemEventTrigger(
+ 'during', 'shutdown', self.finalClose)
+ self.running = True
+
+ def _defer_to_sync_threadpool(self, meth, *args, **kwargs):
+ return deferToThreadPool(
+ self._reactor, self._sync_threadpool, meth, *args, **kwargs)
+
+ def _initialize_syncer_main_db(self):
+
+ def init_db():
+
+ # XXX DEBUG ---------------------------------------------
+ import thread
+ print "initializing in thread", thread.get_ident()
+ # XXX DEBUG ---------------------------------------------
+
+ self._db_handle = initialize_sqlcipher_db(
+ self._opts, check_same_thread=False)
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(soledad_doc_factory)
+
+ return self._defer_to_sync_threadpool(init_db)
+
def _initialize_sync_threadpool(self):
"""
Initialize a ThreadPool with exactly one thread, that will be used to
@@ -556,9 +607,19 @@ class SQLCipherU1DBSync(object):
before the synchronisation was performed.
:rtype: deferred
"""
+ if not self.ready:
+ print "not ready yet..."
+ # XXX ---------------------------------------------------------
+ # This might happen because the database has not yet been
+ # initialized (it's deferred to the threadpool).
+ # A good strategy might be to return a deferred that will
+ # callLater this same function after a timeout (deferLater).
+ # Might want to keep track of retries and cancel too.
+ # --------------------------------------------------------------
+ print "Syncing to...", url
kwargs = {'creds': creds, 'autocreate': autocreate,
'defer_decryption': defer_decryption}
- return deferToThreadPool(self._sync, url, **kwargs)
+ return self._defer_to_sync_threadpool(self._sync, url, **kwargs)
def _sync(self, url, creds=None, autocreate=True, defer_decryption=True):
res = None
@@ -568,9 +629,11 @@ class SQLCipherU1DBSync(object):
# TODO review, I think this is no longer needed with a 1-thread
# threadpool.
+ log.msg("in _sync")
with self._syncer(url, creds=creds) as syncer:
# XXX could mark the critical section here...
try:
+ log.msg('syncer sync...')
res = syncer.sync(autocreate=autocreate,
defer_decryption=defer_decryption)
@@ -590,6 +653,9 @@ class SQLCipherU1DBSync(object):
"""
Interrupt all ongoing syncs.
"""
+ self._defer_to_sync_threadpool(self._stop_sync)
+
+ def _stop_sync(self):
for url in self._syncers:
_, syncer = self._syncers[url]
syncer.stop()
@@ -604,13 +670,13 @@ class SQLCipherU1DBSync(object):
Because of that, this method blocks until the syncing lock can be
acquired.
"""
- with self.syncing_lock[self.replica_uid]:
+ with self.syncing_lock[self._path]:
syncer = self._get_syncer(url, creds=creds)
yield syncer
@property
def syncing(self):
- lock = self.syncing_lock[self.replica_uid]
+ lock = self.syncing_lock[self._path]
acquired_lock = lock.acquire(False)
if acquired_lock is False:
return True
@@ -640,7 +706,8 @@ class SQLCipherU1DBSync(object):
syncer = SoledadSynchronizer(
self,
SoledadSyncTarget(url,
- self.replica_uid,
+ # XXX is the replica_uid ready?
+ self._replica_uid,
creds=creds,
crypto=self._crypto,
sync_db=self._sync_db,
@@ -689,6 +756,14 @@ class SQLCipherU1DBSync(object):
# XXX this SHOULD BE a callback
return self._get_generation()
+ def finalClose(self):
+ """
+ This should only be called by the shutdown trigger.
+ """
+ self.shutdownID = None
+ self._sync_threadpool.stop()
+ self.running = False
+
def close(self):
"""
Close the syncer and syncdb orderly
@@ -718,6 +793,36 @@ class SQLCipherU1DBSync(object):
self.sync_queue = None
+class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
+ """
+ A very simple wrapper for u1db around sqlcipher backend.
+
+ Instead of initializing the database on the fly, it just uses an existing
+ connection that is passed to it in the initializer.
+ """
+
+ def __init__(self, conn):
+ self._db_handle = conn
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self._factory = u1db.Document
+
+
+class SoledadSQLCipherWrapper(SQLCipherDatabase):
+ """
+ A wrapper for u1db that uses the Soledad-extended sqlcipher backend.
+
+ Instead of initializing the database on the fly, it just uses an existing
+ connection that is passed to it in the initializer.
+ """
+ def __init__(self, conn):
+ self._db_handle = conn
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(soledad_doc_factory)
+ self._prime_replica_uid()
+
+
def _assert_db_is_encrypted(opts):
"""
Assert that the sqlcipher file contains an encrypted database.