diff options
Diffstat (limited to 'client/src/leap')
-rw-r--r-- | client/src/leap/soledad/client/api.py | 31 | ||||
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 380 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target/api.py | 12 | ||||
-rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 66 | ||||
-rw-r--r-- | client/src/leap/soledad/client/interfaces.py | 7 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 11 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sync.py | 18 |
7 files changed, 20 insertions, 505 deletions
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index 6870d5ba..cbcae4f7 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -169,7 +169,7 @@ class Soledad(object): :type auth_token: str :param defer_encryption: - Whether to defer encryption/decryption of documents, or do it + Whether to defer encryption of documents, or do it inline while syncing. :type defer_encryption: bool @@ -299,9 +299,9 @@ class Soledad(object): ) self._sqlcipher_opts = opts - # the sync_db is used both for deferred encryption and decryption, so + # the sync_db is used both for deferred encryption, so # we want to initialize it anyway to allow for all combinations of - # deferred encryption and decryption configurations. + # deferred encryption configurations. self._initialize_sync_db(opts) self._dbpool = adbapi.getConnectionPool( opts, sync_enc_pool=self._sync_enc_pool) @@ -700,37 +700,26 @@ class Soledad(object): if syncable and not self._dbsyncer: self._init_u1db_syncer() - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize documents with the server replica. This method uses a lock to prevent multiple concurrent sync processes over the same local db file. - :param defer_decryption: - Whether to defer decryption of documents, or do it inline while - syncing. - :type defer_decryption: bool - :return: A deferred lock that will run the actual sync process when the lock is acquired, and which will fire with with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred """ d = self.sync_lock.run( - self._sync, - defer_decryption) + self._sync) return d - def _sync(self, defer_decryption): + def _sync(self): """ Synchronize documents with the server replica. - :param defer_decryption: - Whether to defer decryption of documents, or do it inline while - syncing. - :type defer_decryption: bool - :return: A deferred whose callback will be invoked with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -740,8 +729,7 @@ class Soledad(object): return d = self._dbsyncer.sync( sync_url, - creds=self._creds, - defer_decryption=defer_decryption) + creds=self._creds) def _sync_callback(local_gen): self._last_received_docs = docs = self._dbsyncer.received_docs @@ -874,12 +862,9 @@ class Soledad(object): """ maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)" encr = encdecpool.SyncEncrypterPool - decr = encdecpool.SyncDecrypterPool sql_encr_table_query = (maybe_create % ( encr.TABLE_NAME, encr.FIELD_NAMES)) - sql_decr_table_query = (maybe_create % ( - decr.TABLE_NAME, decr.FIELD_NAMES)) - return (sql_encr_table_query, sql_decr_table_query) + return (sql_encr_table_query,) # # ISecretsStorage diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 056b012f..8eaefa77 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -22,14 +22,9 @@ during synchronization. """ -import json -from uuid import uuid4 - -from twisted.internet.task import LoopingCall from twisted.internet import threads from twisted.internet import defer -from leap.soledad.common.document import SoledadDocument from leap.soledad.common import soledad_assert from leap.soledad.common.log import getLogger @@ -41,7 +36,7 @@ logger = getLogger(__name__) # -# Encrypt/decrypt pools of workers +# Encrypt pool of workers # class SyncEncryptDecryptPool(object): @@ -282,376 +277,3 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, """ decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret) return doc_id, doc_rev, decrypted_content, gen, trans_id, idx - - -class SyncDecrypterPool(SyncEncryptDecryptPool): - """ - Pool of workers that spawn subprocesses to execute the symmetric decryption - of documents that were received. - - The decryption of the received documents is done in two steps: - - 1. Encrypted documents are stored in the sync db by the actual soledad - sync loop. - 2. The soledad sync loop tells us how many documents we should expect - to process. - 3. We start a decrypt-and-process loop: - - a. Encrypted documents are fetched. - b. Encrypted documents are decrypted. - c. The longest possible list of decrypted documents are inserted - in the soledad db (this depends on which documents have already - arrived and which documents have already been decrypte, because - the order of insertion in the local soledad db matters). - d. Processed documents are deleted from the database. - - 4. When we have processed as many documents as we should, the loop - finishes. - """ - TABLE_NAME = "docs_received" - FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \ - "trans_id, encrypted, idx, sync_id" - - """ - Period of recurrence of the periodic decrypting task, in seconds. - """ - DECRYPT_LOOP_PERIOD = 0.5 - - def __init__(self, *args, **kwargs): - """ - Initialize the decrypter pool, and setup a dict for putting the - results of the decrypted docs until they are picked by the insert - routine that gets them in order. - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - :param source_replica_uid: The source replica uid, used to find the - correct callback for inserting documents. - :type source_replica_uid: str - """ - self._insert_doc_cb = kwargs.pop("insert_doc_cb") - self.source_replica_uid = kwargs.pop("source_replica_uid") - - SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - - self._docs_to_process = None - self._processed_docs = 0 - self._last_inserted_idx = 0 - - self._loop = LoopingCall(self._decrypt_and_recurse) - - def _start_pool(self, period): - self._loop.start(period) - - def start(self, docs_to_process): - """ - Set the number of documents we expect to process. - - This should be called by the during the sync exchange process as soon - as we know how many documents are arriving from the server. - - :param docs_to_process: The number of documents to process. - :type docs_to_process: int - """ - SyncEncryptDecryptPool.start(self) - self._decrypted_docs_indexes = set() - self._sync_id = uuid4().hex - self._docs_to_process = docs_to_process - self._deferred = defer.Deferred() - d = self._init_db() - d.addCallback(lambda _: self._start_pool(self.DECRYPT_LOOP_PERIOD)) - return d - - def stop(self): - if self._loop.running: - self._loop.stop() - self._finish() - SyncEncryptDecryptPool.stop(self) - - def _init_db(self): - """ - Ensure sync_id column is present then - Empty the received docs table of the sync database. - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" % - self.TABLE_NAME) - d = self._runQuery(ensure_sync_id_column) - - def empty_received_docs(_): - query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,) - return self._runOperation(query, (self._sync_id,)) - - d.addCallbacks(empty_received_docs, empty_received_docs) - return d - - def _errback(self, failure): - logger.error(failure) - self._deferred.errback(failure) - self._processed_docs = 0 - self._last_inserted_idx = 0 - - @property - def deferred(self): - """ - Deferred that will be fired when the decryption loop has finished - processing all the documents. - """ - return self._deferred - - def insert_encrypted_received_doc( - self, doc_id, doc_rev, content, gen, trans_id, idx): - """ - Decrypt and insert a received document into local staging area to be - processed later on. - - :param doc_id: The document ID. - :type doc_id: str - :param doc_rev: The document Revision - :param doc_rev: str - :param content: The content of the document - :type content: dict - :param gen: The document Generation - :type gen: int - :param trans_id: Transaction ID - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A deferred that will fire after the decrypted document has - been inserted in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - soledad_assert(self._crypto is not None, "need a crypto object") - - key = self._crypto.doc_passphrase(doc_id) - secret = self._crypto.secret - args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx - # decrypt asynchronously - # TODO use dedicated threadpool / move to ampoule - d = threads.deferToThread( - decrypt_doc_task, *args) - # callback will insert it for later processing - d.addCallback(self._decrypt_doc_cb) - return d - - def insert_received_doc( - self, doc_id, doc_rev, content, gen, trans_id, idx): - """ - Insert a document that is not symmetrically encrypted. - We store it in the staging area (the decrypted_docs dictionary) to be - picked up in order as the preceding documents are decrypted. - - :param doc_id: The document id - :type doc_id: str - :param doc_rev: The document revision - :param doc_rev: str or dict - :param content: The content of the document - :type content: dict - :param gen: The document generation - :type gen: int - :param trans_id: The transaction id - :type trans_id: str - :param idx: The index of this document in the current sync process. - :type idx: int - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - if not isinstance(content, str): - content = json.dumps(content) - query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \ - % self.TABLE_NAME - d = self._runOperation( - query, (doc_id, doc_rev, content, gen, trans_id, 0, - idx, self._sync_id)) - d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx)) - return d - - def _delete_received_docs(self, doc_ids): - """ - Delete a list of received docs after get them inserted into the db. - - :param doc_id: Document ID list. - :type doc_id: list - - :return: A deferred that will fire when the operation in the database - has finished. - :rtype: twisted.internet.defer.Deferred - """ - placeholders = ', '.join('?' for _ in doc_ids) - query = "DELETE FROM '%s' WHERE doc_id in (%s)" \ - % (self.TABLE_NAME, placeholders) - return self._runOperation(query, (doc_ids)) - - def _decrypt_doc_cb(self, result): - """ - Store the decryption result in the sync db from where it will later be - picked by _process_decrypted_docs. - - :param result: A tuple containing the document's id, revision, - content, generation, transaction id and sync index. - :type result: tuple(str, str, str, int, str, int) - - :return: A deferred that will fire after the document has been - inserted in the sync db. - :rtype: twisted.internet.defer.Deferred - """ - doc_id, rev, content, gen, trans_id, idx = result - logger.debug("sync decrypter pool: decrypted doc %s: %s %s %s" - % (doc_id, rev, gen, trans_id)) - return self.insert_received_doc( - doc_id, rev, content, gen, trans_id, idx) - - def _get_docs(self, encrypted=None, sequence=None): - """ - Get documents from the received docs table in the sync db. - - :param encrypted: If not None, only return documents with encrypted - field equal to given parameter. - :type encrypted: bool or None - :param order_by: The name of the field to order results. - - :return: A deferred that will fire with the results of the database - query. - :rtype: twisted.internet.defer.Deferred - """ - query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \ - "idx FROM %s" % self.TABLE_NAME - parameters = [] - if encrypted or sequence: - query += " WHERE sync_id = ? and" - parameters += [self._sync_id] - if encrypted: - query += " encrypted = ?" - parameters += [int(encrypted)] - if sequence: - query += " idx in (" + ', '.join('?' * len(sequence)) + ")" - parameters += [int(i) for i in sequence] - query += " ORDER BY idx ASC" - return self._runQuery(query, parameters) - - @defer.inlineCallbacks - def _get_insertable_docs(self): - """ - Return a list of non-encrypted documents ready to be inserted. - - :return: A deferred that will fire with the list of insertable - documents. - :rtype: twisted.internet.defer.Deferred - """ - # Here, check in memory what are the insertable indexes that can - # form a sequence starting from the last inserted index - sequence = [] - insertable_docs = [] - next_index = self._last_inserted_idx + 1 - while next_index in self._decrypted_docs_indexes: - sequence.append(str(next_index)) - next_index += 1 - if len(sequence) > 900: - # 999 is the default value of SQLITE_MAX_VARIABLE_NUMBER - # if we try to query more, SQLite will refuse - # we need to find a way to improve this - # being researched in #7669 - break - # Then fetch all the ones ready for insertion. - if sequence: - insertable_docs = yield self._get_docs(encrypted=False, - sequence=sequence) - defer.returnValue(insertable_docs) - - @defer.inlineCallbacks - def _process_decrypted_docs(self): - """ - Fetch as many decrypted documents as can be taken from the expected - order and insert them in the local replica. - - :return: A deferred that will fire with the list of inserted - documents. - :rtype: twisted.internet.defer.Deferred - """ - insertable = yield self._get_insertable_docs() - processed_docs_ids = [] - for doc_fields in insertable: - method = self._insert_decrypted_local_doc - # FIXME: This is used only because SQLCipherU1DBSync is synchronous - # When adbapi is used there is no need for an external thread - # Without this the reactor can freeze and fail docs download - yield threads.deferToThread(method, *doc_fields) - processed_docs_ids.append(doc_fields[0]) - yield self._delete_received_docs(processed_docs_ids) - - def _insert_decrypted_local_doc(self, doc_id, doc_rev, content, - gen, trans_id, encrypted, idx): - """ - Insert the decrypted document into the local replica. - - Make use of the passed callback `insert_doc_cb` passed to the caller - by u1db sync. - - :param doc_id: The document id. - :type doc_id: str - :param doc_rev: The document revision. - :type doc_rev: str - :param content: The serialized content of the document. - :type content: str - :param gen: The generation corresponding to the modification of that - document. - :type gen: int - :param trans_id: The transaction id corresponding to the modification - of that document. - :type trans_id: str - """ - # could pass source_replica in params for callback chain - logger.debug("sync decrypter pool: inserting doc in local db: " - "%s:%s %s" % (doc_id, doc_rev, gen)) - - # convert deleted documents to avoid error on document creation - if content == 'null': - content = None - doc = SoledadDocument(doc_id, doc_rev, content) - gen = int(gen) - self._insert_doc_cb(doc, gen, trans_id) - - # store info about processed docs - self._last_inserted_idx = idx - self._processed_docs += 1 - - @defer.inlineCallbacks - def _decrypt_and_recurse(self): - """ - Decrypt the documents received from remote replica and insert them - into the local one. - - This method implicitelly returns a defferred (see the decorator - above). It should only be called by _launch_decrypt_and_process(). - because this way any exceptions raised here will be stored by the - errback attached to the deferred returned. - - :return: A deferred which will fire after all decrypt, process and - delete operations have been executed. - :rtype: twisted.internet.defer.Deferred - """ - if not self.running: - defer.returnValue(None) - processed = self._processed_docs - pending = self._docs_to_process - - if processed < pending: - yield self._process_decrypted_docs() - else: - self._finish() - - def _finish(self): - self._processed_docs = 0 - self._last_inserted_idx = 0 - self._decrypted_docs_indexes = set() - if not self._deferred.called: - self._deferred.callback(None) diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 3c8e3764..c9da939c 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -43,8 +43,6 @@ class SyncTargetAPI(SyncTarget): def close(self): if self._sync_enc_pool: self._sync_enc_pool.stop() - if self._sync_decr_pool: - self._sync_decr_pool.stop() yield self._http.close() @property @@ -153,7 +151,7 @@ class SyncTargetAPI(SyncTarget): def sync_exchange(self, docs_by_generation, source_replica_uid, last_known_generation, last_known_trans_id, insert_doc_cb, ensure_callback=None, - defer_decryption=True, sync_id=None): + sync_id=None): """ Find out which documents the remote database does not know about, encrypt and send them. After that, receive documents from the remote @@ -185,11 +183,6 @@ class SyncTargetAPI(SyncTarget): created. :type ensure_callback: function - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - :return: A deferred which fires with the new generation and transaction id of the target replica. :rtype: twisted.internet.defer.Deferred @@ -221,8 +214,7 @@ class SyncTargetAPI(SyncTarget): cur_target_gen, cur_target_trans_id = yield self._receive_docs( last_known_generation, last_known_trans_id, - ensure_callback, sync_id, - defer_decryption=defer_decryption) + ensure_callback, sync_id) # update gen and trans id info in case we just sent and did not # receive docs. diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 26606e9b..1f1bc480 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -21,7 +21,6 @@ from twisted.internet import defer from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS from leap.soledad.client.events import emit_async from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.http_target.support import RequestBody from leap.soledad.common.log import getLogger from leap.soledad.common.document import SoledadDocument @@ -50,26 +49,11 @@ class HTTPDocFetcher(object): @defer.inlineCallbacks def _receive_docs(self, last_known_generation, last_known_trans_id, - ensure_callback, sync_id, defer_decryption): - defer_decryption = False - - self._queue_for_decrypt = defer_decryption \ - and self._sync_db is not None + ensure_callback, sync_id): new_generation = last_known_generation new_transaction_id = last_known_trans_id - if self._queue_for_decrypt: - logger.debug( - "Soledad sync: will queue received docs for decrypting.") - - if defer_decryption: - self._setup_sync_decr_pool() - - # --------------------------------------------------------------------- - # maybe receive the first document - # --------------------------------------------------------------------- - # we fetch the first document before fetching the rest because we need # to know the total number of documents to be received, and this # information comes as metadata to each request. @@ -85,14 +69,6 @@ class HTTPDocFetcher(object): new_generation = ngen new_transaction_id = ntrans - # --------------------------------------------------------------------- - # wait for async decryption to finish - # --------------------------------------------------------------------- - - if defer_decryption: - yield self._sync_decr_pool.deferred - self._sync_decr_pool.stop() - defer.returnValue([new_generation, new_transaction_id]) def _fetch_all(self, last_known_generation, @@ -126,9 +102,6 @@ class HTTPDocFetcher(object): new_generation, new_transaction_id, number_of_changes, entries =\ self._parse_received_doc_response(response) - if self._sync_decr_pool and not self._sync_decr_pool.running: - self._sync_decr_pool.start(number_of_changes) - for doc_id, rev, content, gen, trans_id in entries: if doc_id is not None: # decrypt incoming document and insert into local database @@ -136,31 +109,10 @@ class HTTPDocFetcher(object): # symmetric decryption of document's contents # --------------------------------------------------------- # If arriving content was symmetrically encrypted, we decrypt - # it. We do it inline if defer_decryption flag is False or no - # sync_db was defined, otherwise we defer it writing it to the - # received docs table. doc = SoledadDocument(doc_id, rev, content) if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(self._crypto.decrypt_doc(doc)) - self._insert_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. - if self._queue_for_decrypt: - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - self._insert_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- + doc.set_json(self._crypto.decrypt_doc(doc)) + self._insert_doc_cb(doc, gen, trans_id) self._received_docs += 1 user_data = {'uuid': self.uuid, 'userid': self.userid} _emit_receive_status(user_data, self._received_docs, total) @@ -212,18 +164,6 @@ class HTTPDocFetcher(object): return new_generation, new_transaction_id, number_of_changes, \ entries - def _setup_sync_decr_pool(self): - """ - Set up the SyncDecrypterPool for deferred decryption. - """ - if self._sync_decr_pool is None and self._sync_db is not None: - # initialize syncing queue decryption pool - self._sync_decr_pool = SyncDecrypterPool( - self._crypto, - self._sync_db, - insert_doc_cb=self._insert_doc_cb, - source_replica_uid=self.source_replica_uid) - def _emit_receive_status(user_data, received_docs, total): content = {'received': received_docs, 'total': total} diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py index 14b34d24..82927ff4 100644 --- a/client/src/leap/soledad/client/interfaces.py +++ b/client/src/leap/soledad/client/interfaces.py @@ -321,7 +321,7 @@ class ISyncableStorage(Interface): "Property, True if the syncer is syncing.") token = Attribute("The authentication Token.") - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize the local encrypted replica with a remote replica. @@ -331,11 +331,6 @@ class ISyncableStorage(Interface): :param url: the url of the target replica to sync with :type url: str - :param defer_decryption: - Whether to defer the decryption process using the intermediate - database. If False, decryption will be done inline. - :type defer_decryption: bool - :return: A deferred that will fire with the local generation before the synchronisation was performed. diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index b198607d..ba341bbf 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -164,7 +164,7 @@ class SQLCipherOptions(object): :param cipher_page_size: The page size. :type cipher_page_size: int :param defer_encryption: - Whether to defer encryption/decryption of documents, or do it + Whether to defer encryption of documents, or do it inline while syncing. :type defer_encryption: bool """ @@ -480,7 +480,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): raise DatabaseAccessError(str(e)) @defer.inlineCallbacks - def sync(self, url, creds=None, defer_decryption=True): + def sync(self, url, creds=None): """ Synchronize documents with remote replica exposed at url. @@ -495,10 +495,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase): :param creds: optional dictionary giving credentials to authorize the operation with the server. :type creds: dict - :param defer_decryption: - Whether to defer the decryption process using the intermediate - database. If False, decryption will be done inline. - :type defer_decryption: bool :return: A Deferred, that will fire with the local generation (type `int`) @@ -510,8 +506,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): self.sync_phase = syncer.sync_phase self.syncer = syncer self.sync_exchange_phase = syncer.sync_exchange_phase - local_gen_before_sync = yield syncer.sync( - defer_decryption=defer_decryption) + local_gen_before_sync = yield syncer.sync() self.received_docs = syncer.received_docs defer.returnValue(local_gen_before_sync) diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py index 4cbd9f2a..d3cfe029 100644 --- a/client/src/leap/soledad/client/sync.py +++ b/client/src/leap/soledad/client/sync.py @@ -56,23 +56,10 @@ class SoledadSynchronizer(Synchronizer): self.sync_exchange_phase = None @defer.inlineCallbacks - def sync(self, defer_decryption=True): + def sync(self): """ Synchronize documents between source and target. - Differently from u1db `Synchronizer.sync` method, this one allows to - pass a `defer_decryption` flag that will postpone the last - step in the synchronization dance, namely, the setting of the last - known generation and transaction id for a given remote replica. - - This is done to allow the ongoing parallel decryption of the incoming - docs to proceed without `InvalidGeneration` conflicts. - - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - :return: A deferred which will fire after the sync has finished with the local generation before the synchronization was performed. :rtype: twisted.internet.defer.Deferred @@ -172,8 +159,7 @@ class SoledadSynchronizer(Synchronizer): new_gen, new_trans_id = yield sync_target.sync_exchange( docs_by_generation, self.source._replica_uid, target_last_known_gen, target_last_known_trans_id, - self._insert_doc_from_target, ensure_callback=ensure_callback, - defer_decryption=defer_decryption) + self._insert_doc_from_target, ensure_callback=ensure_callback) logger.debug("target gen after sync: %d" % new_gen) logger.debug("target trans_id after sync: %s" % new_trans_id) if hasattr(self.source, 'commit'): |