summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTomás Touceda <chiiph@leap.se>2013-08-02 10:38:17 -0300
committerTomás Touceda <chiiph@leap.se>2013-08-02 10:38:17 -0300
commitccb79e033014eb86cfe3cfcfef9fe8ef857836ab (patch)
tree2282262f18e15ae00a939bb35e300ed128fb311b
parent20ca5b1a9f4fa6cc03ce4658c83cdf97f8141ba4 (diff)
parent4724b2d7e66ae3fd23405b3415f7ad11af8384d7 (diff)
Merge remote-tracking branch 'kali/feature/thread_safe-r2' into develop
-rw-r--r--changes/feature_2662_thread_safe1
-rw-r--r--soledad/src/leap/soledad/__init__.py102
-rw-r--r--soledad/src/leap/soledad/dbwrapper.py183
3 files changed, 231 insertions, 55 deletions
diff --git a/changes/feature_2662_thread_safe b/changes/feature_2662_thread_safe
new file mode 100644
index 00000000..f32ade59
--- /dev/null
+++ b/changes/feature_2662_thread_safe
@@ -0,0 +1 @@
+ o Thread safe wrapper for pysqlcipher
diff --git a/soledad/src/leap/soledad/__init__.py b/soledad/src/leap/soledad/__init__.py
index 956f47a7..00ac21f8 100644
--- a/soledad/src/leap/soledad/__init__.py
+++ b/soledad/src/leap/soledad/__init__.py
@@ -27,7 +27,6 @@ remote storage in the server side.
"""
import os
-import string
import binascii
import logging
import urlparse
@@ -42,69 +41,56 @@ import errno
from xdg import BaseDirectory
from hashlib import sha256
from u1db.remote import http_client
-from u1db.remote.ssl_match_hostname import ( # noqa
- CertificateError,
- match_hostname,
-)
+from u1db.remote.ssl_match_hostname import match_hostname
#
# Assert functions
#
-def soledad_assert(condition, message):
- """
- Asserts the condition and displays the message if that's not
- met.
-
- @param condition: condition to check
- @type condition: bool
- @param message: message to display if the condition isn't met
- @type message: str
- """
- assert condition, message
-
-
# we want to use leap.common.check.leap_assert in case it is available,
# because it also logs in a way other parts of leap can access log messages.
+
try:
- from leap.common.check import leap_assert
- soledad_assert = leap_assert
-except ImportError:
- pass
+ from leap.common.check import leap_assert as soledad_assert
+except ImportError:
-def soledad_assert_type(var, expectedType):
- """
- Helper assert check for a variable's expected type
+ def soledad_assert(condition, message):
+ """
+ Asserts the condition and displays the message if that's not
+ met.
- @param var: variable to check
- @type var: any
- @param expectedType: type to check agains
- @type expectedType: type
- """
- soledad_assert(isinstance(var, expectedType),
- "Expected type %r instead of %r" %
- (expectedType, type(var)))
+ @param condition: condition to check
+ @type condition: bool
+ @param message: message to display if the condition isn't met
+ @type message: str
+ """
+ assert condition, message
try:
- from leap.common.check import leap_assert_type
- soledad_assert_type = leap_assert_type
+ from leap.common.check import leap_assert_type as soledad_assert_type
+
except ImportError:
- pass
+
+ def soledad_assert_type(var, expectedType):
+ """
+ Helper assert check for a variable's expected type
+
+ @param var: variable to check
+ @type var: any
+ @param expectedType: type to check agains
+ @type expectedType: type
+ """
+ soledad_assert(isinstance(var, expectedType),
+ "Expected type %r instead of %r" %
+ (expectedType, type(var)))
#
# Signaling function
#
-# we define a fake signaling function and fake signal constants that will
-# allow for logging signaling attempts in case leap.common.events is not
-# available.
-
-def signal(signal, content=""):
- logger.info("Would signal: %s - %s." % (str(signal), content))
-
SOLEDAD_CREATING_KEYS = 'Creating keys...'
SOLEDAD_DONE_CREATING_KEYS = 'Done creating keys.'
SOLEDAD_DOWNLOADING_KEYS = 'Downloading keys...'
@@ -117,9 +103,7 @@ SOLEDAD_DONE_DATA_SYNC = 'Done data sync.'
# we want to use leap.common.events to emits signals, if it is available.
try:
from leap.common import events
- # replace fake signaling function with real one
- signal = events.signal
- # replace fake string signals with real signals
+ from leap.common.events import signal
SOLEDAD_CREATING_KEYS = events.events_pb2.SOLEDAD_CREATING_KEYS
SOLEDAD_DONE_CREATING_KEYS = events.events_pb2.SOLEDAD_DONE_CREATING_KEYS
SOLEDAD_DOWNLOADING_KEYS = events.events_pb2.SOLEDAD_DOWNLOADING_KEYS
@@ -130,18 +114,23 @@ try:
events.events_pb2.SOLEDAD_DONE_UPLOADING_KEYS
SOLEDAD_NEW_DATA_TO_SYNC = events.events_pb2.SOLEDAD_NEW_DATA_TO_SYNC
SOLEDAD_DONE_DATA_SYNC = events.events_pb2.SOLEDAD_DONE_DATA_SYNC
+
except ImportError:
- pass
+ # we define a fake signaling function and fake signal constants that will
+ # allow for logging signaling attempts in case leap.common.events is not
+ # available.
+
+ def signal(signal, content=""):
+ logger.info("Would signal: %s - %s." % (str(signal), content))
+from leap.soledad.crypto import SoledadCrypto
+from leap.soledad.dbwrapper import SQLCipherWrapper
from leap.soledad.document import SoledadDocument
-from leap.soledad.sqlcipher import (
- open as sqlcipher_open,
- SQLCipherDatabase,
-)
-from leap.soledad.target import SoledadSyncTarget
from leap.soledad.shared_db import SoledadSharedDatabase
-from leap.soledad.crypto import SoledadCrypto
+from leap.soledad.sqlcipher import open as sqlcipher_open
+from leap.soledad.sqlcipher import SQLCipherDatabase
+from leap.soledad.target import SoledadSyncTarget
logger = logging.getLogger(name=__name__)
@@ -439,7 +428,9 @@ class Soledad(object):
secret[salt_start:salt_end], # the salt
buflen=32, # we need a key with 256 bits (32 bytes)
)
- self._db = sqlcipher_open(
+
+ # Instantiate a thread-safe wrapper
+ self._db = SQLCipherWrapper(
self._local_db_path,
binascii.b2a_hex(key), # sqlcipher only accepts the hex version
create=True,
@@ -453,13 +444,14 @@ class Soledad(object):
"""
if hasattr(self, '_db') and isinstance(
self._db,
- SQLCipherDatabase):
+ SQLCipherWrapper):
self._db.close()
def __del__(self):
"""
Make sure local database is closed when object is destroyed.
"""
+ # Watch out! We have no guarantees that this is properly called.
self.close()
#
diff --git a/soledad/src/leap/soledad/dbwrapper.py b/soledad/src/leap/soledad/dbwrapper.py
new file mode 100644
index 00000000..c16c4925
--- /dev/null
+++ b/soledad/src/leap/soledad/dbwrapper.py
@@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+# dbwrapper.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Thread-safe wrapper for sqlite/pysqlcipher.
+
+*TODO*
+At some point we surely will want to switch to a twisted way of dealing
+with this, using defers and proper callbacks. But I had this tested for
+some time so postponing that refactor.
+"""
+import logging
+import threading
+import Queue
+import time
+
+import exceptions
+
+from functools import partial
+
+from leap.soledad import sqlcipher
+
+logger = logging.getLogger(__name__)
+
+
+class SQLCipherWrapper(threading.Thread):
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initializes a wrapper that proxies method and attribute
+ access to an underlying SQLCipher instance. We instantiate sqlcipher
+ in a thread, and all method accesses communicate with it using a
+ Queue.
+
+ :param *args: position arguments to pass to pysqlcipher initialization
+ :type args: tuple
+
+ :param **kwargs: keyword arguments to pass to pysqlcipher
+ initialization
+ :type kwargs: dict
+ """
+ threading.Thread.__init__(self)
+ self._db = None
+ self._wrargs = args, kwargs
+
+ self._queue = Queue.Queue()
+ self._stopped = threading.Event()
+
+ self.start()
+
+ def _init_db(self):
+ """
+ Initializes sqlcipher database.
+
+ This is called on a separate thread.
+ """
+ # instantiate u1db
+ args, kwargs = self._wrargs
+ self._db = sqlcipher.open(*args, **kwargs)
+
+ def run(self):
+ """
+ Main loop for the sqlcipher thread.
+ """
+ logger.debug("SQLCipherWrapper thread started.")
+ logger.debug("Initializing sqlcipher")
+ end_mths = ("__end_thread", "_SQLCipherWrapper__end_thread")
+
+ self._init_db()
+ self._lock = threading.Lock()
+
+ ct = 0
+ started = False
+
+ while True:
+ if self._db is None:
+ if started:
+ break
+ if ct > 10:
+ break # XXX DEBUG
+ logger.debug('db not ready yet, waiting...')
+ time.sleep(1)
+ ct += 1
+
+ started = True
+
+ with self._lock:
+ try:
+ mth, q, wrargs = self._queue.get()
+ except:
+ logger.error("exception getting args from queue")
+
+ res = None
+ attr = getattr(self._db, mth, None)
+ if not attr:
+ if mth not in end_mths:
+ logger.error('method %s does not exist' % (mth,))
+ res = AttributeError(
+ "_db instance has no attribute %s" % mth)
+
+ elif callable(attr):
+ # invoke the method with the passed args
+ args = wrargs.get('args', [])
+ kwargs = wrargs.get('kwargs', {})
+ try:
+ res = attr(*args, **kwargs)
+ except Exception as e:
+ logger.error(
+ "Error on proxied method %s: '%r'." % (
+ attr, e))
+ res = e
+ else:
+ # non-callable attribute
+ res = attr
+ logger.debug('returning proxied db call...')
+ q.put(res)
+
+ if mth in end_mths:
+ logger.debug('ending thread')
+ break
+
+ logger.debug("SQLCipherWrapper thread terminated.")
+ self._stopped.set()
+
+ def close(self):
+ """
+ Closes the sqlcipher database and finishes the thread. This method
+ should always be called explicitely.
+ """
+ self.__getattr__('close')()
+ self.__end_thread()
+
+ def __getattr__(self, attr):
+ """
+ Returns _db proxied attributes.
+ """
+
+ def __proxied_mth(method, *args, **kwargs):
+ if not self._stopped.isSet():
+ wrargs = {'args': args, 'kwargs': kwargs}
+ q = Queue.Queue()
+ self._queue.put((method, q, wrargs))
+ res = q.get()
+ q.task_done()
+
+ if isinstance(res, exceptions.BaseException):
+ # XXX should get the original bt
+ raise res
+ return res
+ else:
+ logger.warning("tried to call proxied meth "
+ "but stopped is set: %s" %
+ (method,))
+
+ rgetattr = object.__getattribute__
+
+ if attr != "_db":
+ proxied = partial(__proxied_mth, attr)
+ return proxied
+
+ # fallback to regular behavior
+ return rgetattr(self, attr)
+
+ def __del__(self):
+ """
+ Do not trust this get called. No guarantees given. Because of a funny
+ dance with the refs and the way the gc works, we should be calling the
+ close method explicitely.
+ """
+ self.close()