summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad')
-rw-r--r--client/src/leap/soledad/client/adbapi.py44
-rw-r--r--client/src/leap/soledad/client/api.py72
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py121
3 files changed, 164 insertions, 73 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py
index 60d9e195..733fce23 100644
--- a/client/src/leap/soledad/client/adbapi.py
+++ b/client/src/leap/soledad/client/adbapi.py
@@ -24,11 +24,9 @@ import sys
from functools import partial
-import u1db
-from u1db.backends import sqlite_backend
-
from twisted.enterprise import adbapi
from twisted.python import log
+from zope.proxy import ProxyBase, setProxiedObject
from leap.soledad.client import sqlcipher as soledad_sqlcipher
@@ -46,39 +44,9 @@ def getConnectionPool(opts, openfun=None, driver="pysqlcipher"):
check_same_thread=False, cp_openfun=openfun)
-class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
- """
- A very simple wrapper for u1db around sqlcipher backend.
-
- Instead of initializing the database on the fly, it just uses an existing
- connection that is passed to it in the initializer.
- """
-
- def __init__(self, conn):
- self._db_handle = conn
- self._real_replica_uid = None
- self._ensure_schema()
- self._factory = u1db.Document
-
-
-class SoledadSQLCipherWrapper(soledad_sqlcipher.SQLCipherDatabase):
- """
- A wrapper for u1db that uses the Soledad-extended sqlcipher backend.
-
- Instead of initializing the database on the fly, it just uses an existing
- connection that is passed to it in the initializer.
- """
- def __init__(self, conn):
- self._db_handle = conn
- self._real_replica_uid = None
- self._ensure_schema()
- self.set_document_factory(soledad_sqlcipher.soledad_doc_factory)
- self._prime_replica_uid()
-
-
class U1DBConnection(adbapi.Connection):
- u1db_wrapper = SoledadSQLCipherWrapper
+ u1db_wrapper = soledad_sqlcipher.SoledadSQLCipherWrapper
def __init__(self, pool, init_u1db=False):
self.init_u1db = init_u1db
@@ -120,6 +88,9 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
# all u1db connections, hashed by thread-id
self.u1dbconnections = {}
+ # The replica uid, primed by the connections on init.
+ self.replica_uid = ProxyBase(None)
+
def runU1DBQuery(self, meth, *args, **kw):
meth = "u1db_%s" % meth
return self.runInteraction(self._runU1DBQuery, meth, *args, **kw)
@@ -133,6 +104,11 @@ class U1DBConnectionPool(adbapi.ConnectionPool):
u1db = self.u1dbconnections.get(tid)
conn = self.connectionFactory(self, init_u1db=not bool(u1db))
+ if self.replica_uid is None:
+ replica_uid = conn._u1db._real_replica_uid
+ setProxiedObject(self.replica_uid, replica_uid)
+ print "GOT REPLICA UID IN DBPOOL", self.replica_uid
+
if u1db is None:
self.u1dbconnections[tid] = conn._u1db
else:
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index 493f6c1d..ff6257b2 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -158,10 +158,10 @@ class Soledad(object):
# store config params
self._uuid = uuid
self._passphrase = passphrase
- self._secrets_path = secrets_path
self._local_db_path = local_db_path
self._server_url = server_url
self._defer_encryption = defer_encryption
+ self._secrets_path = None
self.shared_db = None
@@ -176,6 +176,8 @@ class Soledad(object):
self._init_config_with_defaults()
self._init_working_dirs()
+ self._secrets_path = secrets_path
+
# Initialize shared recovery database
self.init_shared_db(server_url, uuid, self._creds)
@@ -193,13 +195,12 @@ class Soledad(object):
Initialize configuration using default values for missing params.
"""
soledad_assert_type(self._passphrase, unicode)
- initialize = lambda attr, val: attr is None and setattr(attr, val)
+ initialize = lambda attr, val: getattr(
+ self, attr, None) is None and setattr(self, attr, val)
- # initialize secrets_path
- initialize(self._secrets_path, os.path.join(
+ initialize("_secrets_path", os.path.join(
self.default_prefix, self.secrets_file_name))
- # initialize local_db_path
- initialize(self._local_db_path, os.path.join(
+ initialize("_local_db_path", os.path.join(
self.default_prefix, self.local_db_file_name))
# initialize server_url
soledad_assert(self._server_url is not None,
@@ -218,8 +219,8 @@ class Soledad(object):
def _init_secrets(self):
self._secrets = SoledadSecrets(
- self.uuid, self.passphrase, self.secrets_path,
- self._shared_db, self._crypto)
+ self.uuid, self._passphrase, self._secrets_path,
+ self.shared_db, self._crypto)
self._secrets.bootstrap()
def _init_u1db_sqlcipher_backend(self):
@@ -249,8 +250,11 @@ class Soledad(object):
self._dbpool = adbapi.getConnectionPool(opts)
def _init_u1db_syncer(self):
+ replica_uid = self._dbpool.replica_uid
+ print "replica UID (syncer init)", replica_uid
self._dbsyncer = SQLCipherU1DBSync(
- self._soledad_opts, self._crypto, self._defer_encryption)
+ self._soledad_opts, self._crypto, replica_uid,
+ self._defer_encryption)
#
# Closing methods
@@ -269,6 +273,9 @@ class Soledad(object):
# ILocalStorage
#
+ def _defer(self, meth, *args, **kw):
+ return self._dbpool.runU1DBQuery(meth, *args, **kw)
+
def put_doc(self, doc):
"""
============================== WARNING ==============================
@@ -282,58 +289,59 @@ class Soledad(object):
# Isn't it better to defend ourselves from the mutability, to avoid
# nasty surprises?
doc.content = self._convert_to_unicode(doc.content)
- return self._dbpool.put_doc(doc)
+ return self._defer("put_doc", doc)
def delete_doc(self, doc):
# XXX what does this do when fired???
- return self._dbpool.delete_doc(doc)
+ return self._defer("delete_doc", doc)
def get_doc(self, doc_id, include_deleted=False):
- return self._dbpool.get_doc(doc_id, include_deleted=include_deleted)
+ return self._defer(
+ "get_doc", doc_id, include_deleted=include_deleted)
- def get_docs(self, doc_ids, check_for_conflicts=True,
- include_deleted=False):
- return self._dbpool.get_docs(doc_ids,
- check_for_conflicts=check_for_conflicts,
- include_deleted=include_deleted)
+ def get_docs(
+ self, doc_ids, check_for_conflicts=True, include_deleted=False):
+ return self._defer(
+ "get_docs", doc_ids, check_for_conflicts=check_for_conflicts,
+ include_deleted=include_deleted)
def get_all_docs(self, include_deleted=False):
- return self._dbpool.get_all_docs(include_deleted)
+ return self._defer("get_all_docs", include_deleted)
def create_doc(self, content, doc_id=None):
- return self._dbpool.create_doc(
- _convert_to_unicode(content), doc_id=doc_id)
+ return self._defer(
+ "create_doc", _convert_to_unicode(content), doc_id=doc_id)
def create_doc_from_json(self, json, doc_id=None):
- return self._dbpool.create_doc_from_json(json, doc_id=doc_id)
+ return self._defer("create_doc_from_json", json, doc_id=doc_id)
def create_index(self, index_name, *index_expressions):
- return self._dbpool.create_index(index_name, *index_expressions)
+ return self._defer("create_index", index_name, *index_expressions)
def delete_index(self, index_name):
- return self._dbpool.delete_index(index_name)
+ return self._defer("delete_index", index_name)
def list_indexes(self):
- return self._dbpool.list_indexes()
+ return self._defer("list_indexes")
def get_from_index(self, index_name, *key_values):
- return self._dbpool.get_from_index(index_name, *key_values)
+ return self._defer("get_from_index", index_name, *key_values)
def get_count_from_index(self, index_name, *key_values):
- return self._dbpool.get_count_from_index(index_name, *key_values)
+ return self._defer("get_count_from_index", index_name, *key_values)
def get_range_from_index(self, index_name, start_value, end_value):
- return self._dbpool.get_range_from_index(
- index_name, start_value, end_value)
+ return self._defer(
+ "get_range_from_index", index_name, start_value, end_value)
def get_index_keys(self, index_name):
- return self._dbpool.get_index_keys(index_name)
+ return self._defer("get_index_keys", index_name)
def get_doc_conflicts(self, doc_id):
- return self._dbpool.get_doc_conflicts(doc_id)
+ return self._defer("get_doc_conflicts", doc_id)
def resolve_doc(self, doc, conflicted_doc_revs):
- return self._dbpool.resolve_doc(doc, conflicted_doc_revs)
+ return self._defer("resolve_doc", doc, conflicted_doc_revs)
def _get_local_db_path(self):
return self._local_db_path
@@ -460,6 +468,8 @@ class Soledad(object):
#
def init_shared_db(self, server_url, uuid, creds):
+ # XXX should assert that server_url begins with https
+ # Otherwise u1db target will fail.
shared_db_url = urlparse.urljoin(server_url, SHARED_DB_NAME)
self.shared_db = SoledadSharedDatabase.open_database(
shared_db_url,
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index c645bb8d..c8e14176 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -55,10 +55,14 @@ from httplib import CannotSendRequest
from pysqlcipher import dbapi2 as sqlcipher_dbapi2
from u1db.backends import sqlite_backend
from u1db import errors as u1db_errors
+import u1db
+
+from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
+from twisted.python import log
from leap.soledad.client import crypto
from leap.soledad.client.target import SoledadSyncTarget
@@ -77,7 +81,7 @@ logger = logging.getLogger(__name__)
sqlite_backend.dbapi2 = sqlcipher_dbapi2
-def initialize_sqlcipher_db(opts, on_init=None):
+def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
"""
Initialize a SQLCipher database.
@@ -97,7 +101,7 @@ def initialize_sqlcipher_db(opts, on_init=None):
raise u1db_errors.DatabaseDoesNotExist()
conn = sqlcipher_dbapi2.connect(
- opts.path)
+ opts.path, check_same_thread=check_same_thread)
set_init_pragmas(conn, opts, extra_queries=on_init)
return conn
@@ -241,7 +245,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
"""
self._real_replica_uid = None
self._get_replica_uid()
- print "REPLICA UID --->", self._real_replica_uid
def _extra_schema_init(self, c):
"""
@@ -402,7 +405,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self.close()
-class SQLCipherU1DBSync(object):
+class SQLCipherU1DBSync(SQLCipherDatabase):
_sync_loop = None
_sync_enc_pool = None
@@ -435,7 +438,13 @@ class SQLCipherU1DBSync(object):
def __init__(self, opts, soledad_crypto, replica_uid,
defer_encryption=False):
+ self._opts = opts
+ self._path = opts.path
self._crypto = soledad_crypto
+ self.__replica_uid = replica_uid
+
+ print "REPLICA UID (u1dbsync init)", replica_uid
+
self._sync_db_key = opts.sync_db_key
self._sync_db = None
self._sync_db_write_lock = None
@@ -453,9 +462,17 @@ class SQLCipherU1DBSync(object):
self._sync_db_write_lock = threading.Lock()
self.sync_queue = multiprocessing.Queue()
+ self.running = False
self._sync_threadpool = None
self._initialize_sync_threadpool()
+ self._reactor = reactor
+ self._reactor.callWhenRunning(self._start)
+
+ self.ready = False
+ self._db_handle = None
+ self._initialize_syncer_main_db()
+
if defer_encryption:
self._initialize_sync_db()
@@ -476,6 +493,40 @@ class SQLCipherU1DBSync(object):
self._sync_loop = LoopingCall(self._encrypt_syncing_docs),
self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)
+ self.shutdownID = None
+
+ @property
+ def _replica_uid(self):
+ return str(self.__replica_uid)
+
+ def _start(self):
+ if not self.running:
+ self._sync_threadpool.start()
+ self.shutdownID = self._reactor.addSystemEventTrigger(
+ 'during', 'shutdown', self.finalClose)
+ self.running = True
+
+ def _defer_to_sync_threadpool(self, meth, *args, **kwargs):
+ return deferToThreadPool(
+ self._reactor, self._sync_threadpool, meth, *args, **kwargs)
+
+ def _initialize_syncer_main_db(self):
+
+ def init_db():
+
+ # XXX DEBUG ---------------------------------------------
+ import thread
+ print "initializing in thread", thread.get_ident()
+ # XXX DEBUG ---------------------------------------------
+
+ self._db_handle = initialize_sqlcipher_db(
+ self._opts, check_same_thread=False)
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(soledad_doc_factory)
+
+ return self._defer_to_sync_threadpool(init_db)
+
def _initialize_sync_threadpool(self):
"""
Initialize a ThreadPool with exactly one thread, that will be used to
@@ -556,9 +607,19 @@ class SQLCipherU1DBSync(object):
before the synchronisation was performed.
:rtype: deferred
"""
+ if not self.ready:
+ print "not ready yet..."
+ # XXX ---------------------------------------------------------
+ # This might happen because the database has not yet been
+ # initialized (it's deferred to the theadpool).
+ # A good strategy might involve to return a deferred that will
+ # callLater this same function after a timeout (deferLater)
+ # Might want to keep track of retries and cancel too.
+ # --------------------------------------------------------------
+ print "Syncing to...", url
kwargs = {'creds': creds, 'autocreate': autocreate,
'defer_decryption': defer_decryption}
- return deferToThreadPool(self._sync, url, **kwargs)
+ return self._defer_to_sync_threadpool(self._sync, url, **kwargs)
def _sync(self, url, creds=None, autocreate=True, defer_decryption=True):
res = None
@@ -568,9 +629,11 @@ class SQLCipherU1DBSync(object):
# TODO review, I think this is no longer needed with a 1-thread
# threadpool.
+ log.msg("in _sync")
with self._syncer(url, creds=creds) as syncer:
# XXX could mark the critical section here...
try:
+ log.msg('syncer sync...')
res = syncer.sync(autocreate=autocreate,
defer_decryption=defer_decryption)
@@ -590,6 +653,9 @@ class SQLCipherU1DBSync(object):
"""
Interrupt all ongoing syncs.
"""
+ self._defer_to_sync_threadpool(self._stop_sync)
+
+ def _stop_sync(self):
for url in self._syncers:
_, syncer = self._syncers[url]
syncer.stop()
@@ -604,13 +670,13 @@ class SQLCipherU1DBSync(object):
Because of that, this method blocks until the syncing lock can be
acquired.
"""
- with self.syncing_lock[self.replica_uid]:
+ with self.syncing_lock[self._path]:
syncer = self._get_syncer(url, creds=creds)
yield syncer
@property
def syncing(self):
- lock = self.syncing_lock[self.replica_uid]
+ lock = self.syncing_lock[self._path]
acquired_lock = lock.acquire(False)
if acquired_lock is False:
return True
@@ -640,7 +706,8 @@ class SQLCipherU1DBSync(object):
syncer = SoledadSynchronizer(
self,
SoledadSyncTarget(url,
- self.replica_uid,
+ # XXX is the replica_uid ready?
+ self._replica_uid,
creds=creds,
crypto=self._crypto,
sync_db=self._sync_db,
@@ -689,6 +756,14 @@ class SQLCipherU1DBSync(object):
# XXX this SHOULD BE a callback
return self._get_generation()
+ def finalClose(self):
+ """
+ This should only be called by the shutdown trigger.
+ """
+ self.shutdownID = None
+ self._sync_threadpool.stop()
+ self.running = False
+
def close(self):
"""
Close the syncer and syncdb orderly
@@ -718,6 +793,36 @@ class SQLCipherU1DBSync(object):
self.sync_queue = None
+class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
+ """
+ A very simple wrapper for u1db around sqlcipher backend.
+
+ Instead of initializing the database on the fly, it just uses an existing
+ connection that is passed to it in the initializer.
+ """
+
+ def __init__(self, conn):
+ self._db_handle = conn
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self._factory = u1db.Document
+
+
+class SoledadSQLCipherWrapper(SQLCipherDatabase):
+ """
+ A wrapper for u1db that uses the Soledad-extended sqlcipher backend.
+
+ Instead of initializing the database on the fly, it just uses an existing
+ connection that is passed to it in the initializer.
+ """
+ def __init__(self, conn):
+ self._db_handle = conn
+ self._real_replica_uid = None
+ self._ensure_schema()
+ self.set_document_factory(soledad_doc_factory)
+ self._prime_replica_uid()
+
+
def _assert_db_is_encrypted(opts):
"""
Assert that the sqlcipher file contains an encrypted database.