From 622708945d51a1e22dde95424a6214e8e67be180 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 23 Jul 2014 16:26:24 -0300 Subject: Make sync database multiprocessing-safe. --- client/src/leap/soledad/client/crypto.py | 46 +++++------- client/src/leap/soledad/client/mp_safe_db.py | 101 +++++++++++++++++++++++++++ client/src/leap/soledad/client/sqlcipher.py | 22 ++++-- client/src/leap/soledad/client/target.py | 17 ++--- 4 files changed, 143 insertions(+), 43 deletions(-) create mode 100644 client/src/leap/soledad/client/mp_safe_db.py (limited to 'client/src') diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 5ae5937f..eb5a4f64 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -224,7 +224,7 @@ class SoledadCrypto(object): The password is derived using HMAC having sha256 as underlying hash function. The key used for HMAC are the first - C{soledad.REMOTE_STORAGE_SECRET_KENGTH} bytes of Soledad's storage + C{soledad.REMOTE_STORAGE_SECRET_LENGTH} bytes of Soledad's storage secret stripped from the first MAC_KEY_LENGTH characters. The HMAC message is C{doc_id}. @@ -623,9 +623,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): con = self._sync_db with self._sync_db_write_lock: - with con: - con.execute(sql_del, (doc_id, )) - con.execute(sql_ins, (doc_id, doc_rev, content)) + con.execute(sql_del, (doc_id, )) + con.execute(sql_ins, (doc_id, doc_rev, content)) def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): @@ -726,11 +725,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): con = self._sync_db with self._sync_db_write_lock: - with con: - con.execute(sql_del, (doc_id, )) - con.execute( - sql_ins, - (doc_id, doc_rev, docstr, gen, trans_id, 1)) + con.execute(sql_del, (doc_id, )) + con.execute( + sql_ins, + (doc_id, doc_rev, docstr, gen, trans_id, 1)) def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): """ @@ -757,11 +755,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self.TABLE_NAME,) con = self._sync_db with self._sync_db_write_lock: - with con: - con.execute(sql_del, (doc_id,)) - con.execute( - sql_ins, - (doc_id, doc_rev, content, gen, trans_id, 0)) + con.execute(sql_del, (doc_id,)) + con.execute( + sql_ins, + (doc_id, doc_rev, content, gen, trans_id, 0)) def delete_received_doc(self, doc_id, doc_rev): """ @@ -776,8 +773,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): self.TABLE_NAME,) con = self._sync_db with self._sync_db_write_lock: - with con: - con.execute(sql_del, (doc_id, doc_rev)) + con.execute(sql_del, (doc_id, doc_rev)) def decrypt_doc(self, doc_id, rev, content, gen, trans_id, source_replica_uid, workers=True): @@ -878,12 +874,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if encrypted is not None: sql += " WHERE encrypted = %d" % int(encrypted) sql += " ORDER BY gen ASC" - c = self._sync_db.cursor() - c.execute(sql) - # TODO: due to unknown reasons, the fetchall() method may return empty - # values, so we filter them out here. We have to perform some tests to - # understand why and when this happens. - docs = filter(lambda entry: len(entry) > 0, c.fetchall()) + docs = self._sync_db.select(sql) return docs def get_insertable_docs_by_gen(self): @@ -894,7 +885,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): decrypted_docs = self.get_docs_by_generation(encrypted=False) insertable = [] for doc_id, rev, content, gen, trans_id, encrypted in all_docs: - next_decrypted = decrypted_docs.pop(0) + next_decrypted = decrypted_docs.next() if doc_id == next_decrypted[0]: insertable.append((doc_id, rev, content, gen, trans_id)) else: @@ -915,14 +906,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if self._sync_db is None: logger.warning("cannot return count with null sync_db") return - c = self._sync_db.cursor() sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) if encrypted is not None: sql += " WHERE encrypted = %d" % int(encrypted) - c.execute(sql) - res = c.fetchone() + res = self._sync_db.select(sql) if res is not None: - return res[0] + val = res.next() + return val[0] else: return 0 @@ -932,8 +922,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): decrypt worker to decrypt each one of them. """ docs_by_generation = self.get_docs_by_generation(encrypted=True) - logger.debug("Sync decrypter pool: There are %d documents to " \ - "decrypt." % len(docs_by_generation)) for doc_id, rev, content, gen, trans_id, _ \ in filter(None, docs_by_generation): self.decrypt_doc( diff --git a/client/src/leap/soledad/client/mp_safe_db.py b/client/src/leap/soledad/client/mp_safe_db.py new file mode 100644 index 00000000..a9ab5649 --- /dev/null +++ b/client/src/leap/soledad/client/mp_safe_db.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# crypto.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +Multiprocessing-safe SQLite database. +""" + + +from threading import Thread +from Queue import Queue +from sqlite3 import connect as sqlite3_connect + + +# Thanks to http://code.activestate.com/recipes/526618/ + +class MPSafeSQLiteDB(Thread): + """ + A multiprocessing-safe SQLite database accessor. + """ + + CLOSE = "--close--" + NO_MORE = "--no more--" + + def __init__(self, db_path): + """ + Initialize the process + """ + Thread.__init__(self) + self._db_path = db_path + self._requests = Queue() + self.start() + + def run(self): + """ + Run the multiprocessing-safe database accessor. + """ + conn = sqlite3_connect(self._db_path) + while True: + req, arg, res = self._requests.get() + if req == self.CLOSE: + break + with conn: + cursor = conn.cursor() + cursor.execute(req, arg) + if res: + for rec in cursor.fetchall(): + res.put(rec) + res.put(self.NO_MORE) + conn.close() + + def execute(self, req, arg=None, res=None): + """ + Execute a request on the database. + + :param req: The request to be executed. + :type req: str + :param arg: The arguments for the request. + :type arg: tuple + :param res: A queue to write request results. + :type res: multiprocessing.Queue + """ + self._requests.put((req, arg or tuple(), res)) + + def select(self, req, arg=None): + """ + Run a select query on the database and yield results. + + :param req: The request to be executed. + :type req: str + :param arg: The arguments for the request. + :type arg: tuple + """ + res = Queue() + self.execute(req, arg, res) + while True: + rec=res.get() + if rec == self.NO_MORE: + break + yield rec + + def close(self): + """ + Close the database connection. + """ + self.execute(self.CLOSE) + self.join() diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 5a30b125..85b0391b 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -44,7 +44,6 @@ handled by Soledad should be created by SQLCipher >= 2.0. import logging import multiprocessing import os -import sqlite3 import string import threading import time @@ -63,6 +62,7 @@ from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool from leap.soledad.client.target import SoledadSyncTarget from leap.soledad.client.target import PendingReceivedDocsSyncError from leap.soledad.client.sync import SoledadSynchronizer +from leap.soledad.client.mp_safe_db import MPSafeSQLiteDB from leap.soledad.common.document import SoledadDocument @@ -549,8 +549,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): sync_db_path = "%s-sync" % sqlcipher_file else: sync_db_path = ":memory:" - self._sync_db = sqlite3.connect(sync_db_path, - check_same_thread=False) + self._sync_db = MPSafeSQLiteDB(sync_db_path) self._sync_db_write_lock = threading.Lock() self._create_sync_db_tables() self.sync_queue = multiprocessing.Queue() @@ -567,9 +566,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): decr.TABLE_NAME, decr.FIELD_NAMES)) with self._sync_db_write_lock: - with self._sync_db: - self._sync_db.execute(sql_encr) - self._sync_db.execute(sql_decr) + self._sync_db.execute(sql_encr) + self._sync_db.execute(sql_decr) # # Symmetric encryption of syncing docs @@ -1076,16 +1074,28 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): Close db_handle and close syncer. """ logger.debug("Sqlcipher backend: closing") + # stop the sync watcher for deferred encryption if self._sync_watcher is not None: self._sync_watcher.stop() self._sync_watcher.shutdown() + # close all open syncers for url in self._syncers: _, syncer = self._syncers[url] syncer.close() + # stop the encryption pool if self._sync_enc_pool is not None: self._sync_enc_pool.close() + # close the actual database if self._db_handle is not None: self._db_handle.close() + # close the sync database + if self._sync_db is not None: + self._sync_db.close() + # close the sync queue + if self.sync_queue is not None: + self.sync_queue.close() + del self.sync_queue + self.sync_queue = None @property def replica_uid(self): diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 5fe55216..01e1231a 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -1346,13 +1346,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type doc_rev: str """ encr = SyncEncrypterPool - c = self._sync_db.cursor() sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( encr.TABLE_NAME,)) - c.execute(sql, (doc_id, doc_rev)) - res = c.fetchall() - if len(res) != 0: - return res[0][0] + res = self._sync_db.select(sql, (doc_id, doc_rev)) + try: + val = res.next() + return val[0] + except StopIteration: + # no doc found + return None + def delete_encrypted_docs_from_db(self, docs_ids): """ @@ -1365,12 +1368,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ if docs_ids: encr = SyncEncrypterPool - c = self._sync_db.cursor() for doc_id, doc_rev in docs_ids: sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( encr.TABLE_NAME,)) - c.execute(sql, (doc_id, doc_rev)) - self._sync_db.commit() + self._sync_db.execute(sql, (doc_id, doc_rev)) def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): """ -- cgit v1.2.3