summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r--client/src/leap/soledad/client/crypto.py190
1 files changed, 108 insertions, 82 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 107bf7f1..dd40b198 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -25,11 +25,15 @@ import json
import logging
import multiprocessing
import threading
+import time
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
from zope.proxy import sameProxiedObjects
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
from leap.soledad.common import crypto
@@ -227,7 +231,7 @@ class SoledadCrypto(object):
#
def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv,
- mac_method, secret):
+ mac_method, secret):
"""
Calculate a MAC for C{doc} using C{ciphertext}.
@@ -378,7 +382,7 @@ def decrypt_doc(crypto, doc):
def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method,
- enc_iv, mac_method, secret, doc_mac):
+ enc_iv, mac_method, secret, doc_mac):
"""
Verify that C{doc_mac} is a correct MAC for the given document.
@@ -523,7 +527,7 @@ class SyncEncryptDecryptPool(object):
"""
WORKERS = multiprocessing.cpu_count()
- def __init__(self, crypto, sync_db, write_lock):
+ def __init__(self, crypto, sync_db):
"""
Initialize the pool of encryption-workers.
@@ -540,7 +544,6 @@ class SyncEncryptDecryptPool(object):
self._pool = multiprocessing.Pool(self.WORKERS)
self._crypto = crypto
self._sync_db = sync_db
- self._sync_db_write_lock = write_lock
def close(self):
"""
@@ -592,7 +595,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
# TODO implement throttling to reduce cpu usage??
WORKERS = multiprocessing.cpu_count()
TABLE_NAME = "docs_tosync"
- FIELD_NAMES = "doc_id, rev, content"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
def encrypt_doc(self, doc, workers=True):
"""
@@ -633,8 +636,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type result: tuple(str, str, str)
"""
doc_id, doc_rev, content = result
- self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+ return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+ @defer.inlineCallbacks
def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
"""
Insert the contents of the encrypted doc into the local sync
@@ -652,13 +656,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
# FIXME --- callback should complete immediately since otherwise the
# thread which handles the results will get blocked
# Right now we're blocking the dispatcher with the writes to sqlite.
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,)
-
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, ))
- con.execute(sql_ins, (doc_id, doc_rev, content))
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
+ % (self.TABLE_NAME,)
+ yield self._sync_db.runQuery(query, (doc_id, doc_rev, content))
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
@@ -704,9 +704,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
# TODO implement throttling to reduce cpu usage??
TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted"
+ FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted"
- write_encrypted_lock = threading.Lock()
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_LOOP_PERIOD = 0.5
def __init__(self, *args, **kwargs):
"""
@@ -723,19 +726,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type last_known_generation: int
"""
self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ self.source_replica_uid = kwargs.pop("source_replica_uid")
SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self.source_replica_uid = None
self._async_results = []
- def set_source_replica_uid(self, source_replica_uid):
- """
- Set the source replica uid for this decrypter pool instance.
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
- """
- self.source_replica_uid = source_replica_uid
+ self._stopped = threading.Event()
+ self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished decryptor thread."))
+ @defer.inlineCallbacks
def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
gen, trans_id):
"""
@@ -754,17 +754,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:type trans_id: str
"""
docstr = json.dumps(content)
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
+ yield self._sync_db.runQuery(
+ query,
+ (doc_id, doc_rev, docstr, gen, trans_id, 1))
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, ))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, docstr, gen, trans_id, 1))
-
+ @defer.inlineCallbacks
def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
"""
Insert a document that is not symmetrically encrypted.
@@ -784,17 +780,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
if not isinstance(content, str):
content = json.dumps(content)
- sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (
- self.TABLE_NAME,)
- sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id,))
- con.execute(
- sql_ins,
- (doc_id, doc_rev, content, gen, trans_id, 0))
+ yield self._sync_db.runQuery(
+ query,
+ (doc_id, doc_rev, content, gen, trans_id, 0))
+ @defer.inlineCallbacks
def delete_received_doc(self, doc_id, doc_rev):
"""
Delete a received doc after it was inserted into the local db.
@@ -806,12 +798,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
self.TABLE_NAME,)
- con = self._sync_db
- with self._sync_db_write_lock:
- con.execute(sql_del, (doc_id, doc_rev))
+ yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev))
- def decrypt_doc(self, doc_id, rev, content, gen, trans_id,
- source_replica_uid, workers=True):
+ def _decrypt_doc(self, doc_id, rev, content, gen, trans_id,
+ source_replica_uid, workers=True):
"""
Symmetrically decrypt a document.
@@ -860,16 +850,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# save the async result object so we can inspect it for failures
self._async_results.append(self._pool.apply_async(
decrypt_doc_task, args,
- callback=self.decrypt_doc_cb))
+ callback=self._decrypt_doc_cb))
else:
# decrypt inline
res = decrypt_doc_task(*args)
- self.decrypt_doc_cb(res)
+ self._decrypt_doc_cb(res)
- def decrypt_doc_cb(self, result):
+ def _decrypt_doc_cb(self, result):
"""
Store the decryption result in the sync db from where it will later be
- picked by process_decrypted.
+ picked by _process_decrypted.
:param result: A tuple containing the doc id, revision and encrypted
content.
@@ -878,7 +868,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
doc_id, rev, content, gen, trans_id = result
logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
% (doc_id, rev, gen, trans_id))
- self.insert_received_doc(doc_id, rev, content, gen, trans_id)
+ return self.insert_received_doc(doc_id, rev, content, gen, trans_id)
def get_docs_by_generation(self, encrypted=None):
"""
@@ -899,6 +889,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
sql += " ORDER BY gen ASC"
return self._fetchall(sql)
+ @defer.inlineCallbacks
def get_insertable_docs_by_gen(self):
"""
Return a list of non-encrypted documents ready to be inserted.
@@ -910,8 +901,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# docs, then some document might have been decrypted between these two
# calls, and if it is just the right doc then it might not be caught
# by the next loop.
- all_docs = self.get_docs_by_generation()
- decrypted_docs = self.get_docs_by_generation(encrypted=False)
+ all_docs = yield self.get_docs_by_generation()
+ decrypted_docs = yield self.get_docs_by_generation(encrypted=False)
insertable = []
for doc_id, rev, _, gen, trans_id, encrypted in all_docs:
for next_doc_id, _, next_content, _, _, _ in decrypted_docs:
@@ -920,9 +911,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
insertable.append((doc_id, rev, content, gen, trans_id))
else:
break
- return insertable
+ defer.returnValue(insertable)
- def count_docs_in_sync_db(self, encrypted=None):
+ @defer.inlineCallbacks
+ def _count_docs_in_sync_db(self, encrypted=None):
"""
Count how many documents we have in the table for received docs.
@@ -933,31 +925,30 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:return: The count of documents.
:rtype: int
"""
- if self._sync_db is None:
- logger.warning("cannot return count with null sync_db")
- return
- sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
if encrypted is not None:
- sql += " WHERE encrypted = %d" % int(encrypted)
- res = self._fetchall(sql)
+ query += " WHERE encrypted = %d" % int(encrypted)
+ res = yield self._sync_db.runQuery(query)
if res:
val = res.pop()
- return val[0]
+ defer.returnValue(val[0])
else:
- return 0
+ defer.returnValue(0)
- def decrypt_received_docs(self):
+ @defer.inlineCallbacks
+ def _decrypt_received_docs(self):
"""
Get all the encrypted documents from the sync database and dispatch a
decrypt worker to decrypt each one of them.
"""
- docs_by_generation = self.get_docs_by_generation(encrypted=True)
- for doc_id, rev, content, gen, trans_id, _ \
- in filter(None, docs_by_generation):
- self.decrypt_doc(
+ self._raise_in_case_of_failed_async_calls()
+ docs_by_generation = yield self.get_docs_by_generation(encrypted=True)
+ for doc_id, rev, content, gen, trans_id, _ in docs_by_generation:
+ self._decrypt_doc(
doc_id, rev, content, gen, trans_id, self.source_replica_uid)
- def process_decrypted(self):
+ @defer.inlineCallbacks
+ def _process_decrypted(self):
"""
Process the already decrypted documents, and insert as many documents
as can be taken from the expected order without finding a gap.
@@ -968,12 +959,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# Acquire the lock to avoid processing while we're still
# getting data from the syncing stream, to avoid InvalidGeneration
# problems.
- with self.write_encrypted_lock:
- for doc_fields in self.get_insertable_docs_by_gen():
- self.insert_decrypted_local_doc(*doc_fields)
- remaining = self.count_docs_in_sync_db()
- return remaining == 0
+ insertable = yield self.get_insertable_docs_by_gen()
+ for doc_fields in insertable:
+ yield self.insert_decrypted_local_doc(*doc_fields)
+ @defer.inlineCallbacks
def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
gen, trans_id):
"""
@@ -1007,22 +997,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
insert_fun(doc, gen, trans_id)
# If no errors found, remove it from the received database.
- self.delete_received_doc(doc_id, doc_rev)
+ yield self.delete_received_doc(doc_id, doc_rev)
+ @defer.inlineCallbacks
def empty(self):
"""
Empty the received docs table of the sync database.
"""
sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- self._sync_db.execute(sql)
+ yield self._sync_db.runQuery(sql)
+ @defer.inlineCallbacks
def _fetchall(self, *args, **kwargs):
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()
+ results = yield self._sync_db.runQuery(*args, **kwargs)
+ defer.returnValue(results)
- def raise_in_case_of_failed_async_calls(self):
+ def _raise_in_case_of_failed_async_calls(self):
"""
Re-raise any exception raised by an async call.
@@ -1033,3 +1023,39 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if not res.successful():
# re-raise the exception raised by the remote call
res.get()
+
+ def _stop_decr_loop(self):
+ """
+ """
+ self._stopped.set()
+
+ def close(self):
+ """
+ """
+ self._stop_decr_loop()
+ SyncEncryptDecryptPool.close(self)
+
+ def _decrypt_and_process_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ Called periodically from LoopingCall self._sync_loop.
+ """
+ while not self._stopped.is_set():
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ continue
+ self._decrypt_received_docs()
+ self._process_decrypted()
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+
+ def wait(self):
+ while not self.clear_to_sync():
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+
+ @defer.inlineCallbacks
+ def clear_to_sync(self):
+ count = yield self._count_docs_in_sync_db()
+ defer.returnValue(count == 0)