diff options
-rw-r--r-- | client/src/leap/soledad/client/api.py | 31 | ||||
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 380 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target/api.py | 12 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 66 | ||||
-rw-r--r-- | client/src/leap/soledad/client/interfaces.py | 7 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 11 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 18 | ||||
-rw-r--r-- | testing/tests/perf/test_encdecpool.py | 41 | ||||
-rw-r--r-- | testing/tests/perf/test_sync.py | 3 | ||||
-rw-r--r-- | testing/tests/sync/test_encdecpool.py | 258 | ||||
-rw-r--r-- | testing/tests/sync/test_sync.py | 2 | ||||
-rw-r--r-- | testing/tests/sync/test_sync_deferred.py | 15 | ||||
-rw-r--r-- | testing/tests/sync/test_sync_mutex.py | 4 | ||||
-rw-r--r-- | testing/tests/sync/test_sync_target.py | 27 |
14 files changed, 36 insertions, 839 deletions
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 6870d5ba..cbcae4f7 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -169,7 +169,7 @@ class Soledad(object): :type auth_token: str :param defer_encryption: - Whether to defer encryption/decryption of documents, or do it + Whether to defer encryption of documents, or do it inline while syncing. :type defer_encryption: bool @@ -299,9 +299,9 @@ class Soledad(object): ) self._sqlcipher_opts = opts - # the sync_db is used both for deferred encryption and decryption, so + # the sync_db is used both for deferred encryption, so # we want to initialize it anyway to allow for all combinations of - # deferred encryption and decryption configurations. + # deferred encryption configurations. self._initialize_sync_db(opts) self._dbpool = adbapi.getConnectionPool( opts, sync_enc_pool=self._sync_enc_pool) @@ -700,37 +700,26 @@ class Soledad(object): if syncable and not self._dbsyncer: self._init_u1db_syncer() - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize documents with the server replica. This method uses a lock to prevent multiple concurrent sync processes over the same local db file. - :param defer_decryption: - Whether to defer decryption of documents, or do it inline while - syncing. - :type defer_decryption: bool - :return: A deferred lock that will run the actual sync process when the lock is acquired, and which will fire with with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred """ d = self.sync_lock.run( - self._sync, - defer_decryption) + self._sync) return d - def _sync(self, defer_decryption): + def _sync(self): """ Synchronize documents with the server replica. - :param defer_decryption: - Whether to defer decryption of documents, or do it inline while - syncing. - :type defer_decryption: bool - :return: A deferred whose callback will be invoked with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -740,8 +729,7 @@ class Soledad(object): return d = self._dbsyncer.sync( sync_url, - creds=self._creds, - defer_decryption=defer_decryption) + creds=self._creds) def _sync_callback(local_gen): self._last_received_docs = docs = self._dbsyncer.received_docs @@ -874,12 +862,9 @@ class Soledad(object): """ maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" encr = encdecpool.SyncEncrypterPool - decr = encdecpool.SyncDecrypterPool sql_encr_table_query = (maybe_create % ( encr.TABLE_NAME, encr.FIELD_NAMES)) - sql_decr_table_query = (maybe_create % ( - decr.TABLE_NAME, decr.FIELD_NAMES)) - return (sql_encr_table_query, sql_decr_table_query) + return (sql_encr_table_query,) # # ISecretsStorage diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 056b012f..8eaefa77 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -22,14 +22,9 @@ during synchronization. """ -import json -from uuid import uuid4 - -from twisted.internet.task import LoopingCall from twisted.internet import threads from twisted.internet import defer -from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert from leap.soledad.common.log import getLogger @@ -41,7 +36,7 @@ logger = getLogger(__name__) # -# Encrypt/decrypt pools of workers +# Encrypt pool of workers # class SyncEncryptDecryptPool(object): @@ -282,376 +277,3 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, """ decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) return doc_id, doc_rev, decrypted_content, gen, trans_id, idx - - -class SyncDecrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric decryption - of documents that were received. - - The decryption of the received documents is done in two steps: - - 1. Encrypted documents are stored in the sync db by the actual soledad - sync loop. - 2. The soledad sync loop tells us how many documents we should expect - to process. - 3. We start a decrypt-and-process loop: - - a. Encrypted documents are fetched. - b. Encrypted documents are decrypted. - c. The longest possible list of decrypted documents are inserted - in the soledad db (this depends on which documents have already - arrived and which documents have already been decrypte, because - the order of insertion in the local soledad db matters). - d. Processed documents are deleted from the database. - - 4. When we have processed as many documents as we should, the loop - finishes. - """ - TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ - "trans_id, encrypted, idx, sync_id" - - """ - Period of recurrence of the periodic decrypting task, in seconds. - """ - DECRYPT_LOOP_PERIOD = 0.5 - - def __init__(self, *args, **kwargs): - """ - Initialize the decrypter pool, and setup a dict for putting the - results of the decrypted docs until they are picked by the insert - routine that gets them in order. - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - :param source_replica_uid: The source replica uid, used to find the - correct callback for inserting documents. - :type source_replica_uid: str - """ - self._insert_doc_cb = kwargs.pop("insert_doc_cb") - self.source_replica_uid = kwargs.pop("source_replica_uid") - - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - - self._docs_to_process = None - self._processed_docs = 0 - self._last_inserted_idx = 0 - - self._loop = LoopingCall(self._decrypt_and_recurse) - - def _start_pool(self, period): - self._loop.start(period) - - def start(self, docs_to_process): - """ - Set the number of documents we expect to process. - - This should be called by the during the sync exchange process as soon - as we know how many documents are arriving from the server. - - :param docs_to_process: The number of documents to process. - :type docs_to_process: int - """ - SyncEncryptDecryptPool.start(self) - self._decrypted_docs_indexes = set() - self._sync_id = uuid4().hex - self._docs_to_process = docs_to_process - self._deferred = defer.Deferred() - d = self._init_db() - d.addCallback(lambda _: self._start_pool(self.DECRYPT_LOOP_PERIOD)) - return d - - def stop(self): - if self._loop.running: - self._loop.stop() - self._finish() - SyncEncryptDecryptPool.stop(self) - - def _init_db(self): - """ - Ensure sync_id column is present then - Empty the received docs table of the sync database. - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" % - self.TABLE_NAME) - d = self._runQuery(ensure_sync_id_column) - - def empty_received_docs(_): - query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) - return self._runOperation(query, (self._sync_id,)) - - d.addCallbacks(empty_received_docs, empty_received_docs) - return d - - def _errback(self, failure): - logger.error(failure) - self._deferred.errback(failure) - self._processed_docs = 0 - self._last_inserted_idx = 0 - - @property - def deferred(self): - """ - Deferred that will be fired when the decryption loop has finished - processing all the documents. - """ - return self._deferred - - def insert_encrypted_received_doc( - self, doc_id, doc_rev, content, gen, trans_id, idx): - """ - Decrypt and insert a received document into local staging area to be - processed later on. - - :param doc_id: The document ID. - :type doc_id: str - :param doc_rev: The document Revision - :param doc_rev: str - :param content: The content of the document - :type content: dict - :param gen: The document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A deferred that will fire after the decrypted document has - been inserted in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - soledad_assert(self._crypto is not None, "need a crypto object") - - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx - # decrypt asynchronously - # TODO use dedicated threadpool / move to ampoule - d = threads.deferToThread( - decrypt_doc_task, *args) - # callback will insert it for later processing - d.addCallback(self._decrypt_doc_cb) - return d - - def insert_received_doc( - self, doc_id, doc_rev, content, gen, trans_id, idx): - """ - Insert a document that is not symmetrically encrypted. - We store it in the staging area (the decrypted_docs dictionary) to be - picked up in order as the preceding documents are decrypted. - - :param doc_id: The document id - :type doc_id: str - :param doc_rev: The document revision - :param doc_rev: str or dict - :param content: The content of the document - :type content: dict - :param gen: The document generation - :type gen: int - :param trans_id: The transaction id - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - if not isinstance(content, str): - content = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \ - % self.TABLE_NAME - d = self._runOperation( - query, (doc_id, doc_rev, content, gen, trans_id, 0, - idx, self._sync_id)) - d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) - return d - - def _delete_received_docs(self, doc_ids): - """ - Delete a list of received docs after get them inserted into the db. - - :param doc_id: Document ID list. - :type doc_id: list - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - placeholders = ', '.join('?' for _ in doc_ids) - query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ - % (self.TABLE_NAME, placeholders) - return self._runOperation(query, (doc_ids)) - - def _decrypt_doc_cb(self, result): - """ - Store the decryption result in the sync db from where it will later be - picked by _process_decrypted_docs. - - :param result: A tuple containing the document's id, revision, - content, generation, transaction id and sync index. - :type result: tuple(str, str, str, int, str, int) - - :return: A deferred that will fire after the document has been - inserted in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - doc_id, rev, content, gen, trans_id, idx = result - logger.debug("sync decrypter pool: decrypted doc %s: %s %s %s" - % (doc_id, rev, gen, trans_id)) - return self.insert_received_doc( - doc_id, rev, content, gen, trans_id, idx) - - def _get_docs(self, encrypted=None, sequence=None): - """ - Get documents from the received docs table in the sync db. - - :param encrypted: If not None, only return documents with encrypted - field equal to given parameter. - :type encrypted: bool or None - :param order_by: The name of the field to order results. - - :return: A deferred that will fire with the results of the database - query. - :rtype: twisted.internet.defer.Deferred - """ - query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ - "idx FROM %s" % self.TABLE_NAME - parameters = [] - if encrypted or sequence: - query += " WHERE sync_id = ? and" - parameters += [self._sync_id] - if encrypted: - query += " encrypted = ?" - parameters += [int(encrypted)] - if sequence: - query += " idx in (" + ', '.join('?' * len(sequence)) + ")" - parameters += [int(i) for i in sequence] - query += " ORDER BY idx ASC" - return self._runQuery(query, parameters) - - @defer.inlineCallbacks - def _get_insertable_docs(self): - """ - Return a list of non-encrypted documents ready to be inserted. - - :return: A deferred that will fire with the list of insertable - documents. - :rtype: twisted.internet.defer.Deferred - """ - # Here, check in memory what are the insertable indexes that can - # form a sequence starting from the last inserted index - sequence = [] - insertable_docs = [] - next_index = self._last_inserted_idx + 1 - while next_index in self._decrypted_docs_indexes: - sequence.append(str(next_index)) - next_index += 1 - if len(sequence) > 900: - # 999 is the default value of SQLITE_MAX_VARIABLE_NUMBER - # if we try to query more, SQLite will refuse - # we need to find a way to improve this - # being researched in #7669 - break - # Then fetch all the ones ready for insertion. - if sequence: - insertable_docs = yield self._get_docs(encrypted=False, - sequence=sequence) - defer.returnValue(insertable_docs) - - @defer.inlineCallbacks - def _process_decrypted_docs(self): - """ - Fetch as many decrypted documents as can be taken from the expected - order and insert them in the local replica. - - :return: A deferred that will fire with the list of inserted - documents. - :rtype: twisted.internet.defer.Deferred - """ - insertable = yield self._get_insertable_docs() - processed_docs_ids = [] - for doc_fields in insertable: - method = self._insert_decrypted_local_doc - # FIXME: This is used only because SQLCipherU1DBSync is synchronous - # When adbapi is used there is no need for an external thread - # Without this the reactor can freeze and fail docs download - yield threads.deferToThread(method, *doc_fields) - processed_docs_ids.append(doc_fields[0]) - yield self._delete_received_docs(processed_docs_ids) - - def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id, encrypted, idx): - """ - Insert the decrypted document into the local replica. - - Make use of the passed callback `insert_doc_cb` passed to the caller - by u1db sync. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - """ - # could pass source_replica in params for callback chain - logger.debug("sync decrypter pool: inserting doc in local db: " - "%s:%s %s" % (doc_id, doc_rev, gen)) - - # convert deleted documents to avoid error on document creation - if content == 'null': - content = None - doc = SoledadDocument(doc_id, doc_rev, content) - gen = int(gen) - self._insert_doc_cb(doc, gen, trans_id) - - # store info about processed docs - self._last_inserted_idx = idx - self._processed_docs += 1 - - @defer.inlineCallbacks - def _decrypt_and_recurse(self): - """ - Decrypt the documents received from remote replica and insert them - into the local one. - - This method implicitelly returns a defferred (see the decorator - above). It should only be called by _launch_decrypt_and_process(). - because this way any exceptions raised here will be stored by the - errback attached to the deferred returned. - - :return: A deferred which will fire after all decrypt, process and - delete operations have been executed. - :rtype: twisted.internet.defer.Deferred - """ - if not self.running: - defer.returnValue(None) - processed = self._processed_docs - pending = self._docs_to_process - - if processed < pending: - yield self._process_decrypted_docs() - else: - self._finish() - - def _finish(self): - self._processed_docs = 0 - self._last_inserted_idx = 0 - self._decrypted_docs_indexes = set() - if not self._deferred.called: - self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 3c8e3764..c9da939c 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -43,8 +43,6 @@ class SyncTargetAPI(SyncTarget): def close(self): if self._sync_enc_pool: self._sync_enc_pool.stop() - if self._sync_decr_pool: - self._sync_decr_pool.stop() yield self._http.close() @property @@ -153,7 +151,7 @@ class SyncTargetAPI(SyncTarget): def sync_exchange(self, docs_by_generation, source_replica_uid, last_known_generation, last_known_trans_id, insert_doc_cb, ensure_callback=None, - defer_decryption=True, sync_id=None): + sync_id=None): """ Find out which documents the remote database does not know about, encrypt and send them. After that, receive documents from the remote @@ -185,11 +183,6 @@ class SyncTargetAPI(SyncTarget): created. :type ensure_callback: function - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - :return: A deferred which fires with the new generation and transaction id of the target replica. :rtype: twisted.internet.defer.Deferred @@ -221,8 +214,7 @@ class SyncTargetAPI(SyncTarget): cur_target_gen, cur_target_trans_id = yield self._receive_docs( last_known_generation, last_known_trans_id, - ensure_callback, sync_id, - defer_decryption=defer_decryption) + ensure_callback, sync_id) # update gen and trans id info in case we just sent and did not # receive docs. diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 26606e9b..1f1bc480 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -21,7 +21,6 @@ from twisted.internet import defer from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.http_target.support import RequestBody from leap.soledad.common.log import getLogger from leap.soledad.common.document import SoledadDocument @@ -50,26 +49,11 @@ class HTTPDocFetcher(object): @defer.inlineCallbacks def _receive_docs(self, last_known_generation, last_known_trans_id, - ensure_callback, sync_id, defer_decryption): - defer_decryption = False - - self._queue_for_decrypt = defer_decryption \ - and self._sync_db is not None + ensure_callback, sync_id): new_generation = last_known_generation new_transaction_id = last_known_trans_id - if self._queue_for_decrypt: - logger.debug( - "Soledad sync: will queue received docs for decrypting.") - - if defer_decryption: - self._setup_sync_decr_pool() - - # --------------------------------------------------------------------- - # maybe receive the first document - # --------------------------------------------------------------------- - # we fetch the first document before fetching the rest because we need # to know the total number of documents to be received, and this # information comes as metadata to each request. @@ -85,14 +69,6 @@ class HTTPDocFetcher(object): new_generation = ngen new_transaction_id = ntrans - # --------------------------------------------------------------------- - # wait for async decryption to finish - # --------------------------------------------------------------------- - - if defer_decryption: - yield self._sync_decr_pool.deferred - self._sync_decr_pool.stop() - defer.returnValue([new_generation, new_transaction_id]) def _fetch_all(self, last_known_generation, @@ -126,9 +102,6 @@ class HTTPDocFetcher(object): new_generation, new_transaction_id, number_of_changes, entries =\ self._parse_received_doc_response(response) - if self._sync_decr_pool and not self._sync_decr_pool.running: - self._sync_decr_pool.start(number_of_changes) - for doc_id, rev, content, gen, trans_id in entries: if doc_id is not None: # decrypt incoming document and insert into local database @@ -136,31 +109,10 @@ class HTTPDocFetcher(object): # symmetric decryption of document's contents # --------------------------------------------------------- # If arriving content was symmetrically encrypted, we decrypt - # it. We do it inline if defer_decryption flag is False or no - # sync_db was defined, otherwise we defer it writing it to the - # received docs table. doc = SoledadDocument(doc_id, rev, content) if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(self._crypto.decrypt_doc(doc)) - self._insert_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. - if self._queue_for_decrypt: - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - self._insert_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- + doc.set_json(self._crypto.decrypt_doc(doc)) + self._insert_doc_cb(doc, gen, trans_id) self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total) @@ -212,18 +164,6 @@ class HTTPDocFetcher(object): return new_generation, new_transaction_id, number_of_changes, \ entries - def _setup_sync_decr_pool(self): - """ - Set up the SyncDecrypterPool for deferred decryption. - """ - if self._sync_decr_pool is None and self._sync_db is not None: - # initialize syncing queue decryption pool - self._sync_decr_pool = SyncDecrypterPool( - self._crypto, - self._sync_db, - insert_doc_cb=self._insert_doc_cb, - source_replica_uid=self.source_replica_uid) - def _emit_receive_status(user_data, received_docs, total): content = {'received': received_docs, 'total': total} diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py index 14b34d24..82927ff4 100644 --- a/client/src/leap/soledad/client/interfaces.py +++ b/client/src/leap/soledad/client/interfaces.py @@ -321,7 +321,7 @@ class ISyncableStorage(Interface): "Property, True if the syncer is syncing.") token = Attribute("The authentication Token.") - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize the local encrypted replica with a remote replica. @@ -331,11 +331,6 @@ class ISyncableStorage(Interface): :param url: the url of the target replica to sync with :type url: str - :param defer_decryption: - Whether to defer the decryption process using the intermediate - database. If False, decryption will be done inline. - :type defer_decryption: bool - :return: A deferred that will fire with the local generation before the synchronisation was performed. diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index b198607d..ba341bbf 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -164,7 +164,7 @@ class SQLCipherOptions(object): :param cipher_page_size: The page size. :type cipher_page_size: int :param defer_encryption: - Whether to defer encryption/decryption of documents, or do it + Whether to defer encryption of documents, or do it inline while syncing. :type defer_encryption: bool """ @@ -480,7 +480,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): raise DatabaseAccessError(str(e)) @defer.inlineCallbacks - def sync(self, url, creds=None, defer_decryption=True): + def sync(self, url, creds=None): """ Synchronize documents with remote replica exposed at url. @@ -495,10 +495,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :param creds: optional dictionary giving credentials to authorize the operation with the server. :type creds: dict - :param defer_decryption: - Whether to defer the decryption process using the intermediate - database. If False, decryption will be done inline. - :type defer_decryption: bool :return: A Deferred, that will fire with the local generation (type `int`) @@ -510,8 +506,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self.sync_phase = syncer.sync_phase self.syncer = syncer self.sync_exchange_phase = syncer.sync_exchange_phase - local_gen_before_sync = yield syncer.sync( - defer_decryption=defer_decryption) + local_gen_before_sync = yield syncer.sync() self.received_docs = syncer.received_docs defer.returnValue(local_gen_before_sync) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 4cbd9f2a..d3cfe029 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -56,23 +56,10 @@ class SoledadSynchronizer(Synchronizer): self.sync_exchange_phase = None @defer.inlineCallbacks - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize documents between source and target. - Differently from u1db `Synchronizer.sync` method, this one allows to - pass a `defer_decryption` flag that will postpone the last - step in the synchronization dance, namely, the setting of the last - known generation and transaction id for a given remote replica. - - This is done to allow the ongoing parallel decryption of the incoming - docs to proceed without `InvalidGeneration` conflicts. - - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - :return: A deferred which will fire after the sync has finished with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -172,8 +159,7 @@ class SoledadSynchronizer(Synchronizer): new_gen, new_trans_id = yield sync_target.sync_exchange( docs_by_generation, self.source._replica_uid, target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - defer_decryption=defer_decryption) + self._insert_doc_from_target, ensure_callback=ensure_callback) logger.debug("target gen after sync: %d" % new_gen) logger.debug("target trans_id after sync: %s" % new_trans_id) if hasattr(self.source, 'commit'): diff --git a/testing/tests/perf/test_encdecpool.py b/testing/tests/perf/test_encdecpool.py index 77091a41..8e820b9c 100644 --- a/testing/tests/perf/test_encdecpool.py +++ b/testing/tests/perf/test_encdecpool.py @@ -3,7 +3,6 @@ import json from uuid import uuid4 from twisted.internet.defer import gatherResults from leap.soledad.client.encdecpool import SyncEncrypterPool -from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.common.document import SoledadDocument # FIXME: test load is low due issue #7370, higher values will get out of memory @@ -36,43 +35,3 @@ def create_encrypt(amount, size): test_encdecpool_encrypt_100_10k = create_encrypt(100, 10*1000) test_encdecpool_encrypt_100_100k = create_encrypt(100, 100*1000) test_encdecpool_encrypt_100_500k = create_encrypt(100, 500*1000) - - -def create_decrypt(amount, size): - @pytest.mark.benchmark(group="test_pool_decrypt") - @pytest.inlineCallbacks - def test(soledad_client, txbenchmark_with_setup, request, payload): - DOC_CONTENT = {'payload': payload(size)} - client = soledad_client() - - def setup(): - pool = SyncDecrypterPool( - client._crypto, - client._sync_db, - source_replica_uid=client._dbpool.replica_uid, - insert_doc_cb=lambda x, y, z: False) # ignored - pool.start(amount) - request.addfinalizer(pool.stop) - crypto = client._crypto - docs = [] - for _ in xrange(amount): - doc = SoledadDocument( - doc_id=uuid4().hex, rev='rev', - json=json.dumps(DOC_CONTENT)) - encrypted_content = json.loads(crypto.encrypt_doc(doc)) - docs.append((doc.doc_id, encrypted_content)) - return pool, docs - - def put_and_wait(pool, docs): - deferreds = [] # fires on completion - for idx, (doc_id, content) in enumerate(docs, 1): - deferreds.append(pool.insert_encrypted_received_doc( - doc_id, 'rev', content, idx, "trans_id", idx)) - return gatherResults(deferreds) - - yield txbenchmark_with_setup(setup, put_and_wait) - return test - -test_encdecpool_decrypt_100_10k = create_decrypt(100, 10*1000) -test_encdecpool_decrypt_100_100k = create_decrypt(100, 100*1000) -test_encdecpool_decrypt_100_500k = create_decrypt(100, 500*1000) diff --git a/testing/tests/perf/test_sync.py b/testing/tests/perf/test_sync.py index 4d42395b..0b48a0b9 100644 --- a/testing/tests/perf/test_sync.py +++ b/testing/tests/perf/test_sync.py @@ -23,8 +23,7 @@ def create_upload(uploads, size): def setup(): return load_up(client, uploads, payload(size)) - yield txbenchmark_with_setup(setup, client.sync, - defer_decryption=False) + yield txbenchmark_with_setup(setup, client.sync) return test diff --git a/testing/tests/sync/test_encdecpool.py b/testing/tests/sync/test_encdecpool.py index 4a32885e..7055a765 100644 --- a/testing/tests/sync/test_encdecpool.py +++ b/testing/tests/sync/test_encdecpool.py @@ -1,34 +1,11 @@ # -*- coding: utf-8 -*- -# test_encdecpool.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Tests for encryption and decryption pool. -""" import json -from random import shuffle - -from mock import MagicMock from twisted.internet.defer import inlineCallbacks from leap.soledad.client.encdecpool import SyncEncrypterPool -from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.common.document import SoledadDocument from test_soledad.util import BaseSoledadTest -from twisted.internet import defer DOC_ID = "mydoc" DOC_REV = "rev" @@ -69,238 +46,3 @@ class TestSyncEncrypterPool(BaseSoledadTest): encrypted = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV) self.assertIsNotNone(encrypted) - - -class TestSyncDecrypterPool(BaseSoledadTest): - - def _insert_doc_cb(self, doc, gen, trans_id): - """ - Method used to mock the sync's return_doc_cb callback. - """ - self._inserted_docs.append((doc, gen, trans_id)) - - def _setup_pool(self, sync_db=None): - sync_db = sync_db or self._soledad._sync_db - return SyncDecrypterPool( - self._soledad._crypto, - sync_db, - source_replica_uid=self._soledad._dbpool.replica_uid, - insert_doc_cb=self._insert_doc_cb) - - def setUp(self): - BaseSoledadTest.setUp(self) - # setup the pool - self._pool = self._setup_pool() - # reset the inserted docs mock - self._inserted_docs = [] - - def tearDown(self): - if self._pool.running: - self._pool.stop() - BaseSoledadTest.tearDown(self) - - def test_insert_received_doc(self): - """ - Test that one document added to the pool is inserted using the - callback. - """ - self._pool.start(1) - self._pool.insert_received_doc( - DOC_ID, DOC_REV, "{}", 1, "trans_id", 1) - - def _assert_doc_was_inserted(_): - self.assertEqual( - self._inserted_docs, - [(SoledadDocument(DOC_ID, DOC_REV, "{}"), 1, u"trans_id")]) - - self._pool.deferred.addCallback(_assert_doc_was_inserted) - return self._pool.deferred - - def test_looping_control(self): - """ - Start and stop cleanly. - """ - self._pool.start(10) - self.assertTrue(self._pool.running) - self._pool.stop() - self.assertFalse(self._pool.running) - self.assertTrue(self._pool.deferred.called) - - def test_sync_id_col_is_created_if_non_existing_in_docs_recvd_table(self): - """ - Test that docs_received table is migrated, and has the sync_id column - """ - mock_run_query = MagicMock(return_value=defer.succeed(None)) - mock_sync_db = MagicMock() - mock_sync_db.runQuery = mock_run_query - pool = self._setup_pool(mock_sync_db) - d = pool.start(10) - pool.stop() - - def assert_trial_to_create_sync_id_column(_): - mock_run_query.assert_called_once_with( - "ALTER TABLE docs_received ADD COLUMN sync_id") - - d.addCallback(assert_trial_to_create_sync_id_column) - return d - - def test_insert_received_doc_many(self): - """ - Test that many documents added to the pool are inserted using the - callback. - """ - many = 100 - self._pool.start(many) - - # insert many docs in the pool - for i in xrange(many): - gen = idx = i + 1 - doc_id = "doc_id: %d" % idx - rev = "rev: %d" % idx - content = {'idx': idx} - trans_id = "trans_id: %d" % idx - self._pool.insert_received_doc( - doc_id, rev, content, gen, trans_id, idx) - - def _assert_doc_was_inserted(_): - self.assertEqual(many, len(self._inserted_docs)) - idx = 1 - for doc, gen, trans_id in self._inserted_docs: - expected_gen = idx - expected_doc_id = "doc_id: %d" % idx - expected_rev = "rev: %d" % idx - expected_content = json.dumps({'idx': idx}) - expected_trans_id = "trans_id: %d" % idx - - self.assertEqual(expected_doc_id, doc.doc_id) - self.assertEqual(expected_rev, doc.rev) - self.assertEqual(expected_content, json.dumps(doc.content)) - self.assertEqual(expected_gen, gen) - self.assertEqual(expected_trans_id, trans_id) - - idx += 1 - - self._pool.deferred.addCallback(_assert_doc_was_inserted) - return self._pool.deferred - - def test_insert_encrypted_received_doc(self): - """ - Test that one encrypted document added to the pool is decrypted and - inserted using the callback. - """ - crypto = self._soledad._crypto - doc = SoledadDocument( - doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT)) - encrypted_content = json.loads(crypto.encrypt_doc(doc)) - - # insert the encrypted document in the pool - self._pool.start(1) - self._pool.insert_encrypted_received_doc( - DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1) - - def _assert_doc_was_decrypted_and_inserted(_): - self.assertEqual(1, len(self._inserted_docs)) - self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")]) - - self._pool.deferred.addCallback( - _assert_doc_was_decrypted_and_inserted) - return self._pool.deferred - - @inlineCallbacks - def test_processing_order(self): - """ - This test ensures that processing of documents only occur if there is - a sequence in place. - """ - crypto = self._soledad._crypto - - docs = [] - for i in xrange(1, 10): - i = str(i) - doc = SoledadDocument( - doc_id=DOC_ID + i, rev=DOC_REV + i, - json=json.dumps(DOC_CONTENT)) - encrypted_content = json.loads(crypto.encrypt_doc(doc)) - docs.append((doc, encrypted_content)) - - # insert the encrypted document in the pool - yield self._pool.start(10) # pool is expecting to process 10 docs - self._pool._loop.stop() # we are processing manually - # first three arrives, forming a sequence - for i, (doc, encrypted_content) in enumerate(docs[:3]): - gen = idx = i + 1 - yield self._pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx) - - # last one arrives alone, so it can't be processed - doc, encrypted_content = docs[-1] - yield self._pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10) - - yield self._pool._decrypt_and_recurse() - - self.assertEqual(3, self._pool._processed_docs) - - def test_insert_encrypted_received_doc_many(self, many=100): - """ - Test that many encrypted documents added to the pool are decrypted and - inserted using the callback. - """ - crypto = self._soledad._crypto - self._pool.start(many) - docs = [] - - # insert many encrypted docs in the pool - for i in xrange(many): - gen = idx = i + 1 - doc_id = "doc_id: %d" % idx - rev = "rev: %d" % idx - content = {'idx': idx} - trans_id = "trans_id: %d" % idx - - doc = SoledadDocument( - doc_id=doc_id, rev=rev, json=json.dumps(content)) - - encrypted_content = json.loads(crypto.encrypt_doc(doc)) - docs.append((doc_id, rev, encrypted_content, gen, - trans_id, idx)) - shuffle(docs) - - for doc in docs: - self._pool.insert_encrypted_received_doc(*doc) - - def _assert_docs_were_decrypted_and_inserted(_): - self.assertEqual(many, len(self._inserted_docs)) - idx = 1 - for doc, gen, trans_id in self._inserted_docs: - expected_gen = idx - expected_doc_id = "doc_id: %d" % idx - expected_rev = "rev: %d" % idx - expected_content = json.dumps({'idx': idx}) - expected_trans_id = "trans_id: %d" % idx - - self.assertEqual(expected_doc_id, doc.doc_id) - self.assertEqual(expected_rev, doc.rev) - self.assertEqual(expected_content, json.dumps(doc.content)) - self.assertEqual(expected_gen, gen) - self.assertEqual(expected_trans_id, trans_id) - - idx += 1 - - self._pool.deferred.addCallback( - _assert_docs_were_decrypted_and_inserted) - return self._pool.deferred - - @inlineCallbacks - def test_pool_reuse(self): - """ - The pool is reused between syncs, this test verifies that - reusing is fine. - """ - for i in xrange(3): - yield self.test_insert_encrypted_received_doc_many(5) - self._inserted_docs = [] - decrypted_docs = yield self._pool._get_docs(encrypted=False) - # check that decrypted docs staging is clean - self.assertEquals([], decrypted_docs) - self._pool.stop() diff --git a/testing/tests/sync/test_sync.py b/testing/tests/sync/test_sync.py index 5290003e..a7d0a92b 100644 --- a/testing/tests/sync/test_sync.py +++ b/testing/tests/sync/test_sync.py @@ -187,7 +187,7 @@ class TestSoledadDbSync( self.addCleanup(target.close) return sync.SoledadSynchronizer( self.db, - target).sync(defer_decryption=False) + target).sync() @defer.inlineCallbacks def test_db_sync(self): diff --git a/testing/tests/sync/test_sync_deferred.py b/testing/tests/sync/test_sync_deferred.py index 4948aaf8..482b150c 100644 --- a/testing/tests/sync/test_sync_deferred.py +++ b/testing/tests/sync/test_sync_deferred.py @@ -14,7 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ -Test Leap backend bits: sync with deferred encryption/decryption. +Test Leap backend bits: sync with deferred encryption. """ import time import os @@ -22,12 +22,8 @@ import random import string import shutil -from urlparse import urljoin - from twisted.internet import defer -from leap.soledad.common import couch - from leap.soledad.client import sync from leap.soledad.client.sqlcipher import SQLCipherOptions from leap.soledad.client.sqlcipher import SQLCipherDatabase @@ -41,9 +37,6 @@ from test_soledad.util import make_soledad_app from test_soledad.util import soledad_sync_target -# Just to make clear how this test is different... :) -DEFER_DECRYPTION = True - WAIT_STEP = 1 MAX_WAIT = 10 DBPASS = "pass" @@ -52,7 +45,7 @@ DBPASS = "pass" class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin): """ - Another base class for testing the deferred encryption/decryption during + Another base class for testing the deferred encryption during the syncs, using the intermediate database. """ defer_sync_encryption = True @@ -109,7 +102,7 @@ class TestSoledadDbSyncDeferredEncDecr( """ Test db.sync remote sync shortcut. - Case with deferred encryption and decryption: using the intermediate + Case with deferred encryption: using the intermediate syncdb. """ @@ -158,7 +151,7 @@ class TestSoledadDbSyncDeferredEncDecr( self.addCleanup(target.close) return sync.SoledadSynchronizer( dbsyncer, - target).sync(defer_decryption=True) + target).sync() def wait_for_sync(self): """ diff --git a/testing/tests/sync/test_sync_mutex.py b/testing/tests/sync/test_sync_mutex.py index 2626ab2a..2bcb3aec 100644 --- a/testing/tests/sync/test_sync_mutex.py +++ b/testing/tests/sync/test_sync_mutex.py @@ -47,7 +47,7 @@ from test_soledad.util import soledad_sync_target _old_sync = SoledadSynchronizer.sync -def _timed_sync(self, defer_decryption=True): +def _timed_sync(self): t = time.time() sync_id = uuid.uuid4() @@ -62,7 +62,7 @@ def _timed_sync(self, defer_decryption=True): self.source.sync_times[sync_id]['end'] = t return passthrough - d = _old_sync(self, defer_decryption=defer_decryption) + d = _old_sync(self) d.addBoth(_store_finish_time) return d diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py index 964468ce..a2935539 100644 --- a/testing/tests/sync/test_sync_target.py +++ b/testing/tests/sync/test_sync_target.py @@ -231,8 +231,7 @@ class TestSoledadSyncTarget( doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}') new_gen, trans_id = yield remote_target.sync_exchange( [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=receive_doc, - defer_decryption=False) + last_known_trans_id=None, insert_doc_cb=receive_doc) self.assertEqual(1, new_gen) self.assertGetEncryptedDoc( db, 'doc-here', 'replica:1', '{"value": "here"}', False) @@ -285,8 +284,7 @@ class TestSoledadSyncTarget( 'replica', last_known_generation=0, last_known_trans_id=None, - insert_doc_cb=receive_doc, - defer_decryption=False) + insert_doc_cb=receive_doc) self.assertGetEncryptedDoc( db, 'doc-here', 'replica:1', '{"value": "here"}', @@ -298,8 +296,7 @@ class TestSoledadSyncTarget( trigger_ids = [] new_gen, trans_id = yield remote_target.sync_exchange( [(doc2, 11, 'T-sud')], 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=receive_doc, - defer_decryption=False) + last_known_trans_id=None, insert_doc_cb=receive_doc) self.assertGetEncryptedDoc( db, 'doc-here2', 'replica:1', '{"value": "here2"}', False) @@ -331,7 +328,7 @@ class TestSoledadSyncTarget( new_gen, trans_id = yield remote_target.sync_exchange( [(doc, 10, 'T-sid')], 'replica', last_known_generation=0, last_known_trans_id=None, insert_doc_cb=receive_doc, - ensure_callback=ensure_cb, defer_decryption=False) + ensure_callback=ensure_cb) self.assertEqual(1, new_gen) db = self.db2 self.assertEqual(1, len(replica_uid_box)) @@ -446,8 +443,7 @@ class SoledadDatabaseSyncTargetTests( 'T-sid')] new_gen, trans_id = yield self.st.sync_exchange( docs_by_gen, 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc, - defer_decryption=False) + last_known_trans_id=None, insert_doc_cb=self.receive_doc) self.assertGetEncryptedDoc( self.db, 'doc-id', 'replica:1', tests.simple_doc, False) self.assertTransactionLog(['doc-id'], self.db) @@ -471,8 +467,7 @@ class SoledadDatabaseSyncTargetTests( 'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')] new_gen, trans_id = yield self.st.sync_exchange( docs_by_gen, 'replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc, - defer_decryption=False) + last_known_trans_id=None, insert_doc_cb=self.receive_doc) self.assertGetEncryptedDoc( self.db, 'doc-id', 'replica:1', tests.simple_doc, False) self.assertGetEncryptedDoc( @@ -498,8 +493,7 @@ class SoledadDatabaseSyncTargetTests( self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) new_gen, _ = yield self.st.sync_exchange( [], 'other-replica', last_known_generation=0, - last_known_trans_id=None, insert_doc_cb=self.receive_doc, - defer_decryption=False) + last_known_trans_id=None, insert_doc_cb=self.receive_doc) self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db) self.assertEqual(2, new_gen) self.assertEqual( @@ -779,10 +773,6 @@ class SoledadDatabaseSyncTargetTests( yield self.st.record_sync_info('replica', 0, 'T-sid') self.assertEqual(expected, called) - -# Just to make clear how this test is different... :) -DEFER_DECRYPTION = False - WAIT_STEP = 1 MAX_WAIT = 10 DBPASS = "pass" @@ -890,8 +880,7 @@ class TestSoledadDbSync( defer_encryption=True) self.dbsyncer = dbsyncer return dbsyncer.sync(target_url, - creds=creds, - defer_decryption=DEFER_DECRYPTION) + creds=creds) else: return self._do_sync(self, target_name) |