diff options
| -rw-r--r-- | client/src/leap/soledad/client/crypto.py | 46 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/mp_safe_db.py | 101 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 22 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/target.py | 17 | 
4 files changed, 143 insertions, 43 deletions
| diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 5ae5937f..eb5a4f64 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -224,7 +224,7 @@ class SoledadCrypto(object):          The password is derived using HMAC having sha256 as underlying hash          function. The key used for HMAC are the first -        C{soledad.REMOTE_STORAGE_SECRET_KENGTH} bytes of Soledad's storage +        C{soledad.REMOTE_STORAGE_SECRET_LENGTH} bytes of Soledad's storage          secret stripped from the first MAC_KEY_LENGTH characters. The HMAC          message is C{doc_id}. @@ -623,9 +623,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):          con = self._sync_db          with self._sync_db_write_lock: -            with con: -                con.execute(sql_del, (doc_id, )) -                con.execute(sql_ins, (doc_id, doc_rev, content)) +            con.execute(sql_del, (doc_id, )) +            con.execute(sql_ins, (doc_id, doc_rev, content))  def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): @@ -726,11 +725,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          con = self._sync_db          with self._sync_db_write_lock: -            with con: -                con.execute(sql_del, (doc_id, )) -                con.execute( -                    sql_ins, -                    (doc_id, doc_rev, docstr, gen, trans_id, 1)) +            con.execute(sql_del, (doc_id, )) +            con.execute( +                sql_ins, +                (doc_id, doc_rev, docstr, gen, trans_id, 1))      def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):          """ @@ -757,11 +755,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              self.TABLE_NAME,)          con = self._sync_db          with self._sync_db_write_lock: -            with con: -                con.execute(sql_del, (doc_id,)) -                con.execute( -                    sql_ins, -                    (doc_id, doc_rev, content, gen, trans_id, 0)) +            con.execute(sql_del, (doc_id,)) +            con.execute( +                sql_ins, +                (doc_id, doc_rev, content, gen, trans_id, 0))      def delete_received_doc(self, doc_id, doc_rev):          """ @@ -776,8 +773,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):              self.TABLE_NAME,)          con = self._sync_db          with self._sync_db_write_lock: -            with con: -                con.execute(sql_del, (doc_id, doc_rev)) +            con.execute(sql_del, (doc_id, doc_rev))      def decrypt_doc(self, doc_id, rev, content, gen, trans_id,                      source_replica_uid, workers=True): @@ -878,12 +874,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          if encrypted is not None:              sql += " WHERE encrypted = %d" % int(encrypted)          sql += " ORDER BY gen ASC" -        c = self._sync_db.cursor() -        c.execute(sql) -        # TODO: due to unknown reasons, the fetchall() method may return empty -        # values, so we filter them out here. We have to perform some tests to -        # understand why and when this happens. -        docs = filter(lambda entry: len(entry) > 0, c.fetchall()) +        docs = self._sync_db.select(sql)          return docs      def get_insertable_docs_by_gen(self): @@ -894,7 +885,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          decrypted_docs = self.get_docs_by_generation(encrypted=False)          insertable = []          for doc_id, rev, content, gen, trans_id, encrypted in all_docs: -            next_decrypted = decrypted_docs.pop(0) +            next_decrypted = decrypted_docs.next()              if doc_id == next_decrypted[0]:                  insertable.append((doc_id, rev, content, gen, trans_id))              else: @@ -915,14 +906,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          if self._sync_db is None:              logger.warning("cannot return count with null sync_db")              return -        c = self._sync_db.cursor()          sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)          if encrypted is not None:              sql += " WHERE encrypted = %d" % int(encrypted) -        c.execute(sql) -        res = c.fetchone() +        res = self._sync_db.select(sql)          if res is not None: -            return res[0] +            val = res.next() +            return val[0]          else:              return 0 @@ -932,8 +922,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):          decrypt worker to decrypt each one of them.          """          docs_by_generation = self.get_docs_by_generation(encrypted=True) -        logger.debug("Sync decrypter pool: There are %d documents to " \ -                     "decrypt." % len(docs_by_generation))          for doc_id, rev, content, gen, trans_id, _ \                  in filter(None, docs_by_generation):              self.decrypt_doc( diff --git a/client/src/leap/soledad/client/mp_safe_db.py b/client/src/leap/soledad/client/mp_safe_db.py new file mode 100644 index 00000000..a9ab5649 --- /dev/null +++ b/client/src/leap/soledad/client/mp_safe_db.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# crypto.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Multiprocessing-safe SQLite database. +""" + + +from threading import Thread +from Queue import Queue +from sqlite3 import connect as sqlite3_connect + + +# Thanks to http://code.activestate.com/recipes/526618/ + +class MPSafeSQLiteDB(Thread): +    """ +    A multiprocessing-safe SQLite database accessor. +    """ + +    CLOSE = "--close--" +    NO_MORE = "--no more--" + +    def __init__(self, db_path): +        """ +        Initialize the process +        """ +        Thread.__init__(self) +        self._db_path = db_path +        self._requests = Queue() +        self.start() + +    def run(self): +        """ +        Run the multiprocessing-safe database accessor. +        """ +        conn = sqlite3_connect(self._db_path) +        while True: +            req, arg, res = self._requests.get() +            if req == self.CLOSE: +                break +            with conn: +                cursor = conn.cursor() +                cursor.execute(req, arg) +                if res: +                    for rec in cursor.fetchall(): +                        res.put(rec) +                    res.put(self.NO_MORE) +        conn.close() + +    def execute(self, req, arg=None, res=None): +        """ +        Execute a request on the database. + +        :param req: The request to be executed. +        :type req: str +        :param arg: The arguments for the request. +        :type arg: tuple +        :param res: A queue to write request results. +        :type res: multiprocessing.Queue +        """ +        self._requests.put((req, arg or tuple(), res)) + +    def select(self, req, arg=None): +        """ +        Run a select query on the database and yield results. + +        :param req: The request to be executed. +        :type req: str +        :param arg: The arguments for the request. +        :type arg: tuple +        """ +        res = Queue() +        self.execute(req, arg, res) +        while True: +            rec=res.get() +            if rec == self.NO_MORE: +                break +            yield rec + +    def close(self): +        """ +        Close the database connection. +        """ +        self.execute(self.CLOSE) +        self.join() diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 5a30b125..85b0391b 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -44,7 +44,6 @@ handled by Soledad should be created by SQLCipher >= 2.0.  import logging  import multiprocessing  import os -import sqlite3  import string  import threading  import time @@ -63,6 +62,7 @@ from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool  from leap.soledad.client.target import SoledadSyncTarget  from leap.soledad.client.target import PendingReceivedDocsSyncError  from leap.soledad.client.sync import SoledadSynchronizer +from leap.soledad.client.mp_safe_db import MPSafeSQLiteDB  from leap.soledad.common.document import SoledadDocument @@ -549,8 +549,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              sync_db_path = "%s-sync" % sqlcipher_file          else:              sync_db_path = ":memory:" -        self._sync_db = sqlite3.connect(sync_db_path, -                                        check_same_thread=False) +        self._sync_db = MPSafeSQLiteDB(sync_db_path)          self._sync_db_write_lock = threading.Lock()          self._create_sync_db_tables()          self.sync_queue = multiprocessing.Queue() @@ -567,9 +566,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):              decr.TABLE_NAME, decr.FIELD_NAMES))          with self._sync_db_write_lock: -            with self._sync_db: -                self._sync_db.execute(sql_encr) -                self._sync_db.execute(sql_decr) +            self._sync_db.execute(sql_encr) +            self._sync_db.execute(sql_decr)      #      # Symmetric encryption of syncing docs @@ -1076,16 +1074,28 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          Close db_handle and close syncer.          """          logger.debug("Sqlcipher backend: closing") +        # stop the sync watcher for deferred encryption          if self._sync_watcher is not None:              self._sync_watcher.stop()              self._sync_watcher.shutdown() +        # close all open syncers          for url in self._syncers:              _, syncer = self._syncers[url]              syncer.close() +        # stop the encryption pool          if self._sync_enc_pool is not None:              self._sync_enc_pool.close() +        # close the actual database          if self._db_handle is not None:              self._db_handle.close() +        # close the sync database +        if self._sync_db is not None: +            self._sync_db.close() +        # close the sync queue +        if self.sync_queue is not None: +            self.sync_queue.close() +            del self.sync_queue +            self.sync_queue = None      @property      def replica_uid(self): diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 5fe55216..01e1231a 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -1346,13 +1346,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          :type doc_rev: str          """          encr = SyncEncrypterPool -        c = self._sync_db.cursor()          sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (              encr.TABLE_NAME,)) -        c.execute(sql, (doc_id, doc_rev)) -        res = c.fetchall() -        if len(res) != 0: -            return res[0][0] +        res = self._sync_db.select(sql, (doc_id, doc_rev)) +        try: +            val = res.next() +            return val[0] +        except StopIteration: +            # no doc found +            return None +      def delete_encrypted_docs_from_db(self, docs_ids):          """ @@ -1365,12 +1368,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):          """          if docs_ids:              encr = SyncEncrypterPool -            c = self._sync_db.cursor()              for doc_id, doc_rev in docs_ids:                  sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (                      encr.TABLE_NAME,)) -                c.execute(sql, (doc_id, doc_rev)) -            self._sync_db.commit() +                self._sync_db.execute(sql, (doc_id, doc_rev))      def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):          """ | 
