diff options
author | Tomás Touceda <chiiph@leap.se> | 2013-08-02 10:38:17 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2013-08-02 10:38:17 -0300 |
commit | ccb79e033014eb86cfe3cfcfef9fe8ef857836ab (patch) | |
tree | 2282262f18e15ae00a939bb35e300ed128fb311b | |
parent | 20ca5b1a9f4fa6cc03ce4658c83cdf97f8141ba4 (diff) | |
parent | 4724b2d7e66ae3fd23405b3415f7ad11af8384d7 (diff) |
Merge remote-tracking branch 'kali/feature/thread_safe-r2' into develop
-rw-r--r-- | changes/feature_2662_thread_safe | 1 | ||||
-rw-r--r-- | soledad/src/leap/soledad/__init__.py | 102 | ||||
-rw-r--r-- | soledad/src/leap/soledad/dbwrapper.py | 183 |
3 files changed, 231 insertions, 55 deletions
diff --git a/changes/feature_2662_thread_safe b/changes/feature_2662_thread_safe new file mode 100644 index 00000000..f32ade59 --- /dev/null +++ b/changes/feature_2662_thread_safe @@ -0,0 +1 @@ + o Thread safe wrapper for pysqlcipher diff --git a/soledad/src/leap/soledad/__init__.py b/soledad/src/leap/soledad/__init__.py index 956f47a7..00ac21f8 100644 --- a/soledad/src/leap/soledad/__init__.py +++ b/soledad/src/leap/soledad/__init__.py @@ -27,7 +27,6 @@ remote storage in the server side. """ import os -import string import binascii import logging import urlparse @@ -42,69 +41,56 @@ import errno from xdg import BaseDirectory from hashlib import sha256 from u1db.remote import http_client -from u1db.remote.ssl_match_hostname import ( # noqa - CertificateError, - match_hostname, -) +from u1db.remote.ssl_match_hostname import match_hostname # # Assert functions # -def soledad_assert(condition, message): - """ - Asserts the condition and displays the message if that's not - met. - - @param condition: condition to check - @type condition: bool - @param message: message to display if the condition isn't met - @type message: str - """ - assert condition, message - - # we want to use leap.common.check.leap_assert in case it is available, # because it also logs in a way other parts of leap can access log messages. + try: - from leap.common.check import leap_assert - soledad_assert = leap_assert -except ImportError: - pass + from leap.common.check import leap_assert as soledad_assert +except ImportError: -def soledad_assert_type(var, expectedType): - """ - Helper assert check for a variable's expected type + def soledad_assert(condition, message): + """ + Asserts the condition and displays the message if that's not + met. - @param var: variable to check - @type var: any - @param expectedType: type to check agains - @type expectedType: type - """ - soledad_assert(isinstance(var, expectedType), - "Expected type %r instead of %r" % - (expectedType, type(var))) + @param condition: condition to check + @type condition: bool + @param message: message to display if the condition isn't met + @type message: str + """ + assert condition, message try: - from leap.common.check import leap_assert_type - soledad_assert_type = leap_assert_type + from leap.common.check import leap_assert_type as soledad_assert_type + except ImportError: - pass + + def soledad_assert_type(var, expectedType): + """ + Helper assert check for a variable's expected type + + @param var: variable to check + @type var: any + @param expectedType: type to check agains + @type expectedType: type + """ + soledad_assert(isinstance(var, expectedType), + "Expected type %r instead of %r" % + (expectedType, type(var))) # # Signaling function # -# we define a fake signaling function and fake signal constants that will -# allow for logging signaling attempts in case leap.common.events is not -# available. - -def signal(signal, content=""): - logger.info("Would signal: %s - %s." % (str(signal), content)) - SOLEDAD_CREATING_KEYS = 'Creating keys...' SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.' SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...' @@ -117,9 +103,7 @@ SOLEDAD_DONE_DATA_SYNC = 'Done data sync.' # we want to use leap.common.events to emits signals, if it is available. try: from leap.common import events - # replace fake signaling function with real one - signal = events.signal - # replace fake string signals with real signals + from leap.common.events import signal SOLEDAD_CREATING_KEYS = events.events_pb2.SOLEDAD_CREATING_KEYS SOLEDAD_DONE_CREATING_KEYS = events.events_pb2.SOLEDAD_DONE_CREATING_KEYS SOLEDAD_DOWNLOADING_KEYS = events.events_pb2.SOLEDAD_DOWNLOADING_KEYS @@ -130,18 +114,23 @@ try: events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS SOLEDAD_NEW_DATA_TO_SYNC = events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC SOLEDAD_DONE_DATA_SYNC = events.events_pb2.SOLEDAD_DONE_DATA_SYNC + except ImportError: - pass + # we define a fake signaling function and fake signal constants that will + # allow for logging signaling attempts in case leap.common.events is not + # available. + + def signal(signal, content=""): + logger.info("Would signal: %s - %s." % (str(signal), content)) +from leap.soledad.crypto import SoledadCrypto +from leap.soledad.dbwrapper import SQLCipherWrapper from leap.soledad.document import SoledadDocument -from leap.soledad.sqlcipher import ( - open as sqlcipher_open, - SQLCipherDatabase, -) -from leap.soledad.target import SoledadSyncTarget from leap.soledad.shared_db import SoledadSharedDatabase -from leap.soledad.crypto import SoledadCrypto +from leap.soledad.sqlcipher import open as sqlcipher_open +from leap.soledad.sqlcipher import SQLCipherDatabase +from leap.soledad.target import SoledadSyncTarget logger = logging.getLogger(name=__name__) @@ -439,7 +428,9 @@ class Soledad(object): secret[salt_start:salt_end], # the salt buflen=32, # we need a key with 256 bits (32 bytes) ) - self._db = sqlcipher_open( + + # Instantiate a thread-safe wrapper + self._db = SQLCipherWrapper( self._local_db_path, binascii.b2a_hex(key), # sqlcipher only accepts the hex version create=True, @@ -453,13 +444,14 @@ class Soledad(object): """ if hasattr(self, '_db') and isinstance( self._db, - SQLCipherDatabase): + SQLCipherWrapper): self._db.close() def __del__(self): """ Make sure local database is closed when object is destroyed. """ + # Watch out! We have no guarantees that this is properly called. self.close() # diff --git a/soledad/src/leap/soledad/dbwrapper.py b/soledad/src/leap/soledad/dbwrapper.py new file mode 100644 index 00000000..c16c4925 --- /dev/null +++ b/soledad/src/leap/soledad/dbwrapper.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- +# dbwrapper.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Thread-safe wrapper for sqlite/pysqlcipher. + +*TODO* +At some point we surely will want to switch to a twisted way of dealing +with this, using defers and proper callbacks. But I had this tested for +some time so postponing that refactor. +""" +import logging +import threading +import Queue +import time + +import exceptions + +from functools import partial + +from leap.soledad import sqlcipher + +logger = logging.getLogger(__name__) + + +class SQLCipherWrapper(threading.Thread): + + def __init__(self, *args, **kwargs): + """ + Initializes a wrapper that proxies method and attribute + access to an underlying SQLCipher instance. We instantiate sqlcipher + in a thread, and all method accesses communicate with it using a + Queue. + + :param *args: position arguments to pass to pysqlcipher initialization + :type args: tuple + + :param **kwargs: keyword arguments to pass to pysqlcipher + initialization + :type kwargs: dict + """ + threading.Thread.__init__(self) + self._db = None + self._wrargs = args, kwargs + + self._queue = Queue.Queue() + self._stopped = threading.Event() + + self.start() + + def _init_db(self): + """ + Initializes sqlcipher database. + + This is called on a separate thread. + """ + # instantiate u1db + args, kwargs = self._wrargs + self._db = sqlcipher.open(*args, **kwargs) + + def run(self): + """ + Main loop for the sqlcipher thread. + """ + logger.debug("SQLCipherWrapper thread started.") + logger.debug("Initializing sqlcipher") + end_mths = ("__end_thread", "_SQLCipherWrapper__end_thread") + + self._init_db() + self._lock = threading.Lock() + + ct = 0 + started = False + + while True: + if self._db is None: + if started: + break + if ct > 10: + break # XXX DEBUG + logger.debug('db not ready yet, waiting...') + time.sleep(1) + ct += 1 + + started = True + + with self._lock: + try: + mth, q, wrargs = self._queue.get() + except: + logger.error("exception getting args from queue") + + res = None + attr = getattr(self._db, mth, None) + if not attr: + if mth not in end_mths: + logger.error('method %s does not exist' % (mth,)) + res = AttributeError( + "_db instance has no attribute %s" % mth) + + elif callable(attr): + # invoke the method with the passed args + args = wrargs.get('args', []) + kwargs = wrargs.get('kwargs', {}) + try: + res = attr(*args, **kwargs) + except Exception as e: + logger.error( + "Error on proxied method %s: '%r'." % ( + attr, e)) + res = e + else: + # non-callable attribute + res = attr + logger.debug('returning proxied db call...') + q.put(res) + + if mth in end_mths: + logger.debug('ending thread') + break + + logger.debug("SQLCipherWrapper thread terminated.") + self._stopped.set() + + def close(self): + """ + Closes the sqlcipher database and finishes the thread. This method + should always be called explicitely. + """ + self.__getattr__('close')() + self.__end_thread() + + def __getattr__(self, attr): + """ + Returns _db proxied attributes. + """ + + def __proxied_mth(method, *args, **kwargs): + if not self._stopped.isSet(): + wrargs = {'args': args, 'kwargs': kwargs} + q = Queue.Queue() + self._queue.put((method, q, wrargs)) + res = q.get() + q.task_done() + + if isinstance(res, exceptions.BaseException): + # XXX should get the original bt + raise res + return res + else: + logger.warning("tried to call proxied meth " + "but stopped is set: %s" % + (method,)) + + rgetattr = object.__getattribute__ + + if attr != "_db": + proxied = partial(__proxied_mth, attr) + return proxied + + # fallback to regular behavior + return rgetattr(self, attr) + + def __del__(self): + """ + Do not trust this get called. No guarantees given. Because of a funny + dance with the refs and the way the gc works, we should be calling the + close method explicitely. + """ + self.close() |