diff options
author | Kali Kaneko <kali@leap.se> | 2014-09-23 13:38:06 -0500 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2015-02-11 14:03:17 -0400 |
commit | e0f70a342deccbb53a6ea7215b3322388bb18461 (patch) | |
tree | bd8fb37de901b82ffba984ea24060ab0ae790913 /client/src | |
parent | 9c56adfd27e96c44c12ad5295c42e6b8d9bcad98 (diff) |
Refactor soledad api to use async db
* add examples and benchmarks
* remove autocommit mode, allow wal disabling
* lock initialization
* make api use async calls
Diffstat (limited to 'client/src')
-rw-r--r-- | client/src/leap/soledad/client/adbapi.py | 146 | ||||
-rw-r--r-- | client/src/leap/soledad/client/api.py | 323 | ||||
-rw-r--r-- | client/src/leap/soledad/client/examples/README | 4 | ||||
-rw-r--r-- | client/src/leap/soledad/client/examples/compare.txt | 8 | ||||
-rw-r--r-- | client/src/leap/soledad/client/examples/manifest.phk | 50 | ||||
-rw-r--r-- | client/src/leap/soledad/client/examples/plot-async-db.py | 45 | ||||
-rw-r--r-- | client/src/leap/soledad/client/examples/run_benchmark.py | 28 | ||||
-rw-r--r-- | client/src/leap/soledad/client/examples/use_adbapi.py | 103 | ||||
-rw-r--r-- | client/src/leap/soledad/client/examples/use_api.py | 67 | ||||
-rw-r--r-- | client/src/leap/soledad/client/mp_safe_db_TOREMOVE.py | 112 | ||||
-rw-r--r-- | client/src/leap/soledad/client/pragmas.py | 20 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 845 |
12 files changed, 990 insertions, 761 deletions
diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 730999a3..3b15509b 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# sqlcipher.py +# adbapi.py # Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify @@ -17,61 +17,135 @@ """ An asyncrhonous interface to soledad using sqlcipher backend. It uses twisted.enterprise.adbapi. - """ +import re import os import sys +from functools import partial + +import u1db +from u1db.backends import sqlite_backend + from twisted.enterprise import adbapi from twisted.python import log +from leap.soledad.client.sqlcipher import set_init_pragmas + + DEBUG_SQL = os.environ.get("LEAP_DEBUG_SQL") if DEBUG_SQL: log.startLogging(sys.stdout) -def getConnectionPool(db=None, key=None): - return SQLCipherConnectionPool( - "pysqlcipher.dbapi2", database=db, key=key, check_same_thread=False) +def getConnectionPool(opts, openfun=None, driver="pysqlcipher"): + if openfun is None and driver == "pysqlcipher": + openfun = partial(set_init_pragmas, opts=opts) + return U1DBConnectionPool( + "%s.dbapi2" % driver, database=opts.path, + check_same_thread=False, cp_openfun=openfun) -class SQLCipherConnectionPool(adbapi.ConnectionPool): +# XXX work in progress -------------------------------------------- - key = None - def connect(self): - """ - Return a database connection when one becomes available. +class U1DBSqliteWrapper(sqlite_backend.SQLitePartialExpandDatabase): + """ + A very simple wrapper around sqlcipher backend. - This method blocks and should be run in a thread from the internal - threadpool. Don't call this method directly from non-threaded code. - Using this method outside the external threadpool may exceed the - maximum number of connections in the pool. + Instead of initializing the database on the fly, it just uses an existing + connection that is passed to it in the initializer. + """ - :return: a database connection from the pool. - """ - self.noisy = DEBUG_SQL + def __init__(self, conn): + self._db_handle = conn + self._real_replica_uid = None + self._ensure_schema() + self._factory = u1db.Document - tid = self.threadID() - conn = self.connections.get(tid) - if self.key is None: - self.key = self.connkw.pop('key', None) +class U1DBConnection(adbapi.Connection): - if conn is None: - if self.noisy: - log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, - self.connargs or '', - self.connkw or '')) - conn = self.dbapi.connect(*self.connargs, **self.connkw) + u1db_wrapper = U1DBSqliteWrapper + + def __init__(self, pool, init_u1db=False): + self.init_u1db = init_u1db + adbapi.Connection.__init__(self, pool) + + def reconnect(self): + if self._connection is not None: + self._pool.disconnect(self._connection) + self._connection = self._pool.connect() + + if self.init_u1db: + self._u1db = self.u1db_wrapper(self._connection) + + def __getattr__(self, name): + if name.startswith('u1db_'): + meth = re.sub('^u1db_', '', name) + return getattr(self._u1db, meth) + else: + return getattr(self._connection, name) - # XXX we should hook here all OUR SOLEDAD pragmas ----- - conn.cursor().execute("PRAGMA key=%s" % self.key) - conn.commit() - # ----------------------------------------------------- - # XXX profit of openfun isntead??? - if self.openfun is not None: - self.openfun(conn) - self.connections[tid] = conn - return conn +class U1DBTransaction(adbapi.Transaction): + + def __getattr__(self, name): + if name.startswith('u1db_'): + meth = re.sub('^u1db_', '', name) + return getattr(self._connection._u1db, meth) + else: + return getattr(self._cursor, name) + + +class U1DBConnectionPool(adbapi.ConnectionPool): + + connectionFactory = U1DBConnection + transactionFactory = U1DBTransaction + + def __init__(self, *args, **kwargs): + adbapi.ConnectionPool.__init__(self, *args, **kwargs) + # all u1db connections, hashed by thread-id + self.u1dbconnections = {} + + def runU1DBQuery(self, meth, *args, **kw): + meth = "u1db_%s" % meth + return self.runInteraction(self._runU1DBQuery, meth, *args, **kw) + + def _runU1DBQuery(self, trans, meth, *args, **kw): + meth = getattr(trans, meth) + return meth(*args, **kw) + + def _runInteraction(self, interaction, *args, **kw): + tid = self.threadID() + u1db = self.u1dbconnections.get(tid) + conn = self.connectionFactory(self, init_u1db=not bool(u1db)) + + if u1db is None: + self.u1dbconnections[tid] = conn._u1db + else: + conn._u1db = u1db + + trans = self.transactionFactory(self, conn) + try: + result = interaction(trans, *args, **kw) + trans.close() + conn.commit() + return result + except: + excType, excValue, excTraceback = sys.exc_info() + try: + conn.rollback() + except: + log.err(None, "Rollback failed") + raise excType, excValue, excTraceback + + def finalClose(self): + self.shutdownID = None + self.threadpool.stop() + self.running = False + for conn in self.connections.values(): + self._close(conn) + for u1db in self.u1dbconnections.values(): + self._close(u1db) + self.connections.clear() diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index bfb6c703..703b9516 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -34,7 +34,6 @@ import socket import ssl import urlparse - try: import cchardet as chardet except ImportError: @@ -47,15 +46,14 @@ from leap.common.config import get_path_prefix from leap.soledad.common import SHARED_DB_NAME from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type -from leap.soledad.common.document import SoledadDocument +from leap.soledad.client import adbapi from leap.soledad.client import events as soledad_events from leap.soledad.client.crypto import SoledadCrypto from leap.soledad.client.secrets import SoledadSecrets from leap.soledad.client.shared_db import SoledadSharedDatabase -from leap.soledad.client.sqlcipher import SQLCipherDatabase from leap.soledad.client.target import SoledadSyncTarget -from leap.soledad.client.sqlcipher import SQLCipherDB, SQLCipherOptions +from leap.soledad.client.sqlcipher import SQLCipherOptions logger = logging.getLogger(name=__name__) @@ -200,18 +198,19 @@ class Soledad(object): Initialize configuration using default values for missing params. """ soledad_assert_type(self._passphrase, unicode) + initialize = lambda attr, val: attr is None and setattr(attr, val) + # initialize secrets_path - if self._secrets_path is None: - self._secrets_path = os.path.join( - self.DEFAULT_PREFIX, self.STORAGE_SECRETS_FILE_NAME) + initialize(self._secrets_path, os.path.join( + self.DEFAULT_PREFIX, self.STORAGE_SECRETS_FILE_NAME)) + # initialize local_db_path - if self._local_db_path is None: - self._local_db_path = os.path.join( - self.DEFAULT_PREFIX, self.LOCAL_DATABASE_FILE_NAME) + initialize(self._local_db_path, os.path.join( + self.DEFAULT_PREFIX, self.LOCAL_DATABASE_FILE_NAME)) + # initialize server_url - soledad_assert( - self._server_url is not None, - 'Missing URL for Soledad server.') + soledad_assert(self._server_url is not None, + 'Missing URL for Soledad server.') # # initialization/destruction methods @@ -221,14 +220,13 @@ class Soledad(object): """ Bootstrap local Soledad instance. - :raise BootstrapSequenceError: Raised when the secret generation and - storage on server sequence has failed for some reason. + :raise BootstrapSequenceError: + Raised when the secret generation and storage on server sequence + has failed for some reason. """ - try: - self._secrets.bootstrap() - self._init_db() - except: - raise + self._secrets.bootstrap() + self._init_db() + # XXX initialize syncers? def _init_dirs(self): """ @@ -255,8 +253,9 @@ class Soledad(object): Initialize the U1DB SQLCipher database for local storage. Currently, Soledad uses the default SQLCipher cipher, i.e. - 'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key and - uses the 'raw PRAGMA key' format to handle the key to SQLCipher. + 'aes-256-cbc'. We use scrypt to derive a 256-bit encryption key, + and internally the SQLCipherDatabase initialization uses the 'raw + PRAGMA key' format to handle the key to SQLCipher. """ tohex = binascii.b2a_hex # sqlcipher only accepts the hex version @@ -265,25 +264,28 @@ class Soledad(object): opts = SQLCipherOptions( self._local_db_path, key, - is_raw_key=True, - create=True, + is_raw_key=True, create=True, defer_encryption=self._defer_encryption, sync_db_key=sync_db_key, - crypto=self._crypto, # XXX add this - document_factory=SoledadDocument, ) - self._db = SQLCipherDB(opts) + self._dbpool = adbapi.getConnectionPool(opts) def close(self): """ Close underlying U1DB database. """ logger.debug("Closing soledad") - if hasattr(self, '_db') and isinstance( - self._db, - SQLCipherDatabase): - self._db.stop_sync() - self._db.close() + self._dbpool.close() + + # TODO close syncers >>>>>> + + #if hasattr(self, '_db') and isinstance( + #self._db, + #SQLCipherDatabase): + #self._db.close() +# + # XXX stop syncers + # self._db.stop_sync() @property def _shared_db(self): @@ -306,24 +308,29 @@ class Soledad(object): # def put_doc(self, doc): + # TODO what happens with this warning during the deferred life cycle? + # Isn't it better to defend ourselves from the mutability, to avoid + # nasty surprises? """ Update a document in the local encrypted database. ============================== WARNING ============================== This method converts the document's contents to unicode in-place. This - means that after calling C{put_doc(doc)}, the contents of the - document, i.e. C{doc.content}, might be different from before the + means that after calling `put_doc(doc)`, the contents of the + document, i.e. `doc.content`, might be different from before the call. ============================== WARNING ============================== :param doc: the document to update :type doc: SoledadDocument - :return: the new revision identifier for the document - :rtype: str + :return: + a deferred that will fire with the new revision identifier for + the document + :rtype: Deferred """ doc.content = self._convert_to_unicode(doc.content) - return self._db.put_doc(doc) + return self._dbpool.put_doc(doc) def delete_doc(self, doc): """ @@ -332,10 +339,12 @@ class Soledad(object): :param doc: the document to delete :type doc: SoledadDocument - :return: the new revision identifier for the document - :rtype: str + :return: + a deferred that will fire with ... + :rtype: Deferred """ - return self._db.delete_doc(doc) + # XXX what does this do when fired??? + return self._dbpool.delete_doc(doc) def get_doc(self, doc_id, include_deleted=False): """ @@ -343,15 +352,17 @@ class Soledad(object): :param doc_id: the unique document identifier :type doc_id: str - :param include_deleted: if True, deleted documents will be - returned with empty content; otherwise asking - for a deleted document will return None + :param include_deleted: + if True, deleted documents will be returned with empty content; + otherwise asking for a deleted document will return None :type include_deleted: bool - :return: the document object or None - :rtype: SoledadDocument + :return: + A deferred that will fire with the document object, containing a + SoledadDocument, or None if it could not be found + :rtype: Deferred """ - return self._db.get_doc(doc_id, include_deleted=include_deleted) + return self._dbpool.get_doc(doc_id, include_deleted=include_deleted) def get_docs(self, doc_ids, check_for_conflicts=True, include_deleted=False): @@ -364,11 +375,12 @@ class Soledad(object): be skipped, and 'None' will be returned instead of True/False :type check_for_conflicts: bool - :return: iterable giving the Document object for each document id - in matching doc_ids order. - :rtype: generator + :return: + A deferred that will fire with an iterable giving the Document + object for each document id in matching doc_ids order. + :rtype: Deferred """ - return self._db.get_docs( + return self._dbpool.get_docs( doc_ids, check_for_conflicts=check_for_conflicts, include_deleted=include_deleted) @@ -379,43 +391,13 @@ class Soledad(object): :param include_deleted: If set to True, deleted documents will be returned with empty content. Otherwise deleted documents will not be included in the results. - :return: (generation, [Document]) - The current generation of the database, followed by a list of - all the documents in the database. + :return: + A deferred that will fire with (generation, [Document]): that is, + the current generation of the database, followed by a list of all + the documents in the database. + :rtype: Deferred """ - return self._db.get_all_docs(include_deleted) - - def _convert_to_unicode(self, content): - """ - Converts content to unicode (or all the strings in content) - - NOTE: Even though this method supports any type, it will - currently ignore contents of lists, tuple or any other - iterable than dict. We don't need support for these at the - moment - - :param content: content to convert - :type content: object - - :rtype: object - """ - if isinstance(content, unicode): - return content - elif isinstance(content, str): - result = chardet.detect(content) - default = "utf-8" - encoding = result["encoding"] or default - try: - content = content.decode(encoding) - except UnicodeError as e: - logger.error("Unicode error: {0!r}. Using 'replace'".format(e)) - content = content.decode(encoding, 'replace') - return content - else: - if isinstance(content, dict): - for key in content.keys(): - content[key] = self._convert_to_unicode(content[key]) - return content + return self._dbpool.get_all_docs(include_deleted) def create_doc(self, content, doc_id=None): """ @@ -426,11 +408,13 @@ class Soledad(object): :param doc_id: an optional identifier specifying the document id :type doc_id: str - :return: the new document - :rtype: SoledadDocument + :return: + A deferred tht will fire with the new document (SoledadDocument + instance). + :rtype: Deferred """ - return self._db.create_doc( - self._convert_to_unicode(content), doc_id=doc_id) + return self._dbpool.create_doc( + _convert_to_unicode(content), doc_id=doc_id) def create_doc_from_json(self, json, doc_id=None): """ @@ -446,10 +430,12 @@ class Soledad(object): :type json: str :param doc_id: An optional identifier specifying the document id. :type doc_id: - :return: The new document - :rtype: SoledadDocument + :return: + A deferred that will fire with the new document (A SoledadDocument + instance) + :rtype: Deferred """ - return self._db.create_doc_from_json(json, doc_id=doc_id) + return self._dbpool.create_doc_from_json(json, doc_id=doc_id) def create_index(self, index_name, *index_expressions): """ @@ -462,8 +448,8 @@ class Soledad(object): :param index_name: A unique name which can be used as a key prefix :type index_name: str - :param index_expressions: index expressions defining the index - information. + :param index_expressions: + index expressions defining the index information. :type index_expressions: dict Examples: @@ -473,9 +459,7 @@ class Soledad(object): "number(fieldname, width)", "lower(fieldname)" """ - if self._db: - return self._db.create_index( - index_name, *index_expressions) + return self._dbpool.create_index(index_name, *index_expressions) def delete_index(self, index_name): """ @@ -484,8 +468,7 @@ class Soledad(object): :param index_name: The name of the index we are removing :type index_name: str """ - if self._db: - return self._db.delete_index(index_name) + return self._dbpool.delete_index(index_name) def list_indexes(self): """ @@ -494,8 +477,7 @@ class Soledad(object): :return: A list of [('index-name', ['field', 'field2'])] definitions. :rtype: list """ - if self._db: - return self._db.list_indexes() + return self._dbpool.list_indexes() def get_from_index(self, index_name, *key_values): """ @@ -517,8 +499,7 @@ class Soledad(object): :return: List of [Document] :rtype: list """ - if self._db: - return self._db.get_from_index(index_name, *key_values) + return self._dbpool.get_from_index(index_name, *key_values) def get_count_from_index(self, index_name, *key_values): """ @@ -534,8 +515,7 @@ class Soledad(object): :return: count. :rtype: int """ - if self._db: - return self._db.get_count_from_index(index_name, *key_values) + return self._dbpool.get_count_from_index(index_name, *key_values) def get_range_from_index(self, index_name, start_value, end_value): """ @@ -561,12 +541,11 @@ class Soledad(object): range. eg, if you have an index with 3 fields then you would have: (val1, val2, val3) :type end_values: tuple - :return: List of [Document] - :rtype: list + :return: A deferred that will fire with a list of [Document] + :rtype: Deferred """ - if self._db: - return self._db.get_range_from_index( - index_name, start_value, end_value) + return self._dbpool.get_range_from_index( + index_name, start_value, end_value) def get_index_keys(self, index_name): """ @@ -574,11 +553,11 @@ class Soledad(object): :param index_name: The index to query :type index_name: str - :return: [] A list of tuples of indexed keys. - :rtype: list + :return: + A deferred that will fire with a list of tuples of indexed keys. + :rtype: Deferred """ - if self._db: - return self._db.get_index_keys(index_name) + return self._dbpool.get_index_keys(index_name) def get_doc_conflicts(self, doc_id): """ @@ -587,11 +566,12 @@ class Soledad(object): :param doc_id: the document id :type doc_id: str - :return: a list of the document entries that are conflicted - :rtype: list + :return: + A deferred that will fire with a list of the document entries that + are conflicted. + :rtype: Deferred """ - if self._db: - return self._db.get_doc_conflicts(doc_id) + return self._dbpool.get_doc_conflicts(doc_id) def resolve_doc(self, doc, conflicted_doc_revs): """ @@ -599,12 +579,18 @@ class Soledad(object): :param doc: a document with the new content to be inserted. :type doc: SoledadDocument - :param conflicted_doc_revs: a list of revisions that the new content - supersedes. + :param conflicted_doc_revs: + A deferred that will fire with a list of revisions that the new + content supersedes. :type conflicted_doc_revs: list """ - if self._db: - return self._db.resolve_doc(doc, conflicted_doc_revs) + return self._dbpool.resolve_doc(doc, conflicted_doc_revs) + + # + # Sync API + # + + # TODO have interfaces, and let it implement it. def sync(self, defer_decryption=True): """ @@ -616,33 +602,38 @@ class Soledad(object): :param url: the url of the target replica to sync with :type url: str - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. + :param defer_decryption: + Whether to defer the decryption process using the intermediate + database. If False, decryption will be done inline. :type defer_decryption: bool - :return: The local generation before the synchronisation was - performed. + :return: + A deferred that will fire with the local generation before the + synchronisation was performed. :rtype: str """ - if self._db: - try: - local_gen = self._db.sync( - urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), - creds=self._creds, autocreate=False, - defer_decryption=defer_decryption) - soledad_events.signal( - soledad_events.SOLEDAD_DONE_DATA_SYNC, self._uuid) - return local_gen - except Exception as e: - logger.error("Soledad exception when syncing: %s" % str(e)) + # TODO this needs work. + # Should: + # (1) Defer to the syncer pool + # (2) Return a deferred (the deferToThreadpool can be good) + # (3) Add the callback for signaling the event + # (4) Let the local gen be returned from the thread + try: + local_gen = self._dbsyncer.sync( + urlparse.urljoin(self.server_url, 'user-%s' % self._uuid), + creds=self._creds, autocreate=False, + defer_decryption=defer_decryption) + soledad_events.signal( + soledad_events.SOLEDAD_DONE_DATA_SYNC, self._uuid) + return local_gen + except Exception as e: + logger.error("Soledad exception when syncing: %s" % str(e)) def stop_sync(self): """ Stop the current syncing process. """ - if self._db: - self._db.stop_sync() + self._dbsyncer.stop_sync() def need_sync(self, url): """ @@ -654,12 +645,18 @@ class Soledad(object): :return: Whether remote replica and local replica differ. :rtype: bool """ + # XXX pass the get_replica_uid ------------------------ + # From where? initialize with that? + replica_uid = self._db._get_replica_uid() target = SoledadSyncTarget( - url, self._db._get_replica_uid(), creds=self._creds, - crypto=self._crypto) - info = target.get_sync_info(self._db._get_replica_uid()) + url, replica_uid, creds=self._creds, crypto=self._crypto) + + generation = self._db._get_generation() + # XXX better unpack it? + info = target.get_sync_info(replica_uid) + # compare source generation with target's last known source generation - if self._db._get_generation() != info[4]: + if generation != info[4]: soledad_events.signal( soledad_events.SOLEDAD_NEW_DATA_TO_SYNC, self._uuid) return True @@ -670,7 +667,7 @@ class Soledad(object): """ Property, True if the syncer is syncing. """ - return self._db.syncing + return self._dbsyncer.syncing def _set_token(self, token): """ @@ -781,6 +778,39 @@ class Soledad(object): self._secrets.change_passphrase(new_passphrase) +def _convert_to_unicode(content): + """ + Convert content to unicode (or all the strings in content) + + NOTE: Even though this method supports any type, it will + currently ignore contents of lists, tuple or any other + iterable than dict. We don't need support for these at the + moment + + :param content: content to convert + :type content: object + + :rtype: object + """ + if isinstance(content, unicode): + return content + elif isinstance(content, str): + result = chardet.detect(content) + default = "utf-8" + encoding = result["encoding"] or default + try: + content = content.decode(encoding) + except UnicodeError as e: + logger.error("Unicode error: {0!r}. Using 'replace'".format(e)) + content = content.decode(encoding, 'replace') + return content + else: + if isinstance(content, dict): + for key in content.keys(): + content[key] = _convert_to_unicode(content[key]) + return content + + # ---------------------------------------------------------------------------- # Monkey patching u1db to be able to provide a custom SSL cert # ---------------------------------------------------------------------------- @@ -819,4 +849,3 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection): old__VerifiedHTTPSConnection = http_client._VerifiedHTTPSConnection http_client._VerifiedHTTPSConnection = VerifiedHTTPSConnection - diff --git a/client/src/leap/soledad/client/examples/README b/client/src/leap/soledad/client/examples/README new file mode 100644 index 00000000..3aed8377 --- /dev/null +++ b/client/src/leap/soledad/client/examples/README @@ -0,0 +1,4 @@ +Right now, you can find here both an example of use +and the benchmarking scripts. +TODO move benchmark scripts to root scripts/ folder, +and leave here only a minimal example. diff --git a/client/src/leap/soledad/client/examples/compare.txt b/client/src/leap/soledad/client/examples/compare.txt new file mode 100644 index 00000000..19a1325a --- /dev/null +++ b/client/src/leap/soledad/client/examples/compare.txt @@ -0,0 +1,8 @@ +TIMES=100 TMPDIR=/media/sdb5/leap python use_adbapi.py 1.34s user 0.16s system 53% cpu 2.832 total +TIMES=100 TMPDIR=/media/sdb5/leap python use_api.py 1.22s user 0.14s system 62% cpu 2.181 total + +TIMES=1000 TMPDIR=/media/sdb5/leap python use_api.py 2.18s user 0.34s system 27% cpu 9.213 total +TIMES=1000 TMPDIR=/media/sdb5/leap python use_adbapi.py 2.40s user 0.34s system 39% cpu 7.004 total + +TIMES=5000 TMPDIR=/media/sdb5/leap python use_api.py 6.63s user 1.27s system 13% cpu 57.882 total +TIMES=5000 TMPDIR=/media/sdb5/leap python use_adbapi.py 6.84s user 1.26s system 36% cpu 22.367 total diff --git a/client/src/leap/soledad/client/examples/manifest.phk b/client/src/leap/soledad/client/examples/manifest.phk new file mode 100644 index 00000000..2c86c07d --- /dev/null +++ b/client/src/leap/soledad/client/examples/manifest.phk @@ -0,0 +1,50 @@ +The Hacker's Manifesto + +The Hacker's Manifesto +by: The Mentor + +Another one got caught today, it's all over the papers. "Teenager +Arrested in Computer Crime Scandal", "Hacker Arrested after Bank +Tampering." "Damn kids. They're all alike." But did you, in your +three-piece psychology and 1950's technobrain, ever take a look behind +the eyes of the hacker? Did you ever wonder what made him tick, what +forces shaped him, what may have molded him? I am a hacker, enter my +world. Mine is a world that begins with school. I'm smarter than most of +the other kids, this crap they teach us bores me. "Damn underachiever. +They're all alike." I'm in junior high or high school. I've listened to +teachers explain for the fifteenth time how to reduce a fraction. I +understand it. "No, Ms. Smith, I didn't show my work. I did it in +my head." "Damn kid. Probably copied it. They're all alike." I made a +discovery today. I found a computer. Wait a second, this is cool. It does +what I want it to. If it makes a mistake, it's because I screwed it up. +Not because it doesn't like me, or feels threatened by me, or thinks I'm +a smart ass, or doesn't like teaching and shouldn't be here. Damn kid. +All he does is play games. They're all alike. And then it happened... a +door opened to a world... rushing through the phone line like heroin +through an addict's veins, an electronic pulse is sent out, a refuge from +the day-to-day incompetencies is sought... a board is found. "This is +it... this is where I belong..." I know everyone here... even if I've +never met them, never talked to them, may never hear from them again... I +know you all... Damn kid. Tying up the phone line again. They're all +alike... You bet your ass we're all alike... we've been spoon-fed baby +food at school when we hungered for steak... the bits of meat that you +did let slip through were pre-chewed and tasteless. We've been dominated +by sadists, or ignored by the apathetic. The few that had something to +teach found us willing pupils, but those few are like drops of water in +the desert. This is our world now... the world of the electron and the +switch, the beauty of the baud. We make use of a service already existing +without paying for what could be dirt-cheap if it wasn't run by +profiteering gluttons, and you call us criminals. We explore... and you +call us criminals. We seek after knowledge... and you call us criminals. +We exist without skin color, without nationality, without religious +bias... and you call us criminals. You build atomic bombs, you wage wars, +you murder, cheat, and lie to us and try to make us believe it's for our +own good, yet we're the criminals. Yes, I am a criminal. My crime is that +of curiosity. My crime is that of judging people by what they say and +think, not what they look like. My crime is that of outsmarting you, +something that you will never forgive me for. I am a hacker, and this is +my manifesto. You may stop this individual, but you can't stop us all... +after all, we're all alike. + +This was the last published file written by The Mentor. Shortly after +releasing it, he was busted by the FBI. The Mentor, sadly missed. diff --git a/client/src/leap/soledad/client/examples/plot-async-db.py b/client/src/leap/soledad/client/examples/plot-async-db.py new file mode 100644 index 00000000..018a1a1d --- /dev/null +++ b/client/src/leap/soledad/client/examples/plot-async-db.py @@ -0,0 +1,45 @@ +import csv +from matplotlib import pyplot as plt + +FILE = "bench.csv" + +# config the plot +plt.xlabel('number of inserts') +plt.ylabel('time (seconds)') +plt.title('SQLCipher parallelization') + +kwargs = { + 'linewidth': 1.0, + 'linestyle': '-', +} + +series = (('sync', 'r'), + ('async', 'g')) + +data = {'mark': [], + 'sync': [], + 'async': []} + +with open(FILE, 'rb') as csvfile: + series_reader = csv.reader(csvfile, delimiter=',') + for m, s, a in series_reader: + data['mark'].append(int(m)) + data['sync'].append(float(s)) + data['async'].append(float(a)) + +xmax = max(data['mark']) +xmin = min(data['mark']) +ymax = max(data['sync'] + data['async']) +ymin = min(data['sync'] + data['async']) + +for run in series: + name = run[0] + color = run[1] + plt.plot(data['mark'], data[name], label=name, color=color, **kwargs) + +plt.axes().annotate("", xy=(xmax, ymax)) +plt.axes().annotate("", xy=(xmin, ymin)) + +plt.grid() +plt.legend() +plt.show() diff --git a/client/src/leap/soledad/client/examples/run_benchmark.py b/client/src/leap/soledad/client/examples/run_benchmark.py new file mode 100644 index 00000000..a112cf45 --- /dev/null +++ b/client/src/leap/soledad/client/examples/run_benchmark.py @@ -0,0 +1,28 @@ +""" +Run a mini-benchmark between regular api and dbapi +""" +import commands +import os +import time + +TMPDIR = os.environ.get("TMPDIR", "/tmp") +CSVFILE = 'bench.csv' + +cmd = "SILENT=1 TIMES={times} TMPDIR={tmpdir} python ./use_{version}api.py" + +parse_time = lambda r: r.split('\n')[-1] + + +with open(CSVFILE, 'w') as log: + + for times in range(0, 10000, 500): + cmd1 = cmd.format(times=times, tmpdir=TMPDIR, version="") + sync_time = parse_time(commands.getoutput(cmd1)) + + cmd2 = cmd.format(times=times, tmpdir=TMPDIR, version="adb") + async_time = parse_time(commands.getoutput(cmd2)) + + print times, sync_time, async_time + log.write("%s, %s, %s\n" % (times, sync_time, async_time)) + log.flush() + time.sleep(2) diff --git a/client/src/leap/soledad/client/examples/use_adbapi.py b/client/src/leap/soledad/client/examples/use_adbapi.py new file mode 100644 index 00000000..d3ee8527 --- /dev/null +++ b/client/src/leap/soledad/client/examples/use_adbapi.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# use_adbapi.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Example of use of the asynchronous soledad api. +""" +from __future__ import print_function +import datetime +import os + +import u1db +from twisted.internet import defer, reactor + +from leap.soledad.client import adbapi +from leap.soledad.client.sqlcipher import SQLCipherOptions + + +folder = os.environ.get("TMPDIR", "tmp") +times = int(os.environ.get("TIMES", "1000")) +silent = os.environ.get("SILENT", False) + +tmpdb = os.path.join(folder, "test.soledad") + + +def debug(*args): + if not silent: + print(*args) + +debug("[+] db path:", tmpdb) +debug("[+] times", times) + +if os.path.isfile(tmpdb): + debug("[+] Removing existing db file...") + os.remove(tmpdb) + +start_time = datetime.datetime.now() + +opts = SQLCipherOptions(tmpdb, "secret", create=True) +dbpool = adbapi.getConnectionPool(opts) + + +def createDoc(doc): + return dbpool.runU1DBQuery("create_doc", doc) + + +def getAllDocs(): + return dbpool.runU1DBQuery("get_all_docs") + + +def countDocs(_): + debug("counting docs...") + d = getAllDocs() + d.addCallbacks(printResult, lambda e: e.printTraceback()) + d.addBoth(allDone) + + +def printResult(r): + if isinstance(r, u1db.Document): + debug(r.doc_id, r.content['number']) + else: + len_results = len(r[1]) + debug("GOT %s results" % len(r[1])) + + if len_results == times: + debug("ALL GOOD") + else: + raise ValueError("We didn't expect this result len") + + +def allDone(_): + debug("ALL DONE!") + if silent: + end_time = datetime.datetime.now() + print((end_time - start_time).total_seconds()) + reactor.stop() + +deferreds = [] + +for i in range(times): + doc = {"number": i, + "payload": open('manifest.phk').read()} + d = createDoc(doc) + d.addCallbacks(printResult, lambda e: e.printTraceback()) + deferreds.append(d) + + +all_done = defer.gatherResults(deferreds, consumeErrors=True) +all_done.addCallback(countDocs) + +reactor.run() diff --git a/client/src/leap/soledad/client/examples/use_api.py b/client/src/leap/soledad/client/examples/use_api.py new file mode 100644 index 00000000..fd0a100c --- /dev/null +++ b/client/src/leap/soledad/client/examples/use_api.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# use_api.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Example of use of the soledad api. +""" +from __future__ import print_function +import datetime +import os + +from leap.soledad.client import sqlcipher +from leap.soledad.client.sqlcipher import SQLCipherOptions + + +folder = os.environ.get("TMPDIR", "tmp") +times = int(os.environ.get("TIMES", "1000")) +silent = os.environ.get("SILENT", False) + +tmpdb = os.path.join(folder, "test.soledad") + + +def debug(*args): + if not silent: + print(*args) + +debug("[+] db path:", tmpdb) +debug("[+] times", times) + +if os.path.isfile(tmpdb): + debug("[+] Removing existing db file...") + os.remove(tmpdb) + +start_time = datetime.datetime.now() + +opts = SQLCipherOptions(tmpdb, "secret", create=True) +db = sqlcipher.SQLCipherDatabase(None, opts) + + +def allDone(): + debug("ALL DONE!") + + +for i in range(times): + doc = {"number": i, + "payload": open('manifest.phk').read()} + d = db.create_doc(doc) + debug(d.doc_id, d.content['number']) + +debug("Count", len(db.get_all_docs()[1])) +if silent: + end_time = datetime.datetime.now() + print((end_time - start_time).total_seconds()) + +allDone() diff --git a/client/src/leap/soledad/client/mp_safe_db_TOREMOVE.py b/client/src/leap/soledad/client/mp_safe_db_TOREMOVE.py deleted file mode 100644 index 9ed0bef4..00000000 --- a/client/src/leap/soledad/client/mp_safe_db_TOREMOVE.py +++ /dev/null @@ -1,112 +0,0 @@ -# -*- coding: utf-8 -*- -# mp_safe_db.py -# Copyright (C) 2014 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -Multiprocessing-safe SQLite database. -""" - - -from threading import Thread -from Queue import Queue -from pysqlcipher import dbapi2 - - -# Thanks to http://code.activestate.com/recipes/526618/ - -class MPSafeSQLiteDB(Thread): - """ - A multiprocessing-safe SQLite database accessor. - """ - - CLOSE = "--close--" - NO_MORE = "--no more--" - - def __init__(self, db_path): - """ - Initialize the process - """ - Thread.__init__(self) - self._db_path = db_path - self._requests = Queue() - self.start() - - def run(self): - """ - Run the multiprocessing-safe database accessor. - """ - conn = dbapi2.connect(self._db_path) - while True: - req, arg, res = self._requests.get() - if req == self.CLOSE: - break - with conn: - cursor = conn.cursor() - cursor.execute(req, arg) - if res: - for rec in cursor.fetchall(): - res.put(rec) - res.put(self.NO_MORE) - conn.close() - - def execute(self, req, arg=None, res=None): - """ - Execute a request on the database. - - :param req: The request to be executed. - :type req: str - :param arg: The arguments for the request. - :type arg: tuple - :param res: A queue to write request results. - :type res: multiprocessing.Queue - """ - self._requests.put((req, arg or tuple(), res)) - - def select(self, req, arg=None): - """ - Run a select query on the database and yield results. - - :param req: The request to be executed. - :type req: str - :param arg: The arguments for the request. - :type arg: tuple - """ - res = Queue() - self.execute(req, arg, res) - while True: - rec = res.get() - if rec == self.NO_MORE: - break - yield rec - - def close(self): - """ - Close the database connection. - """ - self.execute(self.CLOSE) - self.join() - - def cursor(self): - """ - Return a fake cursor object. - - Not really a cursor, but allows for calling db.cursor().execute(). - - :return: Self. - :rtype: MPSafeSQLiteDatabase - """ - return self diff --git a/client/src/leap/soledad/client/pragmas.py b/client/src/leap/soledad/client/pragmas.py index a21e68a8..7a13a694 100644 --- a/client/src/leap/soledad/client/pragmas.py +++ b/client/src/leap/soledad/client/pragmas.py @@ -15,18 +15,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ -Different pragmas used in the SQLCIPHER database. +Different pragmas used in the initialization of the SQLCipher database. """ -# TODO --------------------------------------------------------------- -# Work In Progress. -# We need to reduce the impedance mismatch between the current soledad -# implementation and the eventually asynchronous api. -# So... how to plug it in, allowing for an optional sync / async coexistence? -# One of the first things is to isolate all the pragmas work that has to be -# done during initialization. -# And, instead of having all of them passed the db_handle and executing that, -# we could have just a string returned, that can be chained to a deferred. -# --------------------------------------------------------------------- import logging import string @@ -81,7 +71,7 @@ def _set_key(db_handle, key, is_raw_key): _set_key_passphrase(db_handle, key) -def _set_key_passphrase(cls, db_handle, passphrase): +def _set_key_passphrase(db_handle, passphrase): """ Set a passphrase for encryption key derivation. @@ -265,7 +255,7 @@ def _set_rekey_passphrase(db_handle, passphrase): db_handle.cursor().execute("PRAGMA rekey = '%s'" % passphrase) -def _set_rekey_raw(cls, db_handle, key): +def _set_rekey_raw(db_handle, key): """ Change the raw hexadecimal encryption key. @@ -300,7 +290,7 @@ def set_synchronous_normal(db_handle): db_handle.cursor().execute('PRAGMA synchronous=NORMAL') -def set_mem_temp_store(cls, db_handle): +def set_mem_temp_store(db_handle): """ Use a in-memory store for temporary tables. """ @@ -308,7 +298,7 @@ def set_mem_temp_store(cls, db_handle): db_handle.cursor().execute('PRAGMA temp_store=MEMORY') -def set_write_ahead_logging(cls, db_handle): +def set_write_ahead_logging(db_handle): """ Enable write-ahead logging, and set the autocheckpoint to 50 pages. diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index fcef592d..c9e69c73 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -45,7 +45,7 @@ import logging import multiprocessing import os import threading -import time +# import time --- needed for the win initialization hack import json from hashlib import sha256 @@ -58,11 +58,13 @@ from u1db.backends import sqlite_backend from u1db import errors as u1db_errors from taskthread import TimerTask -from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client import crypto from leap.soledad.client.target import SoledadSyncTarget from leap.soledad.client.target import PendingReceivedDocsSyncError from leap.soledad.client.sync import SoledadSynchronizer -from leap.soledad.client.mp_safe_db import MPSafeSQLiteDB + +# TODO use adbapi too +from leap.soledad.client.mp_safe_db_TOREMOVE import MPSafeSQLiteDB from leap.soledad.client import pragmas from leap.soledad.common import soledad_assert from leap.soledad.common.document import SoledadDocument @@ -80,36 +82,81 @@ sqlite_backend.dbapi2 = sqlcipher_dbapi2 # See https://sqlite.org/threadsafe.html # and http://bugs.python.org/issue16509 -SQLITE_CHECK_SAME_THREAD = False +# TODO this no longer needed ------------- +#SQLITE_CHECK_SAME_THREAD = False + + +def initialize_sqlcipher_db(opts, on_init=None): + """ + Initialize a SQLCipher database. + + :param opts: + :type opts: SQLCipherOptions + :param on_init: a tuple of queries to be executed on initialization + :type on_init: tuple + :return: a SQLCipher connection + """ + conn = sqlcipher_dbapi2.connect( + opts.path) + + # XXX not needed -- check + #check_same_thread=SQLITE_CHECK_SAME_THREAD) + + set_init_pragmas(conn, opts, extra_queries=on_init) + return conn + +_db_init_lock = threading.Lock() + + +def set_init_pragmas(conn, opts=None, extra_queries=None): + """ + Set the initialization pragmas. + + This includes the crypto pragmas, and any other options that must + be passed early to sqlcipher db. + """ + assert opts is not None + extra_queries = [] if extra_queries is None else extra_queries + with _db_init_lock: + # only one execution path should initialize the db + _set_init_pragmas(conn, opts, extra_queries) + + +def _set_init_pragmas(conn, opts, extra_queries): -# We set isolation_level to None to setup autocommit mode. -# See: http://docs.python.org/2/library/sqlite3.html#controlling-transactions -# This avoids problems with sequential operations using the same soledad object -# trying to open new transactions -# (The error was: -# OperationalError:cannot start a transaction within a transaction.) -SQLITE_ISOLATION_LEVEL = None + sync_off = os.environ.get('LEAP_SQLITE_NOSYNC') + memstore = os.environ.get('LEAP_SQLITE_MEMSTORE') + nowal = os.environ.get('LEAP_SQLITE_NOWAL') + + pragmas.set_crypto_pragmas(conn, opts) + + if not nowal: + pragmas.set_write_ahead_logging(conn) + if sync_off: + pragmas.set_synchronous_off(conn) + else: + pragmas.set_synchronous_normal(conn) + if memstore: + pragmas.set_mem_temp_store(conn) + + for query in extra_queries: + conn.cursor().execute(query) -# TODO accept cyrpto object too.... or pass it along.. class SQLCipherOptions(object): + """ + A container with options for the initialization of an SQLCipher database. + """ def __init__(self, path, key, create=True, is_raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024, - document_factory=None, defer_encryption=False, sync_db_key=None): """ - Options for the initialization of an SQLCipher database. - :param path: The filesystem path for the database to open. :type path: str :param create: True/False, should the database be created if it doesn't already exist? :param create: bool - :param document_factory: - A function that will be called with the same parameters as - Document.__init__. - :type document_factory: callable :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt document contents when syncing. :type crypto: soledad.crypto.SoledadCrypto @@ -137,87 +184,22 @@ class SQLCipherOptions(object): self.cipher_page_size = cipher_page_size self.defer_encryption = defer_encryption self.sync_db_key = sync_db_key - self.document_factory = None - - -# XXX Use SQLCIpherOptions instead -#def open(path, password, create=True, document_factory=None, crypto=None, - #raw_key=False, cipher='aes-256-cbc', kdf_iter=4000, - #cipher_page_size=1024, defer_encryption=False, sync_db_key=None): - #""" - #Open a database at the given location. -# - #*** IMPORTANT *** -# - #Don't forget to close the database after use by calling the close() - #method otherwise some resources might not be freed and you may experience - #several kinds of leakages. -# - #*** IMPORTANT *** -# - #Will raise u1db.errors.DatabaseDoesNotExist if create=False and the - #database does not already exist. -# - #:return: An instance of Database. - #:rtype SQLCipherDatabase - #""" - #args = (path, password) - #kwargs = { - #'create': create, - #'document_factory': document_factory, - #'crypto': crypto, - #'raw_key': raw_key, - #'cipher': cipher, - #'kdf_iter': kdf_iter, - #'cipher_page_size': cipher_page_size, - #'defer_encryption': defer_encryption, - #'sync_db_key': sync_db_key} - # XXX pass only a CryptoOptions object around - #return SQLCipherDatabase.open_database(*args, **kwargs) - # # The SQLCipher database # + class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ A U1DB implementation that uses SQLCipher as its persistence layer. """ defer_encryption = False - _index_storage_value = 'expand referenced encrypted' - k_lock = threading.Lock() - create_doc_lock = threading.Lock() - update_indexes_lock = threading.Lock() - _sync_watcher = None - _sync_enc_pool = None - - """ - The name of the local symmetrically encrypted documents to - sync database file. - """ - LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' - - """ - A dictionary that hold locks which avoid multiple sync attempts from the - same database replica. - """ - encrypting_lock = threading.Lock() - - """ - Period or recurrence of the periodic encrypting task, in seconds. - """ - ENCRYPT_TASK_PERIOD = 1 + # XXX not used afaik: + # _index_storage_value = 'expand referenced encrypted' - syncing_lock = defaultdict(threading.Lock) - """ - A dictionary that hold locks which avoid multiple sync attempts from the - same database replica. - """ - - # XXX Use SQLCIpherOptions instead - def __init__(self, opts): + def __init__(self, soledad_crypto, opts): """ Connect to an existing SQLCipher database, creating a new sqlcipher database file if needed. @@ -230,76 +212,23 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): *** IMPORTANT *** + :param soledad_crypto: + :type soldead_crypto: :param opts: :type opts: SQLCipherOptions """ + # TODO ------ we don't need any soledad crypto in here + # ensure the db is encrypted if the file already exists - if os.path.exists(opts.sqlcipher_file): + if os.path.isfile(opts.path): self.assert_db_is_encrypted(opts) # connect to the sqlcipher database - # XXX this lock should not be needed ----------------- - # u1db holds a mutex over sqlite internally for the initialization. - with self.k_lock: - self._db_handle = sqlcipher_dbapi2.connect( - - # TODO ----------------------------------------------- - # move the init to a single function - opts.sqlcipher_file, - isolation_level=SQLITE_ISOLATION_LEVEL, - check_same_thread=SQLITE_CHECK_SAME_THREAD) - # set SQLCipher cryptographic parameters - - # XXX allow optional deferredChain here ? - pragmas.set_crypto_pragmas( - self._db_handle, password, raw_key, cipher, kdf_iter, - cipher_page_size) - if os.environ.get('LEAP_SQLITE_NOSYNC'): - pragmas.set_synchronous_off(self._db_handle) - else: - pragmas.set_synchronous_normal(self._db_handle) - if os.environ.get('LEAP_SQLITE_MEMSTORE'): - pragmas.set_mem_temp_store(self._db_handle) - pragmas.set_write_ahead_logging(self._db_handle) - - self._real_replica_uid = None - self._ensure_schema() - self._crypto = opts.crypto - - - # TODO ------------------------------------------------ - # Move syncdb to another class ------------------------ - # define sync-db attrs - self._sqlcipher_file = sqlcipher_file - self._sync_db_key = sync_db_key - self._sync_db = None - self._sync_db_write_lock = None - self._sync_enc_pool = None - self.sync_queue = None + self._db_handle = initialize_sqlcipher_db(opts) + self._real_replica_uid = None + self._ensure_schema() - if self.defer_encryption: - # initialize sync db - self._init_sync_db() - # initialize syncing queue encryption pool - self._sync_enc_pool = SyncEncrypterPool( - self._crypto, self._sync_db, self._sync_db_write_lock) - self._sync_watcher = TimerTask(self._encrypt_syncing_docs, - self.ENCRYPT_TASK_PERIOD) - self._sync_watcher.start() - - def factory(doc_id=None, rev=None, json='{}', has_conflicts=False, - syncable=True): - return SoledadDocument(doc_id=doc_id, rev=rev, json=json, - has_conflicts=has_conflicts, - syncable=syncable) - self.set_document_factory(factory) - # we store syncers in a dictionary indexed by the target URL. We also - # store a hash of the auth info in case auth info expires and we need - # to rebuild the syncer for that target. The final self._syncers - # format is the following: - # - # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} - self._syncers = {} + self.set_document_factory(soledad_doc_factory) def _extra_schema_init(self, c): """ @@ -312,40 +241,212 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :param c: The cursor for querying the database. :type c: dbapi2.cursor """ + print "CALLING EXTRA SCHEMA INIT...." c.execute( 'ALTER TABLE document ' 'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') + # + # Document operations + # + + def put_doc(self, doc): + """ + Overwrite the put_doc method, to enqueue the modified document for + encryption before sync. + + :param doc: The document to be put. + :type doc: u1db.Document + + :return: The new document revision. + :rtype: str + """ + doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) + + # XXX move to API + if self.defer_encryption: + self.sync_queue.put_nowait(doc) + return doc_rev + + # + # SQLCipher API methods + # + + # TODO this doesn't need to be an instance method + def assert_db_is_encrypted(self, opts): + """ + Assert that the sqlcipher file contains an encrypted database. + + When opening an existing database, PRAGMA key will not immediately + throw an error if the key provided is incorrect. To test that the + database can be successfully opened with the provided key, it is + necessary to perform some operation on the database (i.e. read from + it) and confirm it is success. + + The easiest way to do this is select off the sqlite_master table, + which will attempt to read the first page of the database and will + parse the schema. + + :param opts: + """ + # We try to open an encrypted database with the regular u1db + # backend should raise a DatabaseError exception. + # If the regular backend succeeds, then we need to stop because + # the database was not properly initialized. + try: + sqlite_backend.SQLitePartialExpandDatabase(opts.path) + except sqlcipher_dbapi2.DatabaseError: + # assert that we can access it using SQLCipher with the given + # key + dummy_query = ('SELECT count(*) FROM sqlite_master',) + initialize_sqlcipher_db(opts, on_init=dummy_query) + else: + raise DatabaseIsNotEncrypted() + + # Extra query methods: extensions to the base u1db sqlite implmentation. + + def get_count_from_index(self, index_name, *key_values): + """ + Return the count for a given combination of index_name + and key values. + + Extension method made from similar methods in u1db version 13.09 + + :param index_name: The index to query + :type index_name: str + :param key_values: values to match. eg, if you have + an index with 3 fields then you would have: + get_from_index(index_name, val1, val2, val3) + :type key_values: tuple + :return: count. + :rtype: int + """ + c = self._db_handle.cursor() + definition = self._get_index_definition(index_name) + + if len(key_values) != len(definition): + raise u1db_errors.InvalidValueForIndex() + tables = ["document_fields d%d" % i for i in range(len(definition))] + novalue_where = ["d.doc_id = d%d.doc_id" + " AND d%d.field_name = ?" + % (i, i) for i in range(len(definition))] + exact_where = [novalue_where[i] + + (" AND d%d.value = ?" % (i,)) + for i in range(len(definition))] + args = [] + where = [] + for idx, (field, value) in enumerate(zip(definition, key_values)): + args.append(field) + where.append(exact_where[idx]) + args.append(value) + + tables = ["document_fields d%d" % i for i in range(len(definition))] + statement = ( + "SELECT COUNT(*) FROM document d, %s WHERE %s " % ( + ', '.join(tables), + ' AND '.join(where), + )) + try: + c.execute(statement, tuple(args)) + except sqlcipher_dbapi2.OperationalError, e: + raise sqlcipher_dbapi2.OperationalError( + str(e) + '\nstatement: %s\nargs: %s\n' % (statement, args)) + res = c.fetchall() + return res[0][0] + + def close(self): + """ + Close db connections. + """ + # TODO should be handled by adbapi instead + # TODO syncdb should be stopped first + + if logger is not None: # logger might be none if called from __del__ + logger.debug("SQLCipher backend: closing") + + # close the actual database + if self._db_handle is not None: + self._db_handle.close() + self._db_handle = None + + # indexes + + def _put_and_update_indexes(self, old_doc, doc): + """ + Update a document and all indexes related to it. + + :param old_doc: The old version of the document. + :type old_doc: u1db.Document + :param doc: The new version of the document. + :type doc: u1db.Document + """ + sqlite_backend.SQLitePartialExpandDatabase._put_and_update_indexes( + self, old_doc, doc) + c = self._db_handle.cursor() + c.execute('UPDATE document SET syncable=? WHERE doc_id=?', + (doc.syncable, doc.doc_id)) + + def _get_doc(self, doc_id, check_for_conflicts=False): + """ + Get just the document content, without fancy handling. + + :param doc_id: The unique document identifier + :type doc_id: str + :param include_deleted: If set to True, deleted documents will be + returned with empty content. Otherwise asking for a deleted + document will return None. + :type include_deleted: bool + + :return: a Document object. + :type: u1db.Document + """ + doc = sqlite_backend.SQLitePartialExpandDatabase._get_doc( + self, doc_id, check_for_conflicts) + if doc: + c = self._db_handle.cursor() + c.execute('SELECT syncable FROM document WHERE doc_id=?', + (doc.doc_id,)) + result = c.fetchone() + doc.syncable = bool(result[0]) + return doc + + def __del__(self): + """ + Free resources when deleting or garbage collecting the database. + + This is only here to minimze problems if someone ever forgets to call + the close() method after using the database; you should not rely on + garbage collecting to free up the database resources. + """ + self.close() # TODO ---- rescue the fix for the windows case from here... - #@classmethod - # XXX Use SQLCIpherOptions instead - #def _open_database(cls, sqlcipher_file, password, document_factory=None, - #crypto=None, raw_key=False, cipher='aes-256-cbc', - #kdf_iter=4000, cipher_page_size=1024, - #defer_encryption=False, sync_db_key=None): - #""" - #Open a SQLCipher database. + # @classmethod + # def _open_database(cls, sqlcipher_file, password, document_factory=None, + # crypto=None, raw_key=False, cipher='aes-256-cbc', + # kdf_iter=4000, cipher_page_size=1024, + # defer_encryption=False, sync_db_key=None): + # """ + # Open a SQLCipher database. # - #:return: The database object. - #:rtype: SQLCipherDatabase - #""" - #cls.defer_encryption = defer_encryption - #if not os.path.isfile(sqlcipher_file): - #raise u1db_errors.DatabaseDoesNotExist() + # :return: The database object. + # :rtype: SQLCipherDatabase + # """ + # cls.defer_encryption = defer_encryption + # if not os.path.isfile(sqlcipher_file): + # raise u1db_errors.DatabaseDoesNotExist() # - #tries = 2 + # tries = 2 # Note: There seems to be a bug in sqlite 3.5.9 (with python2.6) # where without re-opening the database on Windows, it # doesn't see the transaction that was just committed - #while True: -# - #with cls.k_lock: - #db_handle = dbapi2.connect( - #sqlcipher_file, - #check_same_thread=SQLITE_CHECK_SAME_THREAD) + # while True: + # with cls.k_lock: + # db_handle = dbapi2.connect( + # sqlcipher_file, + # check_same_thread=SQLITE_CHECK_SAME_THREAD) # - #try: + # try: # set cryptographic params # # XXX pass only a CryptoOptions object around @@ -374,49 +475,108 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): #crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, #cipher_page_size=cipher_page_size, sync_db_key=sync_db_key) - #@classmethod - #def open_database(cls, sqlcipher_file, password, create, - #document_factory=None, crypto=None, raw_key=False, - #cipher='aes-256-cbc', kdf_iter=4000, - #cipher_page_size=1024, defer_encryption=False, - #sync_db_key=None): - # XXX pass only a CryptoOptions object around - #""" - #Open a SQLCipher database. -# - #*** IMPORTANT *** -# - #Don't forget to close the database after use by calling the close() - #method otherwise some resources might not be freed and you may - #experience several kinds of leakages. -# - #*** IMPORTANT *** -# - #:return: The database object. - #:rtype: SQLCipherDatabase - #""" - #cls.defer_encryption = defer_encryption - #args = sqlcipher_file, password - #kwargs = { - #'crypto': crypto, - #'raw_key': raw_key, - #'cipher': cipher, - #'kdf_iter': kdf_iter, - #'cipher_page_size': cipher_page_size, - #'defer_encryption': defer_encryption, - #'sync_db_key': sync_db_key, - #'document_factory': document_factory, - #} - #try: - #return cls._open_database(*args, **kwargs) - #except u1db_errors.DatabaseDoesNotExist: - #if not create: - #raise -# - # XXX here we were missing sync_db_key, intentional? - #return SQLCipherDatabase(*args, **kwargs) - # BEGIN SYNC FOO ---------------------------------------------------------- +class SQLCipherU1DBSync(object): + + _sync_watcher = None + _sync_enc_pool = None + + """ + The name of the local symmetrically encrypted documents to + sync database file. + """ + LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db' + + """ + A dictionary that hold locks which avoid multiple sync attempts from the + same database replica. + """ + # XXX We do not need the lock here now. Remove. + encrypting_lock = threading.Lock() + + """ + Period or recurrence of the periodic encrypting task, in seconds. + """ + # XXX use LoopingCall. + # Just use fucking deferreds, do not waste time looping. + ENCRYPT_TASK_PERIOD = 1 + + """ + A dictionary that hold locks which avoid multiple sync attempts from the + same database replica. + """ + syncing_lock = defaultdict(threading.Lock) + + def _init_sync(self, opts, soledad_crypto, defer_encryption=False): + + self._crypto = soledad_crypto + + # TODO ----- have to decide what to do with syncer + self._sync_db_key = opts.sync_db_key + self._sync_db = None + self._sync_db_write_lock = None + self._sync_enc_pool = None + self.sync_queue = None + + if self.defer_encryption: + # initialize sync db + self._init_sync_db() + # initialize syncing queue encryption pool + self._sync_enc_pool = crypto.SyncEncrypterPool( + self._crypto, self._sync_db, self._sync_db_write_lock) + self._sync_watcher = TimerTask(self._encrypt_syncing_docs, + self.ENCRYPT_TASK_PERIOD) + self._sync_watcher.start() + + # TODO move to class attribute? + # we store syncers in a dictionary indexed by the target URL. We also + # store a hash of the auth info in case auth info expires and we need + # to rebuild the syncer for that target. The final self._syncers + # format is the following:: + # + # self._syncers = {'<url>': ('<auth_hash>', syncer), ...} + self._syncers = {} + self._sync_db_write_lock = threading.Lock() + self.sync_queue = multiprocessing.Queue() + + def _init_sync_db(self, opts): + """ + Initialize the Symmetrically-Encrypted document to be synced database, + and the queue to communicate with subprocess workers. + + :param opts: + :type opts: SQLCipherOptions + """ + soledad_assert(opts.sync_db_key is not None) + sync_db_path = None + if opts.path != ":memory:": + sync_db_path = "%s-sync" % opts.path + else: + sync_db_path = ":memory:" + + # XXX use initialize_sqlcipher_db here too + # TODO pass on_init queries to initialize_sqlcipher_db + self._sync_db = MPSafeSQLiteDB(sync_db_path) + pragmas.set_crypto_pragmas(self._sync_db, opts) + + # create sync tables + self._create_sync_db_tables() + + def _create_sync_db_tables(self): + """ + Create tables for the local sync documents db if needed. + """ + # TODO use adbapi --------------------------------- + encr = crypto.SyncEncrypterPool + decr = crypto.SyncDecrypterPool + sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + encr.TABLE_NAME, encr.FIELD_NAMES)) + sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( + decr.TABLE_NAME, decr.FIELD_NAMES)) + + with self._sync_db_write_lock: + self._sync_db.execute(sql_encr) + self._sync_db.execute(sql_decr) def sync(self, url, creds=None, autocreate=True, defer_decryption=True): """ @@ -428,14 +588,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :param url: The url of the target replica to sync with. :type url: str - :param creds: optional dictionary giving credentials. + :param creds: + optional dictionary giving credentials. to authorize the operation with the server. :type creds: dict :param autocreate: Ask the target to create the db if non-existent. :type autocreate: bool - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. + :param defer_decryption: + Whether to defer the decryption process using the intermediate + database. If False, decryption will be done inline. :type defer_decryption: bool :return: The local generation before the synchronisation was performed. @@ -482,13 +643,13 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): Because of that, this method blocks until the syncing lock can be acquired. """ - with SQLCipherDatabase.syncing_lock[self._get_replica_uid()]: + with self.syncing_lock[self._get_replica_uid()]: syncer = self._get_syncer(url, creds=creds) yield syncer @property def syncing(self): - lock = SQLCipherDatabase.syncing_lock[self._get_replica_uid()] + lock = self.syncing_lock[self._get_replica_uid()] acquired_lock = lock.acquire(False) if acquired_lock is False: return True @@ -530,46 +691,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): syncer.num_inserted = 0 return syncer - # END SYNC FOO ---------------------------------------------------------- - - def _init_sync_db(self): - """ - Initialize the Symmetrically-Encrypted document to be synced database, - and the queue to communicate with subprocess workers. - """ - if self._sync_db is None: - soledad_assert(self._sync_db_key is not None) - sync_db_path = None - if self._sqlcipher_file != ":memory:": - sync_db_path = "%s-sync" % self._sqlcipher_file - else: - sync_db_path = ":memory:" - self._sync_db = MPSafeSQLiteDB(sync_db_path) - # protect the sync db with a password - if self._sync_db_key is not None: - # XXX pass only a CryptoOptions object around - pragmas.set_crypto_pragmas( - self._sync_db, self._sync_db_key, False, - 'aes-256-cbc', 4000, 1024) - self._sync_db_write_lock = threading.Lock() - self._create_sync_db_tables() - self.sync_queue = multiprocessing.Queue() - - def _create_sync_db_tables(self): - """ - Create tables for the local sync documents db if needed. - """ - encr = SyncEncrypterPool - decr = SyncDecrypterPool - sql_encr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( - encr.TABLE_NAME, encr.FIELD_NAMES)) - sql_decr = ("CREATE TABLE IF NOT EXISTS %s (%s)" % ( - decr.TABLE_NAME, decr.FIELD_NAMES)) - - with self._sync_db_write_lock: - self._sync_db.execute(sql_encr) - self._sync_db.execute(sql_decr) - # # Symmetric encryption of syncing docs # @@ -599,182 +720,14 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): finally: lock.release() - # - # Document operations - # - - def put_doc(self, doc): - """ - Overwrite the put_doc method, to enqueue the modified document for - encryption before sync. - - :param doc: The document to be put. - :type doc: u1db.Document - - :return: The new document revision. - :rtype: str - """ - doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc) - if self.defer_encryption: - self.sync_queue.put_nowait(doc) - return doc_rev - - # indexes - - def _put_and_update_indexes(self, old_doc, doc): - """ - Update a document and all indexes related to it. - - :param old_doc: The old version of the document. - :type old_doc: u1db.Document - :param doc: The new version of the document. - :type doc: u1db.Document - """ - with self.update_indexes_lock: - sqlite_backend.SQLitePartialExpandDatabase._put_and_update_indexes( - self, old_doc, doc) - c = self._db_handle.cursor() - c.execute('UPDATE document SET syncable=? ' - 'WHERE doc_id=?', - (doc.syncable, doc.doc_id)) - - def _get_doc(self, doc_id, check_for_conflicts=False): - """ - Get just the document content, without fancy handling. - - :param doc_id: The unique document identifier - :type doc_id: str - :param include_deleted: If set to True, deleted documents will be - returned with empty content. Otherwise asking for a deleted - document will return None. - :type include_deleted: bool - - :return: a Document object. - :type: u1db.Document - """ - doc = sqlite_backend.SQLitePartialExpandDatabase._get_doc( - self, doc_id, check_for_conflicts) - if doc: - c = self._db_handle.cursor() - c.execute('SELECT syncable FROM document WHERE doc_id=?', - (doc.doc_id,)) - result = c.fetchone() - doc.syncable = bool(result[0]) - return doc - - # - # SQLCipher API methods - # - - # XXX Use SQLCIpherOptions instead - @classmethod - def assert_db_is_encrypted(cls, sqlcipher_file, key, raw_key, cipher, - kdf_iter, cipher_page_size): - """ - Assert that C{sqlcipher_file} contains an encrypted database. - - When opening an existing database, PRAGMA key will not immediately - throw an error if the key provided is incorrect. To test that the - database can be successfully opened with the provided key, it is - necessary to perform some operation on the database (i.e. read from - it) and confirm it is success. - - The easiest way to do this is select off the sqlite_master table, - which will attempt to read the first page of the database and will - parse the schema. - - :param sqlcipher_file: The path for the SQLCipher file. - :type sqlcipher_file: str - :param key: The key that protects the SQLCipher db. - :type key: str - :param raw_key: Whether C{key} is a raw 64-char hex string or a - passphrase that should be hashed to obtain the encyrption key. - :type raw_key: bool - :param cipher: The cipher and mode to use. - :type cipher: str - :param kdf_iter: The number of iterations to use. - :type kdf_iter: int - :param cipher_page_size: The page size. - :type cipher_page_size: int - """ - try: - # try to open an encrypted database with the regular u1db - # backend should raise a DatabaseError exception. - sqlite_backend.SQLitePartialExpandDatabase(sqlcipher_file) - raise DatabaseIsNotEncrypted() - except sqlcipher_dbapi2.DatabaseError: - # assert that we can access it using SQLCipher with the given - # key - with cls.k_lock: - db_handle = sqlcipher_dbapi2.connect( - sqlcipher_file, - isolation_level=SQLITE_ISOLATION_LEVEL, - check_same_thread=SQLITE_CHECK_SAME_THREAD) - pragmas.set_crypto_pragmas( - db_handle, key, raw_key, cipher, - kdf_iter, cipher_page_size) - db_handle.cursor().execute( - 'SELECT count(*) FROM sqlite_master') - - # Extra query methods: extensions to the base sqlite implmentation. - - def get_count_from_index(self, index_name, *key_values): - """ - Returns the count for a given combination of index_name - and key values. - - Extension method made from similar methods in u1db version 13.09 - - :param index_name: The index to query - :type index_name: str - :param key_values: values to match. eg, if you have - an index with 3 fields then you would have: - get_from_index(index_name, val1, val2, val3) - :type key_values: tuple - :return: count. - :rtype: int - """ - c = self._db_handle.cursor() - definition = self._get_index_definition(index_name) - - if len(key_values) != len(definition): - raise u1db_errors.InvalidValueForIndex() - tables = ["document_fields d%d" % i for i in range(len(definition))] - novalue_where = ["d.doc_id = d%d.doc_id" - " AND d%d.field_name = ?" - % (i, i) for i in range(len(definition))] - exact_where = [novalue_where[i] - + (" AND d%d.value = ?" % (i,)) - for i in range(len(definition))] - args = [] - where = [] - for idx, (field, value) in enumerate(zip(definition, key_values)): - args.append(field) - where.append(exact_where[idx]) - args.append(value) - - tables = ["document_fields d%d" % i for i in range(len(definition))] - statement = ( - "SELECT COUNT(*) FROM document d, %s WHERE %s " % ( - ', '.join(tables), - ' AND '.join(where), - )) - try: - c.execute(statement, tuple(args)) - except sqlcipher_dbapi2.OperationalError, e: - raise sqlcipher_dbapi2.OperationalError( - str(e) + '\nstatement: %s\nargs: %s\n' % (statement, args)) - res = c.fetchall() - return res[0][0] + @property + def replica_uid(self): + return self._get_replica_uid() def close(self): """ - Close db_handle and close syncer. + Close the syncer and syncdb orderly """ - # TODO separate db from syncers -------------- - - if logger is not None: # logger might be none if called from __del__ - logger.debug("Sqlcipher backend: closing") # stop the sync watcher for deferred encryption if self._sync_watcher is not None: self._sync_watcher.stop() @@ -789,12 +742,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): if self._sync_enc_pool is not None: self._sync_enc_pool.close() self._sync_enc_pool = None - # close the actual database - if self._db_handle is not None: - self._db_handle.close() - self._db_handle = None - # --------------------------------------- # close the sync database if self._sync_db is not None: self._sync_db.close() @@ -805,20 +753,6 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): del self.sync_queue self.sync_queue = None - def __del__(self): - """ - Free resources when deleting or garbage collecting the database. - - This is only here to minimze problems if someone ever forgets to call - the close() method after using the database; you should not rely on - garbage collecting to free up the database resources. - """ - self.close() - - @property - def replica_uid(self): - return self._get_replica_uid() - # # Exceptions # @@ -831,4 +765,13 @@ class DatabaseIsNotEncrypted(Exception): pass +def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False, + syncable=True): + """ + Return a default Soledad Document. + Used in the initialization for SQLCipherDatabase + """ + return SoledadDocument(doc_id=doc_id, rev=rev, json=json, + has_conflicts=has_conflicts, syncable=syncable) + sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) |