diff options
Diffstat (limited to 'client/src/leap/soledad/client/crypto.py')
-rw-r--r-- | client/src/leap/soledad/client/crypto.py | 190 |
1 files changed, 108 insertions, 82 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 107bf7f1..dd40b198 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -25,11 +25,15 @@ import json import logging import multiprocessing import threading +import time from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 from zope.proxy import sameProxiedObjects +from twisted.internet import defer +from twisted.internet.threads import deferToThread + from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import crypto @@ -227,7 +231,7 @@ class SoledadCrypto(object): # def mac_doc(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, enc_iv, - mac_method, secret): + mac_method, secret): """ Calculate a MAC for C{doc} using C{ciphertext}. @@ -378,7 +382,7 @@ def decrypt_doc(crypto, doc): def _verify_doc_mac(doc_id, doc_rev, ciphertext, enc_scheme, enc_method, - enc_iv, mac_method, secret, doc_mac): + enc_iv, mac_method, secret, doc_mac): """ Verify that C{doc_mac} is a correct MAC for the given document. @@ -523,7 +527,7 @@ class SyncEncryptDecryptPool(object): """ WORKERS = multiprocessing.cpu_count() - def __init__(self, crypto, sync_db, write_lock): + def __init__(self, crypto, sync_db): """ Initialize the pool of encryption-workers. @@ -540,7 +544,6 @@ class SyncEncryptDecryptPool(object): self._pool = multiprocessing.Pool(self.WORKERS) self._crypto = crypto self._sync_db = sync_db - self._sync_db_write_lock = write_lock def close(self): """ @@ -592,7 +595,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): # TODO implement throttling to reduce cpu usage?? WORKERS = multiprocessing.cpu_count() TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id, rev, content" + FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" def encrypt_doc(self, doc, workers=True): """ @@ -633,8 +636,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type result: tuple(str, str, str) """ doc_id, doc_rev, content = result - self.insert_encrypted_local_doc(doc_id, doc_rev, content) + return self.insert_encrypted_local_doc(doc_id, doc_rev, content) + @defer.inlineCallbacks def insert_encrypted_local_doc(self, doc_id, doc_rev, content): """ Insert the contents of the encrypted doc into the local sync @@ -652,13 +656,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): # FIXME --- callback should complete immediately since otherwise the # thread which handles the results will get blocked # Right now we're blocking the dispatcher with the writes to sqlite. - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?)" % (self.TABLE_NAME,) - - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute(sql_ins, (doc_id, doc_rev, content)) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ + % (self.TABLE_NAME,) + yield self._sync_db.runQuery(query, (doc_id, doc_rev, content)) def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): @@ -704,9 +704,12 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ # TODO implement throttling to reduce cpu usage?? TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id, rev, content, gen, trans_id, encrypted" + FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted" - write_encrypted_lock = threading.Lock() + """ + Period of recurrence of the periodic decrypting task, in seconds. + """ + DECRYPT_LOOP_PERIOD = 0.5 def __init__(self, *args, **kwargs): """ @@ -723,19 +726,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type last_known_generation: int """ self._insert_doc_cb = kwargs.pop("insert_doc_cb") + self.source_replica_uid = kwargs.pop("source_replica_uid") SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self.source_replica_uid = None self._async_results = [] - def set_source_replica_uid(self, source_replica_uid): - """ - Set the source replica uid for this decrypter pool instance. - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - """ - self.source_replica_uid = source_replica_uid + self._stopped = threading.Event() + self._deferred_loop = deferToThread(self._decrypt_and_process_docs) + self._deferred_loop.addCallback( + lambda _: logger.debug("Finished decryptor thread.")) + @defer.inlineCallbacks def insert_encrypted_received_doc(self, doc_id, doc_rev, content, gen, trans_id): """ @@ -754,17 +754,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :type trans_id: str """ docstr = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % (self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( self.TABLE_NAME,) + yield self._sync_db.runQuery( + query, + (doc_id, doc_rev, docstr, gen, trans_id, 1)) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, )) - con.execute( - sql_ins, - (doc_id, doc_rev, docstr, gen, trans_id, 1)) - + @defer.inlineCallbacks def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): """ Insert a document that is not symmetrically encrypted. @@ -784,17 +780,13 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ if not isinstance(content, str): content = json.dumps(content) - sql_del = "DELETE FROM '%s' WHERE doc_id=?" % ( - self.TABLE_NAME,) - sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id,)) - con.execute( - sql_ins, - (doc_id, doc_rev, content, gen, trans_id, 0)) + yield self._sync_db.runQuery( + query, + (doc_id, doc_rev, content, gen, trans_id, 0)) + @defer.inlineCallbacks def delete_received_doc(self, doc_id, doc_rev): """ Delete a received doc after it was inserted into the local db. @@ -806,12 +798,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( self.TABLE_NAME,) - con = self._sync_db - with self._sync_db_write_lock: - con.execute(sql_del, (doc_id, doc_rev)) + yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev)) - def decrypt_doc(self, doc_id, rev, content, gen, trans_id, - source_replica_uid, workers=True): + def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, + source_replica_uid, workers=True): """ Symmetrically decrypt a document. @@ -860,16 +850,16 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # save the async result object so we can inspect it for failures self._async_results.append(self._pool.apply_async( decrypt_doc_task, args, - callback=self.decrypt_doc_cb)) + callback=self._decrypt_doc_cb)) else: # decrypt inline res = decrypt_doc_task(*args) - self.decrypt_doc_cb(res) + self._decrypt_doc_cb(res) - def decrypt_doc_cb(self, result): + def _decrypt_doc_cb(self, result): """ Store the decryption result in the sync db from where it will later be - picked by process_decrypted. + picked by _process_decrypted. :param result: A tuple containing the doc id, revision and encrypted content. @@ -878,7 +868,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): doc_id, rev, content, gen, trans_id = result logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" % (doc_id, rev, gen, trans_id)) - self.insert_received_doc(doc_id, rev, content, gen, trans_id) + return self.insert_received_doc(doc_id, rev, content, gen, trans_id) def get_docs_by_generation(self, encrypted=None): """ @@ -899,6 +889,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): sql += " ORDER BY gen ASC" return self._fetchall(sql) + @defer.inlineCallbacks def get_insertable_docs_by_gen(self): """ Return a list of non-encrypted documents ready to be inserted. @@ -910,8 +901,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # docs, then some document might have been decrypted between these two # calls, and if it is just the right doc then it might not be caught # by the next loop. - all_docs = self.get_docs_by_generation() - decrypted_docs = self.get_docs_by_generation(encrypted=False) + all_docs = yield self.get_docs_by_generation() + decrypted_docs = yield self.get_docs_by_generation(encrypted=False) insertable = [] for doc_id, rev, _, gen, trans_id, encrypted in all_docs: for next_doc_id, _, next_content, _, _, _ in decrypted_docs: @@ -920,9 +911,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): insertable.append((doc_id, rev, content, gen, trans_id)) else: break - return insertable + defer.returnValue(insertable) - def count_docs_in_sync_db(self, encrypted=None): + @defer.inlineCallbacks + def _count_docs_in_sync_db(self, encrypted=None): """ Count how many documents we have in the table for received docs. @@ -933,31 +925,30 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :return: The count of documents. :rtype: int """ - if self._sync_db is None: - logger.warning("cannot return count with null sync_db") - return - sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) + query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) if encrypted is not None: - sql += " WHERE encrypted = %d" % int(encrypted) - res = self._fetchall(sql) + query += " WHERE encrypted = %d" % int(encrypted) + res = yield self._sync_db.runQuery(query) if res: val = res.pop() - return val[0] + defer.returnValue(val[0]) else: - return 0 + defer.returnValue(0) - def decrypt_received_docs(self): + @defer.inlineCallbacks + def _decrypt_received_docs(self): """ Get all the encrypted documents from the sync database and dispatch a decrypt worker to decrypt each one of them. """ - docs_by_generation = self.get_docs_by_generation(encrypted=True) - for doc_id, rev, content, gen, trans_id, _ \ - in filter(None, docs_by_generation): - self.decrypt_doc( + self._raise_in_case_of_failed_async_calls() + docs_by_generation = yield self.get_docs_by_generation(encrypted=True) + for doc_id, rev, content, gen, trans_id, _ in docs_by_generation: + self._decrypt_doc( doc_id, rev, content, gen, trans_id, self.source_replica_uid) - def process_decrypted(self): + @defer.inlineCallbacks + def _process_decrypted(self): """ Process the already decrypted documents, and insert as many documents as can be taken from the expected order without finding a gap. @@ -968,12 +959,11 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # Acquire the lock to avoid processing while we're still # getting data from the syncing stream, to avoid InvalidGeneration # problems. - with self.write_encrypted_lock: - for doc_fields in self.get_insertable_docs_by_gen(): - self.insert_decrypted_local_doc(*doc_fields) - remaining = self.count_docs_in_sync_db() - return remaining == 0 + insertable = yield self.get_insertable_docs_by_gen() + for doc_fields in insertable: + yield self.insert_decrypted_local_doc(*doc_fields) + @defer.inlineCallbacks def insert_decrypted_local_doc(self, doc_id, doc_rev, content, gen, trans_id): """ @@ -1007,22 +997,22 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): insert_fun(doc, gen, trans_id) # If no errors found, remove it from the received database. - self.delete_received_doc(doc_id, doc_rev) + yield self.delete_received_doc(doc_id, doc_rev) + @defer.inlineCallbacks def empty(self): """ Empty the received docs table of the sync database. """ sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - self._sync_db.execute(sql) + yield self._sync_db.runQuery(sql) + @defer.inlineCallbacks def _fetchall(self, *args, **kwargs): - with self._sync_db: - c = self._sync_db.cursor() - c.execute(*args, **kwargs) - return c.fetchall() + results = yield self._sync_db.runQuery(*args, **kwargs) + defer.returnValue(results) - def raise_in_case_of_failed_async_calls(self): + def _raise_in_case_of_failed_async_calls(self): """ Re-raise any exception raised by an async call. @@ -1033,3 +1023,39 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if not res.successful(): # re-raise the exception raised by the remote call res.get() + + def _stop_decr_loop(self): + """ + """ + self._stopped.set() + + def close(self): + """ + """ + self._stop_decr_loop() + SyncEncryptDecryptPool.close(self) + + def _decrypt_and_process_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + Called periodically from LoopingCall self._sync_loop. + """ + while not self._stopped.is_set(): + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + continue + self._decrypt_received_docs() + self._process_decrypted() + time.sleep(self.DECRYPT_LOOP_PERIOD) + + def wait(self): + while not self.clear_to_sync(): + time.sleep(self.DECRYPT_LOOP_PERIOD) + + @defer.inlineCallbacks + def clear_to_sync(self): + count = yield self._count_docs_in_sync_db() + defer.returnValue(count == 0) |