summaryrefslogtreecommitdiff
path: root/client/src/leap
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2014-07-23 16:26:24 -0300
committerdrebs <drebs@leap.se>2014-08-08 11:49:02 -0300
commit622708945d51a1e22dde95424a6214e8e67be180 (patch)
treecafaa840b1bc00717a3d4e96b6a0f09fb9dfa0aa /client/src/leap
parent609669077b2f7223c31feed3679c8fcd74ab9ba7 (diff)
Make sync database multiprocessing-safe.
Diffstat (limited to 'client/src/leap')
-rw-r--r--client/src/leap/soledad/client/crypto.py46
-rw-r--r--client/src/leap/soledad/client/mp_safe_db.py101
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py22
-rw-r--r--client/src/leap/soledad/client/target.py17
4 files changed, 143 insertions, 43 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 5ae5937f..eb5a4f64 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -224,7 +224,7 @@ class SoledadCrypto(object):
The password is derived using HMAC having sha256 as underlying hash
function. The key used for HMAC are the first
- C{soledad.REMOTE_STORAGE_SECRET_KENGTH} bytes of Soledad's storage
+ C{soledad.REMOTE_STORAGE_SECRET_LENGTH} bytes of Soledad's storage
secret stripped from the first MAC_KEY_LENGTH characters. The HMAC
message is C{doc_id}.
@@ -623,9 +623,8 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
con = self._sync_db
with self._sync_db_write_lock:
- with con:
- con.execute(sql_del, (doc_id, ))
- con.execute(sql_ins, (doc_id, doc_rev, content))
+ con.execute(sql_del, (doc_id, ))
+ con.execute(sql_ins, (doc_id, doc_rev, content))
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
@@ -726,11 +725,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
con = self._sync_db
with self._sync_db_write_lock:
- with con:
- con.execute(sql_del, (doc_id, ))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, docstr, gen, trans_id, 1))
+ con.execute(sql_del, (doc_id, ))
+ con.execute(
+ sql_ins,
+ (doc_id, doc_rev, docstr, gen, trans_id, 1))
def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
"""
@@ -757,11 +755,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self.TABLE_NAME,)
con = self._sync_db
with self._sync_db_write_lock:
- with con:
- con.execute(sql_del, (doc_id,))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, content, gen, trans_id, 0))
+ con.execute(sql_del, (doc_id,))
+ con.execute(
+ sql_ins,
+ (doc_id, doc_rev, content, gen, trans_id, 0))
def delete_received_doc(self, doc_id, doc_rev):
"""
@@ -776,8 +773,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self.TABLE_NAME,)
con = self._sync_db
with self._sync_db_write_lock:
- with con:
- con.execute(sql_del, (doc_id, doc_rev))
+ con.execute(sql_del, (doc_id, doc_rev))
def decrypt_doc(self, doc_id, rev, content, gen, trans_id,
source_replica_uid, workers=True):
@@ -878,12 +874,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if encrypted is not None:
sql += " WHERE encrypted = %d" % int(encrypted)
sql += " ORDER BY gen ASC"
- c = self._sync_db.cursor()
- c.execute(sql)
- # TODO: due to unknown reasons, the fetchall() method may return empty
- # values, so we filter them out here. We have to perform some tests to
- # understand why and when this happens.
- docs = filter(lambda entry: len(entry) > 0, c.fetchall())
+ docs = self._sync_db.select(sql)
return docs
def get_insertable_docs_by_gen(self):
@@ -894,7 +885,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
decrypted_docs = self.get_docs_by_generation(encrypted=False)
insertable = []
for doc_id, rev, content, gen, trans_id, encrypted in all_docs:
- next_decrypted = decrypted_docs.pop(0)
+ next_decrypted = decrypted_docs.next()
if doc_id == next_decrypted[0]:
insertable.append((doc_id, rev, content, gen, trans_id))
else:
@@ -915,14 +906,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if self._sync_db is None:
logger.warning("cannot return count with null sync_db")
return
- c = self._sync_db.cursor()
sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
if encrypted is not None:
sql += " WHERE encrypted = %d" % int(encrypted)
- c.execute(sql)
- res = c.fetchone()
+ res = self._sync_db.select(sql)
if res is not None:
- return res[0]
+ val = res.next()
+ return val[0]
else:
return 0
@@ -932,8 +922,6 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
decrypt worker to decrypt each one of them.
"""
docs_by_generation = self.get_docs_by_generation(encrypted=True)
- logger.debug("Sync decrypter pool: There are %d documents to " \
- "decrypt." % len(docs_by_generation))
for doc_id, rev, content, gen, trans_id, _ \
in filter(None, docs_by_generation):
self.decrypt_doc(
diff --git a/client/src/leap/soledad/client/mp_safe_db.py b/client/src/leap/soledad/client/mp_safe_db.py
new file mode 100644
index 00000000..a9ab5649
--- /dev/null
+++ b/client/src/leap/soledad/client/mp_safe_db.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+# crypto.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Multiprocessing-safe SQLite database.
+"""
+
+
+from threading import Thread
+from Queue import Queue
+from sqlite3 import connect as sqlite3_connect
+
+
+# Thanks to http://code.activestate.com/recipes/526618/
+
+class MPSafeSQLiteDB(Thread):
+ """
+ A multiprocessing-safe SQLite database accessor.
+ """
+
+ CLOSE = "--close--"
+ NO_MORE = "--no more--"
+
+ def __init__(self, db_path):
+ """
+ Initialize the process
+ """
+ Thread.__init__(self)
+ self._db_path = db_path
+ self._requests = Queue()
+ self.start()
+
+ def run(self):
+ """
+ Run the multiprocessing-safe database accessor.
+ """
+ conn = sqlite3_connect(self._db_path)
+ while True:
+ req, arg, res = self._requests.get()
+ if req == self.CLOSE:
+ break
+ with conn:
+ cursor = conn.cursor()
+ cursor.execute(req, arg)
+ if res:
+ for rec in cursor.fetchall():
+ res.put(rec)
+ res.put(self.NO_MORE)
+ conn.close()
+
+ def execute(self, req, arg=None, res=None):
+ """
+ Execute a request on the database.
+
+ :param req: The request to be executed.
+ :type req: str
+ :param arg: The arguments for the request.
+ :type arg: tuple
+ :param res: A queue to write request results.
+ :type res: multiprocessing.Queue
+ """
+ self._requests.put((req, arg or tuple(), res))
+
+ def select(self, req, arg=None):
+ """
+ Run a select query on the database and yield results.
+
+ :param req: The request to be executed.
+ :type req: str
+ :param arg: The arguments for the request.
+ :type arg: tuple
+ """
+ res = Queue()
+ self.execute(req, arg, res)
+ while True:
+ rec=res.get()
+ if rec == self.NO_MORE:
+ break
+ yield rec
+
+ def close(self):
+ """
+ Close the database connection.
+ """
+ self.execute(self.CLOSE)
+ self.join()
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 5a30b125..85b0391b 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -44,7 +44,6 @@ handled by Soledad should be created by SQLCipher >= 2.0.
import logging
import multiprocessing
import os
-import sqlite3
import string
import threading
import time
@@ -63,6 +62,7 @@ from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
from leap.soledad.client.target import SoledadSyncTarget
from leap.soledad.client.target import PendingReceivedDocsSyncError
from leap.soledad.client.sync import SoledadSynchronizer
+from leap.soledad.client.mp_safe_db import MPSafeSQLiteDB
from leap.soledad.common.document import SoledadDocument
@@ -549,8 +549,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
sync_db_path = "%s-sync" % sqlcipher_file
else:
sync_db_path = ":memory:"
- self._sync_db = sqlite3.connect(sync_db_path,
- check_same_thread=False)
+ self._sync_db = MPSafeSQLiteDB(sync_db_path)
self._sync_db_write_lock = threading.Lock()
self._create_sync_db_tables()
self.sync_queue = multiprocessing.Queue()
@@ -567,9 +566,8 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
decr.TABLE_NAME, decr.FIELD_NAMES))
with self._sync_db_write_lock:
- with self._sync_db:
- self._sync_db.execute(sql_encr)
- self._sync_db.execute(sql_decr)
+ self._sync_db.execute(sql_encr)
+ self._sync_db.execute(sql_decr)
#
# Symmetric encryption of syncing docs
@@ -1076,16 +1074,28 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
Close db_handle and close syncer.
"""
logger.debug("Sqlcipher backend: closing")
+ # stop the sync watcher for deferred encryption
if self._sync_watcher is not None:
self._sync_watcher.stop()
self._sync_watcher.shutdown()
+ # close all open syncers
for url in self._syncers:
_, syncer = self._syncers[url]
syncer.close()
+ # stop the encryption pool
if self._sync_enc_pool is not None:
self._sync_enc_pool.close()
+ # close the actual database
if self._db_handle is not None:
self._db_handle.close()
+ # close the sync database
+ if self._sync_db is not None:
+ self._sync_db.close()
+ # close the sync queue
+ if self.sync_queue is not None:
+ self.sync_queue.close()
+ del self.sync_queue
+ self.sync_queue = None
@property
def replica_uid(self):
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 5fe55216..01e1231a 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -1346,13 +1346,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type doc_rev: str
"""
encr = SyncEncrypterPool
- c = self._sync_db.cursor()
sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
encr.TABLE_NAME,))
- c.execute(sql, (doc_id, doc_rev))
- res = c.fetchall()
- if len(res) != 0:
- return res[0][0]
+ res = self._sync_db.select(sql, (doc_id, doc_rev))
+ try:
+ val = res.next()
+ return val[0]
+ except StopIteration:
+ # no doc found
+ return None
+
def delete_encrypted_docs_from_db(self, docs_ids):
"""
@@ -1365,12 +1368,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
if docs_ids:
encr = SyncEncrypterPool
- c = self._sync_db.cursor()
for doc_id, doc_rev in docs_ids:
sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
encr.TABLE_NAME,))
- c.execute(sql, (doc_id, doc_rev))
- self._sync_db.commit()
+ self._sync_db.execute(sql, (doc_id, doc_rev))
def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
"""