diff options
| author | Kali Kaneko <kali@leap.se> | 2013-08-19 19:07:45 +0200 | 
|---|---|---|
| committer | drebs <drebs@leap.se> | 2013-08-21 00:25:33 -0300 | 
| commit | 03d9b34def31a535d4409f57aa08ffb71e741235 (patch) | |
| tree | ea3c6ebabc0313933c7565ff024deffa21a26a11 | |
| parent | e78140ae835a6ce17485891d24bc4c013119aabf (diff) | |
moar fix attempts using class lock
| -rw-r--r-- | soledad/src/leap/soledad/dbwrapper.py | 92 | ||||
| -rw-r--r-- | soledad/src/leap/soledad/sqlcipher.py | 37 | ||||
| -rw-r--r-- | soledad/src/leap/soledad/tests/test_sqlcipher.py | 2 | 
3 files changed, 87 insertions, 44 deletions
diff --git a/soledad/src/leap/soledad/dbwrapper.py b/soledad/src/leap/soledad/dbwrapper.py index 2a81086c..7e134d1b 100644 --- a/soledad/src/leap/soledad/dbwrapper.py +++ b/soledad/src/leap/soledad/dbwrapper.py @@ -25,7 +25,6 @@ some time so postponing that refactor.  import logging  import threading  import Queue -import time  import exceptions @@ -38,6 +37,8 @@ logger = logging.getLogger(__name__)  class SQLCipherWrapper(threading.Thread): +    k_lock = threading.Lock() +      def __init__(self, *args, **kwargs):          """          Initializes a wrapper that proxies method and attribute @@ -53,6 +54,7 @@ class SQLCipherWrapper(threading.Thread):          :type kwargs: dict          """          threading.Thread.__init__(self) +        self._lock = threading.Lock()          self._db = None          self._wrargs = args, kwargs @@ -69,53 +71,64 @@ class SQLCipherWrapper(threading.Thread):          """          # instantiate u1db          args, kwargs = self._wrargs -        try: -            self._db = sqlcipher.open(*args, **kwargs) -        except Exception as exc: -            logger.debug("Error in init_db: %r" % (exc,)) -            self._stopped.set() -            raise exc +        with self.k_lock: +            try: +                self._db = sqlcipher.open(*args, **kwargs) +                print "init ok" +            except Exception as exc: +                logger.debug("Error in init_db: %r" % (exc,)) +                self._stopped.set() +                raise exc      def run(self):          """          Main loop for the sqlcipher thread.          """ +        END_METHODS = ("__end_thread", "_SQLCipherWrapper__end_thread")          logger.debug("SQLCipherWrapper thread started.")          logger.debug("Initializing sqlcipher") -        end_mths = ("__end_thread", "_SQLCipherWrapper__end_thread") -        failed = False +        #ct = 0 +        #started = False +          try:              self._init_db()          except: -            failed = True -        self._lock = threading.Lock() - -        ct = 0 -        started = False +            logger.error("Failed to init db.") +            print "error on init" +            # XXX should raise? + +        while True: + +            #if self._db is None: +                #if started: +                    #break +                #if ct > 10: +                    #break  # XXX DEBUG +                #logger.debug('db not ready yet, waiting...') +                #print "db not ready, wait..." +                #time.sleep(1) +                #ct += 1 -        while not failed:              if self._db is None: -                if started: -                    break -                if ct > 10: -                    break  # XXX DEBUG -                logger.debug('db not ready yet, waiting...') -                time.sleep(1) -                ct += 1 - -            started = True +                print "db not initialized!" +            #started = True              with self._lock: +                  try: -                    mth, q, wrargs = self._queue.get() +                    # XXX not getting args... +                    mth, q, wrargs = self._queue.get()  # False?                  except: +                    print "Exception getting args"                      logger.error("exception getting args from queue") +                    continue +                print "mth: ", mth                  res = None                  attr = getattr(self._db, mth, None)                  if not attr: -                    if mth not in end_mths: +                    if mth not in END_METHODS:                          logger.error('method %s does not exist' % (mth,))                          res = AttributeError(                              "_db instance has no attribute %s" % mth) @@ -134,14 +147,16 @@ class SQLCipherWrapper(threading.Thread):                  else:                      # non-callable attribute                      res = attr -                logger.debug('returning proxied db call...') +                #logger.debug('returning proxied db call...') +                print "putting res: ", res                  q.put(res) -            if mth in end_mths: -                logger.debug('ending thread') -                break +                if mth in END_METHODS: +                    logger.debug('ending thread') +                    break          logger.debug("SQLCipherWrapper thread terminated.") +        print "SQLWRAPPER TERMINATED"          self._stopped.set()      def close(self): @@ -158,18 +173,27 @@ class SQLCipherWrapper(threading.Thread):          """          def __proxied_mth(method, *args, **kwargs): +            SQLWRAPPER_METHOD_TIMEOUT = 20  # in seconds              if not self._stopped.isSet():                  wrargs = {'args': args, 'kwargs': kwargs}                  q = Queue.Queue() -                self._queue.put((method, q, wrargs)) -                res = q.get() -                q.task_done() +                print "put meth" +                with self.k_lock: +                    self._queue.put((method, q, wrargs)) +                    try: +                        # XXX this is blocking +                        res = q.get(timeout=SQLWRAPPER_METHOD_TIMEOUT) +                        q.task_done() +                    except Queue.Empty: +                        res = None +                    print "got result"                  if isinstance(res, exceptions.BaseException):                      # XXX should get the original bt                      raise res                  return res              else: +                print "called but stopped!"                  logger.warning("tried to call proxied meth "                                 "but stopped is set: %s" %                                 (method,)) @@ -185,6 +209,8 @@ class SQLCipherWrapper(threading.Thread):      def __del__(self):          """ +        Closes the thread upon object destruction. +          Do not trust this get called. No guarantees given. Because of a funny          dance with the refs and the way the gc works, we should be calling the          close method explicitely. diff --git a/soledad/src/leap/soledad/sqlcipher.py b/soledad/src/leap/soledad/sqlcipher.py index ca61b4bf..18a8fe6e 100644 --- a/soledad/src/leap/soledad/sqlcipher.py +++ b/soledad/src/leap/soledad/sqlcipher.py @@ -1,4 +1,4 @@ -# -*- coding: utf-8 -*- +    # -*- coding: utf-8 -*-  # sqlcipher.py  # Copyright (C) 2013 LEAP  # @@ -47,6 +47,7 @@ import logging  import os  import time  import string +import threading  from u1db.backends import sqlite_backend @@ -125,6 +126,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):      """A U1DB implementation that uses SQLCipher as its persistence layer."""      _index_storage_value = 'expand referenced encrypted' +    k_lock = threading.Lock()      def __init__(self, sqlcipher_file, password, document_factory=None,                   crypto=None, raw_key=False, cipher='aes-256-cbc', @@ -158,14 +160,15 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):                  sqlcipher_file, password, raw_key, cipher, kdf_iter,                  cipher_page_size)          # connect to the database -        self._db_handle = dbapi2.connect(sqlcipher_file) -        # set SQLCipher cryptographic parameters -        self._set_crypto_pragmas( -            self._db_handle, password, raw_key, cipher, kdf_iter, -            cipher_page_size) -        self._real_replica_uid = None -        self._ensure_schema() -        self._crypto = crypto +        with self.k_lock: +            self._db_handle = dbapi2.connect(sqlcipher_file) +            # set SQLCipher cryptographic parameters +            self._set_crypto_pragmas( +                self._db_handle, password, raw_key, cipher, kdf_iter, +                cipher_page_size) +            self._real_replica_uid = None +            self._ensure_schema() +            self._crypto = crypto          def factory(doc_id=None, rev=None, json='{}', has_conflicts=False,                      syncable=True): @@ -207,8 +210,9 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          if not os.path.isfile(sqlcipher_file):              raise u1db_errors.DatabaseDoesNotExist() -        tries = 30 +        tries = 2          while True: +            print "tries: ", tries              # Note: There seems to be a bug in sqlite 3.5.9 (with python2.6)              #       where without re-opening the database on Windows, it              #       doesn't see the transaction that was just committed @@ -225,7 +229,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):                  v, err = cls._which_index_storage(c)              except Exception as exc:                  logger.warning("ERROR OPENING DATABASE!") +                print "ERROR OPENING DB ---------"                  logger.debug("error was: %r" % exc) +                print "error was: %r" % exc +                print exc.__class__                  v, err = None, exc              finally:                  db_handle.close() @@ -234,8 +241,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              # possibly another process is initializing it, wait for it to be              # done              if tries == 0: +                print "ERROR, raise"                  raise err  # go for the richest error?              tries -= 1 +            print "sleeping"              time.sleep(cls.WAIT_FOR_PARALLEL_INIT_HALF_INTERVAL)          return SQLCipherDatabase._sqlite_registry[v](              sqlcipher_file, password, document_factory=document_factory, @@ -325,6 +334,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          @param c: The cursor for querying the database.          @type c: dbapi2.cursor          """ +        # XXX used??? private method not used in module          c.execute(              'ALTER TABLE document '              'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE') @@ -659,5 +669,12 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              raise NotAnHexString(key)          db_handle.cursor().execute('PRAGMA rekey = "x\'%s"' % passphrase) +    def __del__(self): +        """ +        Closes db_handle upon object destruction. +        """ +        if self._db_handle is not None: +            self._db_handle.close() +  sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase) diff --git a/soledad/src/leap/soledad/tests/test_sqlcipher.py b/soledad/src/leap/soledad/tests/test_sqlcipher.py index 25d04861..4c4384b8 100644 --- a/soledad/src/leap/soledad/tests/test_sqlcipher.py +++ b/soledad/src/leap/soledad/tests/test_sqlcipher.py @@ -682,7 +682,7 @@ class SQLCipherSyncTargetTests(      def setUp(self):          test_sync.DatabaseSyncTargetTests.setUp(self) -        BaseSoledadTest.setUp(self) +        #BaseSoledadTest.setUp(self)      def tearDown(self):          test_sync.DatabaseSyncTargetTests.tearDown(self)  | 
