diff options
| author | Tomás Touceda <chiiph@leap.se> | 2013-08-02 10:38:17 -0300 | 
|---|---|---|
| committer | Tomás Touceda <chiiph@leap.se> | 2013-08-02 10:38:17 -0300 | 
| commit | ccb79e033014eb86cfe3cfcfef9fe8ef857836ab (patch) | |
| tree | 2282262f18e15ae00a939bb35e300ed128fb311b | |
| parent | 20ca5b1a9f4fa6cc03ce4658c83cdf97f8141ba4 (diff) | |
| parent | 4724b2d7e66ae3fd23405b3415f7ad11af8384d7 (diff) | |
Merge remote-tracking branch 'kali/feature/thread_safe-r2' into develop
| -rw-r--r-- | changes/feature_2662_thread_safe | 1 | ||||
| -rw-r--r-- | soledad/src/leap/soledad/__init__.py | 102 | ||||
| -rw-r--r-- | soledad/src/leap/soledad/dbwrapper.py | 183 | 
3 files changed, 231 insertions, 55 deletions
| diff --git a/changes/feature_2662_thread_safe b/changes/feature_2662_thread_safe new file mode 100644 index 00000000..f32ade59 --- /dev/null +++ b/changes/feature_2662_thread_safe @@ -0,0 +1 @@ +  o Thread safe wrapper for pysqlcipher diff --git a/soledad/src/leap/soledad/__init__.py b/soledad/src/leap/soledad/__init__.py index 956f47a7..00ac21f8 100644 --- a/soledad/src/leap/soledad/__init__.py +++ b/soledad/src/leap/soledad/__init__.py @@ -27,7 +27,6 @@ remote storage in the server side.  """  import os -import string  import binascii  import logging  import urlparse @@ -42,69 +41,56 @@ import errno  from xdg import BaseDirectory  from hashlib import sha256  from u1db.remote import http_client -from u1db.remote.ssl_match_hostname import (  # noqa -    CertificateError, -    match_hostname, -) +from u1db.remote.ssl_match_hostname import match_hostname  #  # Assert functions  # -def soledad_assert(condition, message): -    """ -    Asserts the condition and displays the message if that's not -    met. - -    @param condition: condition to check -    @type condition: bool -    @param message: message to display if the condition isn't met -    @type message: str -    """ -    assert condition, message - -  # we want to use leap.common.check.leap_assert in case it is available,  # because it also logs in a way other parts of leap can access log messages. +  try: -    from leap.common.check import leap_assert -    soledad_assert = leap_assert -except ImportError: -    pass +    from leap.common.check import leap_assert as soledad_assert +except ImportError: -def soledad_assert_type(var, expectedType): -    """ -    Helper assert check for a variable's expected type +    def soledad_assert(condition, message): +        """ +        Asserts the condition and displays the message if that's not +        met. -    @param var: variable to check -    @type var: any -    @param expectedType: type to check agains -    @type expectedType: type -    """ -    soledad_assert(isinstance(var, expectedType), -                   "Expected type %r instead of %r" % -                   (expectedType, type(var))) +        @param condition: condition to check +        @type condition: bool +        @param message: message to display if the condition isn't met +        @type message: str +        """ +        assert condition, message  try: -    from leap.common.check import leap_assert_type -    soledad_assert_type = leap_assert_type +    from leap.common.check import leap_assert_type as soledad_assert_type +  except ImportError: -    pass + +    def soledad_assert_type(var, expectedType): +        """ +        Helper assert check for a variable's expected type + +        @param var: variable to check +        @type var: any +        @param expectedType: type to check agains +        @type expectedType: type +        """ +        soledad_assert(isinstance(var, expectedType), +                       "Expected type %r instead of %r" % +                       (expectedType, type(var)))  #  # Signaling function  # -# we define a fake signaling function and fake signal constants that will -# allow for logging signaling attempts in case leap.common.events is not -# available. - -def signal(signal, content=""): -    logger.info("Would signal: %s - %s." % (str(signal), content)) -  SOLEDAD_CREATING_KEYS = 'Creating keys...'  SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.'  SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...' @@ -117,9 +103,7 @@ SOLEDAD_DONE_DATA_SYNC = 'Done data sync.'  # we want to use leap.common.events to emits signals, if it is available.  try:      from leap.common import events -    # replace fake signaling function with real one -    signal = events.signal -    # replace fake string signals with real signals +    from leap.common.events import signal      SOLEDAD_CREATING_KEYS = events.events_pb2.SOLEDAD_CREATING_KEYS      SOLEDAD_DONE_CREATING_KEYS = events.events_pb2.SOLEDAD_DONE_CREATING_KEYS      SOLEDAD_DOWNLOADING_KEYS = events.events_pb2.SOLEDAD_DOWNLOADING_KEYS @@ -130,18 +114,23 @@ try:          events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS      SOLEDAD_NEW_DATA_TO_SYNC = events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC      SOLEDAD_DONE_DATA_SYNC = events.events_pb2.SOLEDAD_DONE_DATA_SYNC +  except ImportError: -    pass +    # we define a fake signaling function and fake signal constants that will +    # allow for logging signaling attempts in case leap.common.events is not +    # available. + +    def signal(signal, content=""): +        logger.info("Would signal: %s - %s." % (str(signal), content)) +from leap.soledad.crypto import SoledadCrypto +from leap.soledad.dbwrapper import SQLCipherWrapper  from leap.soledad.document import SoledadDocument -from leap.soledad.sqlcipher import ( -    open as sqlcipher_open, -    SQLCipherDatabase, -) -from leap.soledad.target import SoledadSyncTarget  from leap.soledad.shared_db import SoledadSharedDatabase -from leap.soledad.crypto import SoledadCrypto +from leap.soledad.sqlcipher import open as sqlcipher_open +from leap.soledad.sqlcipher import SQLCipherDatabase +from leap.soledad.target import SoledadSyncTarget  logger = logging.getLogger(name=__name__) @@ -439,7 +428,9 @@ class Soledad(object):              secret[salt_start:salt_end],  # the salt              buflen=32,  # we need a key with 256 bits (32 bytes)          ) -        self._db = sqlcipher_open( + +        # Instantiate a thread-safe wrapper +        self._db = SQLCipherWrapper(              self._local_db_path,              binascii.b2a_hex(key),  # sqlcipher only accepts the hex version              create=True, @@ -453,13 +444,14 @@ class Soledad(object):          """          if hasattr(self, '_db') and isinstance(                  self._db, -                SQLCipherDatabase): +                SQLCipherWrapper):              self._db.close()      def __del__(self):          """          Make sure local database is closed when object is destroyed.          """ +        # Watch out! We have no guarantees  that this is properly called.          self.close()      # diff --git a/soledad/src/leap/soledad/dbwrapper.py b/soledad/src/leap/soledad/dbwrapper.py new file mode 100644 index 00000000..c16c4925 --- /dev/null +++ b/soledad/src/leap/soledad/dbwrapper.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- +# dbwrapper.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. +""" +Thread-safe wrapper for sqlite/pysqlcipher. + +*TODO* +At some point we surely will want to switch to a twisted way of dealing +with this, using defers and proper callbacks. But I had this tested for +some time so postponing that refactor. +""" +import logging +import threading +import Queue +import time + +import exceptions + +from functools import partial + +from leap.soledad import sqlcipher + +logger = logging.getLogger(__name__) + + +class SQLCipherWrapper(threading.Thread): + +    def __init__(self, *args, **kwargs): +        """ +        Initializes a wrapper that proxies method and attribute +        access to an underlying SQLCipher instance. We instantiate sqlcipher +        in a thread, and all method accesses communicate with it using a +        Queue. + +        :param *args: position arguments to pass to pysqlcipher initialization +        :type args: tuple + +        :param **kwargs: keyword arguments to pass to pysqlcipher +                         initialization +        :type kwargs: dict +        """ +        threading.Thread.__init__(self) +        self._db = None +        self._wrargs = args, kwargs + +        self._queue = Queue.Queue() +        self._stopped = threading.Event() + +        self.start() + +    def _init_db(self): +        """ +        Initializes sqlcipher database. + +        This is called on a separate thread. +        """ +        # instantiate u1db +        args, kwargs = self._wrargs +        self._db = sqlcipher.open(*args, **kwargs) + +    def run(self): +        """ +        Main loop for the sqlcipher thread. +        """ +        logger.debug("SQLCipherWrapper thread started.") +        logger.debug("Initializing sqlcipher") +        end_mths = ("__end_thread", "_SQLCipherWrapper__end_thread") + +        self._init_db() +        self._lock = threading.Lock() + +        ct = 0 +        started = False + +        while True: +            if self._db is None: +                if started: +                    break +                if ct > 10: +                    break  # XXX DEBUG +                logger.debug('db not ready yet, waiting...') +                time.sleep(1) +                ct += 1 + +            started = True + +            with self._lock: +                try: +                    mth, q, wrargs = self._queue.get() +                except: +                    logger.error("exception getting args from queue") + +                res = None +                attr = getattr(self._db, mth, None) +                if not attr: +                    if mth not in end_mths: +                        logger.error('method %s does not exist' % (mth,)) +                        res = AttributeError( +                            "_db instance has no attribute %s" % mth) + +                elif callable(attr): +                    # invoke the method with the passed args +                    args = wrargs.get('args', []) +                    kwargs = wrargs.get('kwargs', {}) +                    try: +                        res = attr(*args, **kwargs) +                    except Exception as e: +                        logger.error( +                            "Error on proxied method %s: '%r'." % ( +                            attr, e)) +                        res = e +                else: +                    # non-callable attribute +                    res = attr +                logger.debug('returning proxied db call...') +                q.put(res) + +            if mth in end_mths: +                logger.debug('ending thread') +                break + +        logger.debug("SQLCipherWrapper thread terminated.") +        self._stopped.set() + +    def close(self): +        """ +        Closes the sqlcipher database and finishes the thread. This method +        should always be called explicitely. +        """ +        self.__getattr__('close')() +        self.__end_thread() + +    def __getattr__(self, attr): +        """ +        Returns _db proxied attributes. +        """ + +        def __proxied_mth(method, *args, **kwargs): +            if not self._stopped.isSet(): +                wrargs = {'args': args, 'kwargs': kwargs} +                q = Queue.Queue() +                self._queue.put((method, q, wrargs)) +                res = q.get() +                q.task_done() + +                if isinstance(res, exceptions.BaseException): +                    # XXX should get the original bt +                    raise res +                return res +            else: +                logger.warning("tried to call proxied meth " +                               "but stopped is set: %s" % +                               (method,)) + +        rgetattr = object.__getattribute__ + +        if attr != "_db": +            proxied = partial(__proxied_mth, attr) +            return proxied + +        # fallback to regular behavior +        return rgetattr(self, attr) + +    def __del__(self): +        """ +        Do not trust this get called. No guarantees given. Because of a funny +        dance with the refs and the way the gc works, we should be calling the +        close method explicitely. +        """ +        self.close() | 
