From eae4468d99029006cc36a021e82350a0f62f7006 Mon Sep 17 00:00:00 2001 From: drebs Date: Thu, 7 May 2015 14:49:40 -0300 Subject: [bug] fix order of insertion of decrypted docs This commit actually does some different things: * When doing asynchronous decryption of incoming documents in soledad client during a sync, there was the possibility that a document corresponding to a newer generation would be decrypted and inserted in the local database before a document corresponding to an older generation. When this happened, the metadata about the target database (i.e. its locally-known generation) would be first updated to the newer generation, and then an attempt to insert a document corresponding to an older generation would cause the infamous InvalidGeneration error. To fix that we use the sync-index information that is contained in the sync stream to correctly find the insertable docs to be inserted in the local database, thus avoiding the problem described above. * Refactor the sync encrypt/decrypt pool to its own file. * Fix the use of twisted adbapi with multiprocessing. Closes: #6757. --- client/src/leap/soledad/client/crypto.py | 552 ---------------------- client/src/leap/soledad/client/encdecpool.py | 673 +++++++++++++++++++++++++++ client/src/leap/soledad/client/sqlcipher.py | 8 +- client/src/leap/soledad/client/target.py | 64 +-- 4 files changed, 700 insertions(+), 597 deletions(-) create mode 100644 client/src/leap/soledad/client/encdecpool.py (limited to 'client/src') diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index dd40b198..bdbaa8e0 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -23,21 +23,13 @@ import hmac import hashlib import json import logging -import multiprocessing -import threading -import time from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 -from zope.proxy import sameProxiedObjects - -from twisted.internet import defer -from twisted.internet.threads import deferToThread from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type from leap.soledad.common import crypto -from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) @@ -515,547 +507,3 @@ def is_symmetrically_encrypted(doc): == crypto.EncryptionSchemes.SYMKEY: return True return False - - -# -# Encrypt/decrypt pools of workers -# - -class SyncEncryptDecryptPool(object): - """ - Base class for encrypter/decrypter pools. - """ - WORKERS = multiprocessing.cpu_count() - - def __init__(self, crypto, sync_db): - """ - Initialize the pool of encryption-workers. - - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - - :param sync_db: A database connection handle - :type sync_db: pysqlcipher.dbapi2.Connection - - :param write_lock: a write lock for controlling concurrent access - to the sync_db - :type write_lock: threading.Lock - """ - self._pool = multiprocessing.Pool(self.WORKERS) - self._crypto = crypto - self._sync_db = sync_db - - def close(self): - """ - Cleanly close the pool of workers. - """ - logger.debug("Closing %s" % (self.__class__.__name__,)) - self._pool.close() - try: - self._pool.join() - except Exception: - pass - - def terminate(self): - """ - Terminate the pool of workers. - """ - logger.debug("Terminating %s" % (self.__class__.__name__,)) - self._pool.terminate() - - -def encrypt_doc_task(doc_id, doc_rev, content, key, secret): - """ - Encrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - encrypted_content = encrypt_docstr( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, encrypted_content - - -class SyncEncrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric encryption - of documents to be synced. - """ - # TODO implement throttling to reduce cpu usage?? - WORKERS = multiprocessing.cpu_count() - TABLE_NAME = "docs_tosync" - FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - - def encrypt_doc(self, doc, workers=True): - """ - Symmetrically encrypt a document. - - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - soledad_assert(self._crypto is not None, "need a crypto object") - docstr = doc.get_json() - key = self._crypto.doc_passphrase(doc.doc_id) - secret = self._crypto.secret - args = doc.doc_id, doc.rev, docstr, key, secret - - try: - if workers: - res = self._pool.apply_async( - encrypt_doc_task, args, - callback=self.encrypt_doc_cb) - else: - # encrypt inline - res = encrypt_doc_task(*args) - self.encrypt_doc_cb(res) - - except Exception as exc: - logger.exception(exc) - - def encrypt_doc_cb(self, result): - """ - Insert results of encryption routine into the local sync database. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, doc_rev, content = result - return self.insert_encrypted_local_doc(doc_id, doc_rev, content) - - @defer.inlineCallbacks - def insert_encrypted_local_doc(self, doc_id, doc_rev, content): - """ - Insert the contents of the encrypted doc into the local sync - database. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param content: The encrypted document. - :type content: str - """ - # FIXME --- callback should complete immediately since otherwise the - # thread which handles the results will get blocked - # Right now we're blocking the dispatcher with the writes to sqlite. - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ - % (self.TABLE_NAME,) - yield self._sync_db.runQuery(query, (doc_id, doc_rev, content)) - - -def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): - """ - Decrypt the content of the given document. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The encrypted content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification of - that document. - :type trans_id: str - :param key: The encryption key. - :type key: str - :param secret: The Soledad storage secret (used for MAC auth). - :type secret: str - - :return: A tuple containing the doc id, revision and encrypted content. - :rtype: tuple(str, str, str) - """ - decrypted_content = decrypt_doc_dict( - content, doc_id, doc_rev, key, secret) - return doc_id, doc_rev, decrypted_content, gen, trans_id - - -class SyncDecrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric decryption - of documents that were received. - - The decryption of the received documents is done in two steps: - - 1. All the encrypted docs are collected, together with their generation - and transaction-id - 2. The docs are enqueued for decryption. When completed, they are - inserted following the generation order. - """ - # TODO implement throttling to reduce cpu usage?? - TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted" - - """ - Period of recurrence of the periodic decrypting task, in seconds. - """ - DECRYPT_LOOP_PERIOD = 0.5 - - def __init__(self, *args, **kwargs): - """ - Initialize the decrypter pool, and setup a dict for putting the - results of the decrypted docs until they are picked by the insert - routine that gets them in order. - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - """ - self._insert_doc_cb = kwargs.pop("insert_doc_cb") - self.source_replica_uid = kwargs.pop("source_replica_uid") - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._async_results = [] - - self._stopped = threading.Event() - self._deferred_loop = deferToThread(self._decrypt_and_process_docs) - self._deferred_loop.addCallback( - lambda _: logger.debug("Finished decryptor thread.")) - - @defer.inlineCallbacks - def insert_encrypted_received_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert a received message with encrypted content, to be decrypted later - on. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - docstr = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - yield self._sync_db.runQuery( - query, - (doc_id, doc_rev, docstr, gen, trans_id, 1)) - - @defer.inlineCallbacks - def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id): - """ - Insert a document that is not symmetrically encrypted. - We store it in the staging area (the decrypted_docs dictionary) to be - picked up in order as the preceding documents are decrypted. - - :param doc_id: The Document ID. - :type doc_id: str - :param doc_rev: The Document Revision - :param doc_rev: str - :param content: the Content of the document - :type content: str - :param gen: the Document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - """ - if not isinstance(content, str): - content = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % ( - self.TABLE_NAME,) - yield self._sync_db.runQuery( - query, - (doc_id, doc_rev, content, gen, trans_id, 0)) - - @defer.inlineCallbacks - def delete_received_doc(self, doc_id, doc_rev): - """ - Delete a received doc after it was inserted into the local db. - - :param doc_id: Document ID. - :type doc_id: str - :param doc_rev: Document revision. - :type doc_rev: str - """ - sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( - self.TABLE_NAME,) - yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev)) - - def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, - source_replica_uid, workers=True): - """ - Symmetrically decrypt a document. - - :param doc_id: The ID for the document with contents to be encrypted. - :type doc: str - :param rev: The revision of the document. - :type rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - :param source_replica_uid: - :type source_replica_uid: str - - :param workers: Whether to defer the decryption to the multiprocess - pool of workers. Useful for debugging purposes. - :type workers: bool - """ - self.source_replica_uid = source_replica_uid - - # insert_doc_cb is a proxy object that gets updated with the right - # insert function only when the sync_target invokes the sync_exchange - # method. so, if we don't still have a non-empty callback, we refuse - # to proceed. - if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), - None): - logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") - return - - soledad_assert(self._crypto is not None, "need a crypto object") - - if len(content) == 0: - # not encrypted payload - return - - content = json.loads(content) - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, rev, content, gen, trans_id, key, secret - - if workers: - # save the async result object so we can inspect it for failures - self._async_results.append(self._pool.apply_async( - decrypt_doc_task, args, - callback=self._decrypt_doc_cb)) - else: - # decrypt inline - res = decrypt_doc_task(*args) - self._decrypt_doc_cb(res) - - def _decrypt_doc_cb(self, result): - """ - Store the decryption result in the sync db from where it will later be - picked by _process_decrypted. - - :param result: A tuple containing the doc id, revision and encrypted - content. - :type result: tuple(str, str, str) - """ - doc_id, rev, content, gen, trans_id = result - logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" - % (doc_id, rev, gen, trans_id)) - return self.insert_received_doc(doc_id, rev, content, gen, trans_id) - - def get_docs_by_generation(self, encrypted=None): - """ - Get all documents in the received table from the sync db, - ordered by generation. - - :param encrypted: If not None, only return documents with encrypted - field equal to given parameter. - :type encrypted: bool or None - - :return: list of doc_id, rev, generation, gen, trans_id - :rtype: list - """ - sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \ - % self.TABLE_NAME - if encrypted is not None: - sql += " WHERE encrypted = %d" % int(encrypted) - sql += " ORDER BY gen ASC" - return self._fetchall(sql) - - @defer.inlineCallbacks - def get_insertable_docs_by_gen(self): - """ - Return a list of non-encrypted documents ready to be inserted. - """ - # here, we compare the list of all available docs with the list of - # decrypted docs and find the longest common prefix between these two - # lists. Note that the order of lists fetch matters: if instead we - # first fetch the list of decrypted docs and then the list of all - # docs, then some document might have been decrypted between these two - # calls, and if it is just the right doc then it might not be caught - # by the next loop. - all_docs = yield self.get_docs_by_generation() - decrypted_docs = yield self.get_docs_by_generation(encrypted=False) - insertable = [] - for doc_id, rev, _, gen, trans_id, encrypted in all_docs: - for next_doc_id, _, next_content, _, _, _ in decrypted_docs: - if doc_id == next_doc_id: - content = next_content - insertable.append((doc_id, rev, content, gen, trans_id)) - else: - break - defer.returnValue(insertable) - - @defer.inlineCallbacks - def _count_docs_in_sync_db(self, encrypted=None): - """ - Count how many documents we have in the table for received docs. - - :param encrypted: If not None, return count of documents with - encrypted field equal to given parameter. - :type encrypted: bool or None - - :return: The count of documents. - :rtype: int - """ - query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) - if encrypted is not None: - query += " WHERE encrypted = %d" % int(encrypted) - res = yield self._sync_db.runQuery(query) - if res: - val = res.pop() - defer.returnValue(val[0]) - else: - defer.returnValue(0) - - @defer.inlineCallbacks - def _decrypt_received_docs(self): - """ - Get all the encrypted documents from the sync database and dispatch a - decrypt worker to decrypt each one of them. - """ - self._raise_in_case_of_failed_async_calls() - docs_by_generation = yield self.get_docs_by_generation(encrypted=True) - for doc_id, rev, content, gen, trans_id, _ in docs_by_generation: - self._decrypt_doc( - doc_id, rev, content, gen, trans_id, self.source_replica_uid) - - @defer.inlineCallbacks - def _process_decrypted(self): - """ - Process the already decrypted documents, and insert as many documents - as can be taken from the expected order without finding a gap. - - :return: Whether we have processed all the pending docs. - :rtype: bool - """ - # Acquire the lock to avoid processing while we're still - # getting data from the syncing stream, to avoid InvalidGeneration - # problems. - insertable = yield self.get_insertable_docs_by_gen() - for doc_fields in insertable: - yield self.insert_decrypted_local_doc(*doc_fields) - - @defer.inlineCallbacks - def insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id): - """ - Insert the decrypted document into the local sqlcipher database. - Makes use of the passed callback `return_doc_cb` passed to the caller - by u1db sync. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - """ - # could pass source_replica in params for callback chain - insert_fun = self._insert_doc_cb[self.source_replica_uid] - logger.debug("Sync decrypter pool: inserting doc in local db: " - "%s:%s %s" % (doc_id, doc_rev, gen)) - - # convert deleted documents to avoid error on document creation - if content == 'null': - content = None - doc = SoledadDocument(doc_id, doc_rev, content) - gen = int(gen) - insert_fun(doc, gen, trans_id) - - # If no errors found, remove it from the received database. - yield self.delete_received_doc(doc_id, doc_rev) - - @defer.inlineCallbacks - def empty(self): - """ - Empty the received docs table of the sync database. - """ - sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - yield self._sync_db.runQuery(sql) - - @defer.inlineCallbacks - def _fetchall(self, *args, **kwargs): - results = yield self._sync_db.runQuery(*args, **kwargs) - defer.returnValue(results) - - def _raise_in_case_of_failed_async_calls(self): - """ - Re-raise any exception raised by an async call. - - :raise Exception: Raised if an async call has raised an exception. - """ - for res in self._async_results: - if res.ready(): - if not res.successful(): - # re-raise the exception raised by the remote call - res.get() - - def _stop_decr_loop(self): - """ - """ - self._stopped.set() - - def close(self): - """ - """ - self._stop_decr_loop() - SyncEncryptDecryptPool.close(self) - - def _decrypt_and_process_docs(self): - """ - Decrypt the documents received from remote replica and insert them - into the local one. - - Called periodically from LoopingCall self._sync_loop. - """ - while not self._stopped.is_set(): - if sameProxiedObjects( - self._insert_doc_cb.get(self.source_replica_uid), - None): - continue - self._decrypt_received_docs() - self._process_decrypted() - time.sleep(self.DECRYPT_LOOP_PERIOD) - - def wait(self): - while not self.clear_to_sync(): - time.sleep(self.DECRYPT_LOOP_PERIOD) - - @defer.inlineCallbacks - def clear_to_sync(self): - count = yield self._count_docs_in_sync_db() - defer.returnValue(count == 0) diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py new file mode 100644 index 00000000..0466ec5d --- /dev/null +++ b/client/src/leap/soledad/client/encdecpool.py @@ -0,0 +1,673 @@ +# -*- coding: utf-8 -*- +# encdecpool.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +A pool of encryption/decryption concurrent and parallel workers for using +during synchronization. +""" + + +import multiprocessing +import threading +import time +import json +import logging + +from zope.proxy import sameProxiedObjects + +from twisted.internet import defer +from twisted.internet.threads import deferToThread + +from leap.soledad.common.document import SoledadDocument +from leap.soledad.common import soledad_assert + +from leap.soledad.client.crypto import encrypt_docstr +from leap.soledad.client.crypto import decrypt_doc_dict + + +logger = logging.getLogger(__name__) + + +# +# Encrypt/decrypt pools of workers +# + +class SyncEncryptDecryptPool(object): + """ + Base class for encrypter/decrypter pools. + """ + WORKERS = multiprocessing.cpu_count() + + def __init__(self, crypto, sync_db): + """ + Initialize the pool of encryption-workers. + + :param crypto: A SoledadCryto instance to perform the encryption. + :type crypto: leap.soledad.crypto.SoledadCrypto + + :param sync_db: A database connection handle + :type sync_db: pysqlcipher.dbapi2.Connection + """ + self._pool = multiprocessing.Pool(self.WORKERS) + self._crypto = crypto + self._sync_db = sync_db + + def close(self): + """ + Cleanly close the pool of workers. + """ + logger.debug("Closing %s" % (self.__class__.__name__,)) + self._pool.close() + try: + self._pool.join() + except Exception: + pass + + def terminate(self): + """ + Terminate the pool of workers. + """ + logger.debug("Terminating %s" % (self.__class__.__name__,)) + self._pool.terminate() + + +def encrypt_doc_task(doc_id, doc_rev, content, key, secret): + """ + Encrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad storage secret (used for MAC auth). + :type secret: str + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + encrypted_content = encrypt_docstr( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, encrypted_content + + +class SyncEncrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric encryption + of documents to be synced. + """ + # TODO implement throttling to reduce cpu usage?? + WORKERS = multiprocessing.cpu_count() + TABLE_NAME = "docs_tosync" + FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" + + def encrypt_doc(self, doc, workers=True): + """ + Symmetrically encrypt a document. + + :param doc: The document with contents to be encrypted. + :type doc: SoledadDocument + + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + """ + soledad_assert(self._crypto is not None, "need a crypto object") + docstr = doc.get_json() + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + args = doc.doc_id, doc.rev, docstr, key, secret + + try: + if workers: + res = self._pool.apply_async( + encrypt_doc_task, args, + callback=self.encrypt_doc_cb) + else: + # encrypt inline + res = encrypt_doc_task(*args) + self.encrypt_doc_cb(res) + + except Exception as exc: + logger.exception(exc) + + def encrypt_doc_cb(self, result): + """ + Insert results of encryption routine into the local sync database. + + :param result: A tuple containing the doc id, revision and encrypted + content. + :type result: tuple(str, str, str) + """ + doc_id, doc_rev, content = result + return self.insert_encrypted_local_doc(doc_id, doc_rev, content) + + def insert_encrypted_local_doc(self, doc_id, doc_rev, content): + """ + Insert the contents of the encrypted doc into the local sync + database. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + """ + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ + % (self.TABLE_NAME,) + return self._sync_db.runOperation(query, (doc_id, doc_rev, content)) + + +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, + idx): + """ + Decrypt the content of the given document. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The encrypted content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification of + that document. + :type trans_id: str + :param key: The encryption key. + :type key: str + :param secret: The Soledad storage secret (used for MAC auth). + :type secret: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A tuple containing the doc id, revision and encrypted content. + :rtype: tuple(str, str, str) + """ + decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, decrypted_content, gen, trans_id, idx + + +class SyncDecrypterPool(SyncEncryptDecryptPool): + """ + Pool of workers that spawn subprocesses to execute the symmetric decryption + of documents that were received. + + The decryption of the received documents is done in two steps: + + 1. Encrypted documents are stored in the sync db by the actual soledad + sync loop. + 2. The soledad sync loop tells us how many documents we should expect + to process. + 3. We start a decrypt-and-process loop: + + a. Encrypted documents are fetched. + b. Encrypted documents are decrypted. + c. The longest possible list of decrypted documents are inserted + in the soledad db (this depends on which documents have already + arrived and which documents have already been decrypte, because + the order of insertion in the local soledad db matters). + d. Processed documents are deleted from the database. + + 4. When we have processed as many documents as we should, the loop + finishes. + """ + # TODO implement throttling to reduce cpu usage?? + TABLE_NAME = "docs_received" + FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ + "trans_id, encrypted, idx" + + """ + Period of recurrence of the periodic decrypting task, in seconds. + """ + DECRYPT_LOOP_PERIOD = 0.5 + + def __init__(self, *args, **kwargs): + """ + Initialize the decrypter pool, and setup a dict for putting the + results of the decrypted docs until they are picked by the insert + routine that gets them in order. + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + :param source_replica_uid: The source replica uid, used to find the + correct callback for inserting documents. + :type source_replica_uid: str + """ + self._insert_doc_cb = kwargs.pop("insert_doc_cb") + self.source_replica_uid = kwargs.pop("source_replica_uid") + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + + self._last_inserted_idx = 0 + self._docs_to_process = None + self._processed_docs = 0 + + self._async_results = [] + self._exception = None + self._finished = threading.Event() + + # clear the database before starting the sync + self._empty_db = threading.Event() + d = self._empty() + d.addCallback(lambda _: self._empty_db.set()) + + # start the decryption loop + self._deferred_loop = deferToThread(self._decrypt_and_process_docs) + self._deferred_loop.addCallback( + lambda _: logger.debug("Finished decryptor thread.")) + + def set_docs_to_process(self, docs_to_process): + """ + Set the number of documents we expect to process. + + This should be called by the during the sync exchange process as soon + as we know how many documents are arriving from the server. + + :param docs_to_process: The number of documents to process. + :type docs_to_process: int + """ + self._docs_to_process = docs_to_process + + def insert_encrypted_received_doc( + self, doc_id, doc_rev, content, gen, trans_id, idx): + """ + Insert a received message with encrypted content, to be decrypted later + on. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + docstr = json.dumps(content) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + % self.TABLE_NAME + return self._sync_db.runOperation( + query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) + + def insert_received_doc( + self, doc_id, doc_rev, content, gen, trans_id, idx): + """ + Insert a document that is not symmetrically encrypted. + We store it in the staging area (the decrypted_docs dictionary) to be + picked up in order as the preceding documents are decrypted. + + :param doc_id: The Document ID. + :type doc_id: str + :param doc_rev: The Document Revision + :param doc_rev: str + :param content: the Content of the document + :type content: str + :param gen: the Document Generation + :type gen: int + :param trans_id: Transaction ID + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + if not isinstance(content, str): + content = json.dumps(content) + query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ + % self.TABLE_NAME + return self._sync_db.runOperation( + query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) + + def _delete_received_doc(self, doc_id): + """ + Delete a received doc after it was inserted into the local db. + + :param doc_id: Document ID. + :type doc_id: str + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM '%s' WHERE doc_id=?" \ + % self.TABLE_NAME + return self._sync_db.runOperation(query, (doc_id,)) + + def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx, + workers=True): + """ + Symmetrically decrypt a document and store in the sync db. + + :param doc_id: The ID for the document with contents to be encrypted. + :type doc: str + :param rev: The revision of the document. + :type rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + :param idx: The index of this document in the current sync process. + :type idx: int + :param workers: Whether to defer the decryption to the multiprocess + pool of workers. Useful for debugging purposes. + :type workers: bool + + :return: A deferred that will fire after the document hasa been + decrypted and inserted in the sync db. + :rtype: twisted.internet.defer.Deferred + """ + # insert_doc_cb is a proxy object that gets updated with the right + # insert function only when the sync_target invokes the sync_exchange + # method. so, if we don't still have a non-empty callback, we refuse + # to proceed. + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + logger.debug("Sync decrypter pool: no insert_doc_cb() yet.") + return + + soledad_assert(self._crypto is not None, "need a crypto object") + + content = json.loads(content) + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + args = doc_id, rev, content, gen, trans_id, key, secret, idx + + if workers: + # when using multiprocessing, we need to wait for all parallel + # processing to finish before continuing with the + # decrypt-and-process loop. We do this by using an extra deferred + # that will be fired by the multiprocessing callback when it has + # finished processing. + d1 = defer.Deferred() + + def _multiprocessing_callback(result): + d2 = self._decrypt_doc_cb(result) + d2.addCallback(lambda defres: d1.callback(defres)) + + # save the async result object so we can inspect it for failures + self._async_results.append( + self._pool.apply_async( + decrypt_doc_task, args, + callback=_multiprocessing_callback)) + + return d1 + else: + # decrypt inline + res = decrypt_doc_task(*args) + return self._decrypt_doc_cb(res) + + def _decrypt_doc_cb(self, result): + """ + Store the decryption result in the sync db from where it will later be + picked by _process_decrypted. + + :param result: A tuple containing the document's id, revision, + content, generation, transaction id and sync index. + :type result: tuple(str, str, str, int, str, int) + + :return: A deferred that will fire after the document has been + inserted in the sync db. + :rtype: twisted.internet.defer.Deferred + """ + doc_id, rev, content, gen, trans_id, idx = result + logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s" + % (doc_id, rev, gen, trans_id)) + return self.insert_received_doc( + doc_id, rev, content, gen, trans_id, idx) + + def _get_docs(self, encrypted=None, order_by='idx', order='ASC'): + """ + Get documents from the received docs table in the sync db. + + :param encrypted: If not None, only return documents with encrypted + field equal to given parameter. + :type encrypted: bool or None + :param order_by: The name of the field to order results. + :type order_by: str + :param order: Whether the order should be ASC or DESC. + :type order: str + + :return: A deferred that will fire with the results of the database + query. + :rtype: twisted.internet.defer.Deferred + """ + query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ + "idx FROM %s" % self.TABLE_NAME + if encrypted is not None: + query += " WHERE encrypted = %d" % int(encrypted) + query += " ORDER BY %s %s" % (order_by, order) + return self._sync_db.runQuery(query) + + @defer.inlineCallbacks + def _get_insertable_docs(self): + """ + Return a list of non-encrypted documents ready to be inserted. + + :return: A deferred that will fire with the list of insertable + documents. + :rtype: twisted.internet.defer.Deferred + """ + # here, we fetch the list of decrypted documents and compare with the + # index of the last succesfully processed document. + decrypted_docs = yield self._get_docs(encrypted=False) + insertable = [] + last_idx = self._last_inserted_idx + for doc_id, rev, content, gen, trans_id, encrypted, idx in \ + decrypted_docs: + # XXX for some reason, a document might not have been deleted from + # the database. This is a bug. In this point, already + # processed documents should have been removed from the sync + # database and we should not have to skip them here. We need + # to find out why this is happening, fix, and remove the + # skipping below. + if (idx < last_idx + 1): + continue + if (idx != last_idx + 1): + break + insertable.append((doc_id, rev, content, gen, trans_id, idx)) + last_idx += 1 + defer.returnValue(insertable) + + def _decrypt_received_docs(self): + """ + Get all the encrypted documents from the sync database and dispatch a + decrypt worker to decrypt each one of them. + + :return: A deferred that will fire after all documents have been + decrypted and inserted back in the sync db. + :rtype: twisted.internet.defer.Deferred + """ + + def _callback(received_docs): + deferreds = [] + for doc_id, rev, content, gen, trans_id, _, idx in received_docs: + deferreds.append( + self._decrypt_doc( + doc_id, rev, content, gen, trans_id, idx)) + return defer.gatherResults(deferreds) + + d = self._get_docs(encrypted=True) + d.addCallback(_callback) + return d + + def _process_decrypted(self): + """ + Fetch as many decrypted documents as can be taken from the expected + order and insert them in the database. + + :return: A deferred that will fire with the list of inserted + documents. + :rtype: twisted.internet.defer.Deferred + """ + + def _callback(insertable): + for doc_fields in insertable: + self._insert_decrypted_local_doc(*doc_fields) + return insertable + + d = self._get_insertable_docs() + d.addCallback(_callback) + return d + + def _delete_processed_docs(self, inserted): + """ + Delete from the sync db documents that have been processed. + + :param inserted: List of documents inserted in the previous process + step. + :type inserted: list + + :return: A list of deferreds that will fire when each operation in the + database has finished. + :rtype: twisted.internet.defer.DeferredList + """ + deferreds = [] + for doc_id, doc_rev, _, _, _, _ in inserted: + deferreds.append( + self._delete_received_doc(doc_id)) + if not deferreds: + return defer.succeed(None) + return defer.gatherResults(deferreds) + + def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, + gen, trans_id, idx): + """ + Insert the decrypted document into the local sqlcipher database. + Makes use of the passed callback `return_doc_cb` passed to the caller + by u1db sync. + + :param doc_id: The document id. + :type doc_id: str + :param doc_rev: The document revision. + :type doc_rev: str + :param content: The serialized content of the document. + :type content: str + :param gen: The generation corresponding to the modification of that + document. + :type gen: int + :param trans_id: The transaction id corresponding to the modification + of that document. + :type trans_id: str + """ + # could pass source_replica in params for callback chain + insert_fun = self._insert_doc_cb[self.source_replica_uid] + logger.debug("Sync decrypter pool: inserting doc in local db: " + "%s:%s %s" % (doc_id, doc_rev, gen)) + + # convert deleted documents to avoid error on document creation + if content == 'null': + content = None + doc = SoledadDocument(doc_id, doc_rev, content) + gen = int(gen) + insert_fun(doc, gen, trans_id) + + # store info about processed docs + self._last_inserted_idx = idx + self._processed_docs += 1 + + def _empty(self): + """ + Empty the received docs table of the sync database. + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) + return self._sync_db.runOperation(query) + + def _raise_if_async_fails(self): + """ + Raise any exception raised by a multiprocessing async decryption + call. + + :raise Exception: Raised if an async call has raised an exception. + """ + for res in self._async_results: + if res.ready(): + if not res.successful(): + # re-raise the exception raised by the remote call + res.get() + + def _decrypt_and_process_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + This method runs in its own thread, so sleeping will not interfere + with the main thread. + """ + try: + # wait for database to be emptied + self._empty_db.wait() + # wait until we know how many documents we need to process + while self._docs_to_process is None: + time.sleep(self.DECRYPT_LOOP_PERIOD) + # because all database operations are asynchronous, we use an event to + # make sure we don't start the next loop before the current one has + # finished. + event = threading.Event() + # loop until we have processes as many docs as the number of changes + while self._processed_docs < self._docs_to_process: + if sameProxiedObjects( + self._insert_doc_cb.get(self.source_replica_uid), + None): + continue + event.clear() + d = self._decrypt_received_docs() + d.addCallback(lambda _: self._raise_if_async_fails()) + d.addCallback(lambda _: self._process_decrypted()) + d.addCallback(self._delete_processed_docs) + d.addCallback(lambda _: event.set()) + event.wait() + # sleep a bit to give time for some decryption work + time.sleep(self.DECRYPT_LOOP_PERIOD) + except Exception as e: + self._exception = e + self._finished.set() + + def wait(self): + """ + Wait for the decrypt-and-process loop to finish. + """ + self._finished.wait() + if self._exception: + raise self._exception diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 4f7ecd1b..d3b3d01b 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -66,7 +66,7 @@ from twisted.python.threadpool import ThreadPool from twisted.python import log from twisted.enterprise import adbapi -from leap.soledad.client import crypto +from leap.soledad.client import encdecpool from leap.soledad.client.target import SoledadSyncTarget from leap.soledad.client.target import PendingReceivedDocsSyncError from leap.soledad.client.sync import SoledadSynchronizer @@ -489,7 +489,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): if defer_encryption: # initialize syncing queue encryption pool - self._sync_enc_pool = crypto.SyncEncrypterPool( + self._sync_enc_pool = encdecpool.SyncEncrypterPool( self._crypto, self._sync_db) # ----------------------------------------------------------------- @@ -578,8 +578,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :rtype: tuple of strings """ maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" - encr = crypto.SyncEncrypterPool - decr = crypto.SyncDecrypterPool + encr = encdecpool.SyncEncrypterPool + decr = encdecpool.SyncDecrypterPool sql_encr_table_query = (maybe_create % ( encr.TABLE_NAME, encr.FIELD_NAMES)) sql_decr_table_query = (maybe_create % ( diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 06cef1ee..17ce718f 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -43,7 +43,8 @@ from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool +from leap.soledad.client.encdecpool import SyncEncrypterPool +from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import signal @@ -787,9 +788,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._crypto = crypto self._stopped = True self._stop_lock = threading.Lock() - self._sync_exchange_lock = threading.Lock() self.source_replica_uid = source_replica_uid - self._defer_decryption = False self._syncer_pool = None # deferred decryption attributes @@ -813,9 +812,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ Tear down the SyncDecrypterPool. """ - if self._sync_decr_pool is not None: - self._sync_decr_pool.close() - self._sync_decr_pool = None + self._sync_decr_pool.close() + self._sync_decr_pool = None def _get_replica_uid(self, url): """ @@ -903,7 +901,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): doc = SoledadDocument(doc_id, rev, content) if is_symmetrically_encrypted(doc): if self._queue_for_decrypt: - self._save_encrypted_received_doc( + self._enqueue_encrypted_received_doc( doc, gen, trans_id, idx, total) else: # defer_decryption is False or no-sync-db fallback @@ -913,7 +911,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # not symmetrically encrypted doc, insert it directly # or save it in the decrypted stage. if self._queue_for_decrypt: - self._save_received_doc(doc, gen, trans_id, idx, total) + self._enqueue_received_doc(doc, gen, trans_id, idx, total) else: self._return_doc_cb(doc, gen, trans_id) # ------------------------------------------------------------- @@ -996,6 +994,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.stop(fail=True) break + if defer_decryption: + self._setup_sync_decr_pool() + t.doc_syncer.set_request_method( 'get', idx, sync_id, last_known_generation, last_known_trans_id) @@ -1021,6 +1022,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.join() if t.success: number_of_changes, _, _ = t.result + if defer_decryption and number_of_changes: + self._sync_decr_pool.set_docs_to_process( + number_of_changes) else: raise t.exception first_request = False @@ -1053,6 +1057,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): new_generation = doc_data['gen'] new_transaction_id = doc_data['trans_id'] + # decrypt docs in case of deferred decryption + if defer_decryption: + self._sync_decr_pool.wait() + self._teardown_sync_decr_pool() + return new_generation, new_transaction_id def sync_exchange(self, docs_by_generations, @@ -1103,14 +1112,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ self._ensure_callback = ensure_callback - if defer_decryption and self._sync_db is not None: - self._sync_exchange_lock.acquire() - self._setup_sync_decr_pool() - self._defer_decryption = True - else: - # fall back - defer_decryption = False - self.start() if sync_id is None: @@ -1120,10 +1121,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): setProxiedObject(self._insert_doc_cb[source_replica_uid], return_doc_cb) - # empty the database before starting a new sync - if defer_decryption is True and not self.clear_to_sync(): - self._sync_decr_pool.empty() - self._ensure_connection() if self._trace_hook: # for tests self._trace_hook('sync_exchange') @@ -1257,6 +1254,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): trans_id_after_send = response_dict['new_transaction_id'] # get docs from target + if self._sync_db is None: + defer_decryption = False if self.stopped is False: cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, @@ -1266,12 +1265,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._syncer_pool.cleanup() - # decrypt docs in case of deferred decryption - if defer_decryption: - self._sync_decr_pool.wait() - self._teardown_sync_decr_pool() - self._sync_exchange_lock.release() - # update gen and trans id info in case we just sent and did not # receive docs. if gen_after_send is not None and gen_after_send > cur_target_gen: @@ -1357,7 +1350,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): encr.TABLE_NAME,)) self._sync_db.execute(sql, (doc_id, doc_rev)) - def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total): + def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total): """ Save a symmetrically encrypted incoming document into the received docs table in the sync db. A decryption task will pick it up @@ -1378,9 +1371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): "Enqueueing doc for decryption: %d/%d." % (idx + 1, total)) self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id) + doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) - def _save_received_doc(self, doc, gen, trans_id, idx, total): + def _enqueue_received_doc(self, doc, gen, trans_id, idx, total): """ Save any incoming document into the received docs table in the sync db. @@ -1399,23 +1392,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): "Enqueueing doc, no decryption needed: %d/%d." % (idx + 1, total)) self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id) + doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) # # Symmetric decryption of syncing docs # - def clear_to_sync(self): - """ - Return whether sync can proceed (ie, the received db table is empty). - - :return: Whether sync can proceed. - :rtype: bool - """ - if self._sync_decr_pool: - return self._sync_decr_pool.clear_to_sync() - return True - def set_decryption_callback(self, cb): """ Set callback to be called when the decryption finishes. -- cgit v1.2.3