summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/leap/soledad/client/api.py31
-rw-r--r--client/src/leap/soledad/client/encdecpool.py380
-rw-r--r--client/src/leap/soledad/client/http_target/api.py12
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py66
-rw-r--r--client/src/leap/soledad/client/interfaces.py7
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py11
-rw-r--r--client/src/leap/soledad/client/sync.py18
-rw-r--r--testing/tests/perf/test_encdecpool.py41
-rw-r--r--testing/tests/perf/test_sync.py3
-rw-r--r--testing/tests/sync/test_encdecpool.py258
-rw-r--r--testing/tests/sync/test_sync.py2
-rw-r--r--testing/tests/sync/test_sync_deferred.py15
-rw-r--r--testing/tests/sync/test_sync_mutex.py4
-rw-r--r--testing/tests/sync/test_sync_target.py27
14 files changed, 36 insertions, 839 deletions
diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py
index 6870d5ba..cbcae4f7 100644
--- a/client/src/leap/soledad/client/api.py
+++ b/client/src/leap/soledad/client/api.py
@@ -169,7 +169,7 @@ class Soledad(object):
:type auth_token: str
:param defer_encryption:
- Whether to defer encryption/decryption of documents, or do it
+ Whether to defer encryption of documents, or do it
inline while syncing.
:type defer_encryption: bool
@@ -299,9 +299,9 @@ class Soledad(object):
)
self._sqlcipher_opts = opts
- # the sync_db is used both for deferred encryption and decryption, so
+ # the sync_db is used both for deferred encryption, so
# we want to initialize it anyway to allow for all combinations of
- # deferred encryption and decryption configurations.
+ # deferred encryption configurations.
self._initialize_sync_db(opts)
self._dbpool = adbapi.getConnectionPool(
opts, sync_enc_pool=self._sync_enc_pool)
@@ -700,37 +700,26 @@ class Soledad(object):
if syncable and not self._dbsyncer:
self._init_u1db_syncer()
- def sync(self, defer_decryption=True):
+ def sync(self):
"""
Synchronize documents with the server replica.
This method uses a lock to prevent multiple concurrent sync processes
over the same local db file.
- :param defer_decryption:
- Whether to defer decryption of documents, or do it inline while
- syncing.
- :type defer_decryption: bool
-
:return: A deferred lock that will run the actual sync process when
the lock is acquired, and which will fire with with the local
generation before the synchronization was performed.
:rtype: twisted.internet.defer.Deferred
"""
d = self.sync_lock.run(
- self._sync,
- defer_decryption)
+ self._sync)
return d
- def _sync(self, defer_decryption):
+ def _sync(self):
"""
Synchronize documents with the server replica.
- :param defer_decryption:
- Whether to defer decryption of documents, or do it inline while
- syncing.
- :type defer_decryption: bool
-
:return: A deferred whose callback will be invoked with the local
generation before the synchronization was performed.
:rtype: twisted.internet.defer.Deferred
@@ -740,8 +729,7 @@ class Soledad(object):
return
d = self._dbsyncer.sync(
sync_url,
- creds=self._creds,
- defer_decryption=defer_decryption)
+ creds=self._creds)
def _sync_callback(local_gen):
self._last_received_docs = docs = self._dbsyncer.received_docs
@@ -874,12 +862,9 @@ class Soledad(object):
"""
maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"
encr = encdecpool.SyncEncrypterPool
- decr = encdecpool.SyncDecrypterPool
sql_encr_table_query = (maybe_create % (
encr.TABLE_NAME, encr.FIELD_NAMES))
- sql_decr_table_query = (maybe_create % (
- decr.TABLE_NAME, decr.FIELD_NAMES))
- return (sql_encr_table_query, sql_decr_table_query)
+ return (sql_encr_table_query,)
#
# ISecretsStorage
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 056b012f..8eaefa77 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -22,14 +22,9 @@ during synchronization.
"""
-import json
-from uuid import uuid4
-
-from twisted.internet.task import LoopingCall
from twisted.internet import threads
from twisted.internet import defer
-from leap.soledad.common.document import SoledadDocument
from leap.soledad.common import soledad_assert
from leap.soledad.common.log import getLogger
@@ -41,7 +36,7 @@ logger = getLogger(__name__)
#
-# Encrypt/decrypt pools of workers
+# Encrypt pool of workers
#
class SyncEncryptDecryptPool(object):
@@ -282,376 +277,3 @@ def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
"""
decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)
return doc_id, doc_rev, decrypted_content, gen, trans_id, idx
-
-
-class SyncDecrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric decryption
- of documents that were received.
-
- The decryption of the received documents is done in two steps:
-
- 1. Encrypted documents are stored in the sync db by the actual soledad
- sync loop.
- 2. The soledad sync loop tells us how many documents we should expect
- to process.
- 3. We start a decrypt-and-process loop:
-
- a. Encrypted documents are fetched.
- b. Encrypted documents are decrypted.
- c. The longest possible list of decrypted documents are inserted
- in the soledad db (this depends on which documents have already
- arrived and which documents have already been decrypte, because
- the order of insertion in the local soledad db matters).
- d. Processed documents are deleted from the database.
-
- 4. When we have processed as many documents as we should, the loop
- finishes.
- """
- TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \
- "trans_id, encrypted, idx, sync_id"
-
- """
- Period of recurrence of the periodic decrypting task, in seconds.
- """
- DECRYPT_LOOP_PERIOD = 0.5
-
- def __init__(self, *args, **kwargs):
- """
- Initialize the decrypter pool, and setup a dict for putting the
- results of the decrypted docs until they are picked by the insert
- routine that gets them in order.
-
- :param insert_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type insert_doc_cb: function
- :param source_replica_uid: The source replica uid, used to find the
- correct callback for inserting documents.
- :type source_replica_uid: str
- """
- self._insert_doc_cb = kwargs.pop("insert_doc_cb")
- self.source_replica_uid = kwargs.pop("source_replica_uid")
-
- SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
-
- self._docs_to_process = None
- self._processed_docs = 0
- self._last_inserted_idx = 0
-
- self._loop = LoopingCall(self._decrypt_and_recurse)
-
- def _start_pool(self, period):
- self._loop.start(period)
-
- def start(self, docs_to_process):
- """
- Set the number of documents we expect to process.
-
- This should be called by the during the sync exchange process as soon
- as we know how many documents are arriving from the server.
-
- :param docs_to_process: The number of documents to process.
- :type docs_to_process: int
- """
- SyncEncryptDecryptPool.start(self)
- self._decrypted_docs_indexes = set()
- self._sync_id = uuid4().hex
- self._docs_to_process = docs_to_process
- self._deferred = defer.Deferred()
- d = self._init_db()
- d.addCallback(lambda _: self._start_pool(self.DECRYPT_LOOP_PERIOD))
- return d
-
- def stop(self):
- if self._loop.running:
- self._loop.stop()
- self._finish()
- SyncEncryptDecryptPool.stop(self)
-
- def _init_db(self):
- """
- Ensure sync_id column is present then
- Empty the received docs table of the sync database.
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- ensure_sync_id_column = ("ALTER TABLE %s ADD COLUMN sync_id" %
- self.TABLE_NAME)
- d = self._runQuery(ensure_sync_id_column)
-
- def empty_received_docs(_):
- query = "DELETE FROM %s WHERE sync_id <> ?" % (self.TABLE_NAME,)
- return self._runOperation(query, (self._sync_id,))
-
- d.addCallbacks(empty_received_docs, empty_received_docs)
- return d
-
- def _errback(self, failure):
- logger.error(failure)
- self._deferred.errback(failure)
- self._processed_docs = 0
- self._last_inserted_idx = 0
-
- @property
- def deferred(self):
- """
- Deferred that will be fired when the decryption loop has finished
- processing all the documents.
- """
- return self._deferred
-
- def insert_encrypted_received_doc(
- self, doc_id, doc_rev, content, gen, trans_id, idx):
- """
- Decrypt and insert a received document into local staging area to be
- processed later on.
-
- :param doc_id: The document ID.
- :type doc_id: str
- :param doc_rev: The document Revision
- :param doc_rev: str
- :param content: The content of the document
- :type content: dict
- :param gen: The document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- :param idx: The index of this document in the current sync process.
- :type idx: int
-
- :return: A deferred that will fire after the decrypted document has
- been inserted in the sync db.
- :rtype: twisted.internet.defer.Deferred
- """
- soledad_assert(self._crypto is not None, "need a crypto object")
-
- key = self._crypto.doc_passphrase(doc_id)
- secret = self._crypto.secret
- args = doc_id, doc_rev, content, gen, trans_id, key, secret, idx
- # decrypt asynchronously
- # TODO use dedicated threadpool / move to ampoule
- d = threads.deferToThread(
- decrypt_doc_task, *args)
- # callback will insert it for later processing
- d.addCallback(self._decrypt_doc_cb)
- return d
-
- def insert_received_doc(
- self, doc_id, doc_rev, content, gen, trans_id, idx):
- """
- Insert a document that is not symmetrically encrypted.
- We store it in the staging area (the decrypted_docs dictionary) to be
- picked up in order as the preceding documents are decrypted.
-
- :param doc_id: The document id
- :type doc_id: str
- :param doc_rev: The document revision
- :param doc_rev: str or dict
- :param content: The content of the document
- :type content: dict
- :param gen: The document generation
- :type gen: int
- :param trans_id: The transaction id
- :type trans_id: str
- :param idx: The index of this document in the current sync process.
- :type idx: int
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- if not isinstance(content, str):
- content = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?, ?)" \
- % self.TABLE_NAME
- d = self._runOperation(
- query, (doc_id, doc_rev, content, gen, trans_id, 0,
- idx, self._sync_id))
- d.addCallback(lambda _: self._decrypted_docs_indexes.add(idx))
- return d
-
- def _delete_received_docs(self, doc_ids):
- """
- Delete a list of received docs after get them inserted into the db.
-
- :param doc_id: Document ID list.
- :type doc_id: list
-
- :return: A deferred that will fire when the operation in the database
- has finished.
- :rtype: twisted.internet.defer.Deferred
- """
- placeholders = ', '.join('?' for _ in doc_ids)
- query = "DELETE FROM '%s' WHERE doc_id in (%s)" \
- % (self.TABLE_NAME, placeholders)
- return self._runOperation(query, (doc_ids))
-
- def _decrypt_doc_cb(self, result):
- """
- Store the decryption result in the sync db from where it will later be
- picked by _process_decrypted_docs.
-
- :param result: A tuple containing the document's id, revision,
- content, generation, transaction id and sync index.
- :type result: tuple(str, str, str, int, str, int)
-
- :return: A deferred that will fire after the document has been
- inserted in the sync db.
- :rtype: twisted.internet.defer.Deferred
- """
- doc_id, rev, content, gen, trans_id, idx = result
- logger.debug("sync decrypter pool: decrypted doc %s: %s %s %s"
- % (doc_id, rev, gen, trans_id))
- return self.insert_received_doc(
- doc_id, rev, content, gen, trans_id, idx)
-
- def _get_docs(self, encrypted=None, sequence=None):
- """
- Get documents from the received docs table in the sync db.
-
- :param encrypted: If not None, only return documents with encrypted
- field equal to given parameter.
- :type encrypted: bool or None
- :param order_by: The name of the field to order results.
-
- :return: A deferred that will fire with the results of the database
- query.
- :rtype: twisted.internet.defer.Deferred
- """
- query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \
- "idx FROM %s" % self.TABLE_NAME
- parameters = []
- if encrypted or sequence:
- query += " WHERE sync_id = ? and"
- parameters += [self._sync_id]
- if encrypted:
- query += " encrypted = ?"
- parameters += [int(encrypted)]
- if sequence:
- query += " idx in (" + ', '.join('?' * len(sequence)) + ")"
- parameters += [int(i) for i in sequence]
- query += " ORDER BY idx ASC"
- return self._runQuery(query, parameters)
-
- @defer.inlineCallbacks
- def _get_insertable_docs(self):
- """
- Return a list of non-encrypted documents ready to be inserted.
-
- :return: A deferred that will fire with the list of insertable
- documents.
- :rtype: twisted.internet.defer.Deferred
- """
- # Here, check in memory what are the insertable indexes that can
- # form a sequence starting from the last inserted index
- sequence = []
- insertable_docs = []
- next_index = self._last_inserted_idx + 1
- while next_index in self._decrypted_docs_indexes:
- sequence.append(str(next_index))
- next_index += 1
- if len(sequence) > 900:
- # 999 is the default value of SQLITE_MAX_VARIABLE_NUMBER
- # if we try to query more, SQLite will refuse
- # we need to find a way to improve this
- # being researched in #7669
- break
- # Then fetch all the ones ready for insertion.
- if sequence:
- insertable_docs = yield self._get_docs(encrypted=False,
- sequence=sequence)
- defer.returnValue(insertable_docs)
-
- @defer.inlineCallbacks
- def _process_decrypted_docs(self):
- """
- Fetch as many decrypted documents as can be taken from the expected
- order and insert them in the local replica.
-
- :return: A deferred that will fire with the list of inserted
- documents.
- :rtype: twisted.internet.defer.Deferred
- """
- insertable = yield self._get_insertable_docs()
- processed_docs_ids = []
- for doc_fields in insertable:
- method = self._insert_decrypted_local_doc
- # FIXME: This is used only because SQLCipherU1DBSync is synchronous
- # When adbapi is used there is no need for an external thread
- # Without this the reactor can freeze and fail docs download
- yield threads.deferToThread(method, *doc_fields)
- processed_docs_ids.append(doc_fields[0])
- yield self._delete_received_docs(processed_docs_ids)
-
- def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
- gen, trans_id, encrypted, idx):
- """
- Insert the decrypted document into the local replica.
-
- Make use of the passed callback `insert_doc_cb` passed to the caller
- by u1db sync.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- """
- # could pass source_replica in params for callback chain
- logger.debug("sync decrypter pool: inserting doc in local db: "
- "%s:%s %s" % (doc_id, doc_rev, gen))
-
- # convert deleted documents to avoid error on document creation
- if content == 'null':
- content = None
- doc = SoledadDocument(doc_id, doc_rev, content)
- gen = int(gen)
- self._insert_doc_cb(doc, gen, trans_id)
-
- # store info about processed docs
- self._last_inserted_idx = idx
- self._processed_docs += 1
-
- @defer.inlineCallbacks
- def _decrypt_and_recurse(self):
- """
- Decrypt the documents received from remote replica and insert them
- into the local one.
-
- This method implicitelly returns a defferred (see the decorator
- above). It should only be called by _launch_decrypt_and_process().
- because this way any exceptions raised here will be stored by the
- errback attached to the deferred returned.
-
- :return: A deferred which will fire after all decrypt, process and
- delete operations have been executed.
- :rtype: twisted.internet.defer.Deferred
- """
- if not self.running:
- defer.returnValue(None)
- processed = self._processed_docs
- pending = self._docs_to_process
-
- if processed < pending:
- yield self._process_decrypted_docs()
- else:
- self._finish()
-
- def _finish(self):
- self._processed_docs = 0
- self._last_inserted_idx = 0
- self._decrypted_docs_indexes = set()
- if not self._deferred.called:
- self._deferred.callback(None)
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
index 3c8e3764..c9da939c 100644
--- a/client/src/leap/soledad/client/http_target/api.py
+++ b/client/src/leap/soledad/client/http_target/api.py
@@ -43,8 +43,6 @@ class SyncTargetAPI(SyncTarget):
def close(self):
if self._sync_enc_pool:
self._sync_enc_pool.stop()
- if self._sync_decr_pool:
- self._sync_decr_pool.stop()
yield self._http.close()
@property
@@ -153,7 +151,7 @@ class SyncTargetAPI(SyncTarget):
def sync_exchange(self, docs_by_generation, source_replica_uid,
last_known_generation, last_known_trans_id,
insert_doc_cb, ensure_callback=None,
- defer_decryption=True, sync_id=None):
+ sync_id=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them. After that, receive documents from the remote
@@ -185,11 +183,6 @@ class SyncTargetAPI(SyncTarget):
created.
:type ensure_callback: function
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
:return: A deferred which fires with the new generation and
transaction id of the target replica.
:rtype: twisted.internet.defer.Deferred
@@ -221,8 +214,7 @@ class SyncTargetAPI(SyncTarget):
cur_target_gen, cur_target_trans_id = yield self._receive_docs(
last_known_generation, last_known_trans_id,
- ensure_callback, sync_id,
- defer_decryption=defer_decryption)
+ ensure_callback, sync_id)
# update gen and trans id info in case we just sent and did not
# receive docs.
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 26606e9b..1f1bc480 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -21,7 +21,6 @@ from twisted.internet import defer
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import emit_async
from leap.soledad.client.crypto import is_symmetrically_encrypted
-from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.client.http_target.support import RequestBody
from leap.soledad.common.log import getLogger
from leap.soledad.common.document import SoledadDocument
@@ -50,26 +49,11 @@ class HTTPDocFetcher(object):
@defer.inlineCallbacks
def _receive_docs(self, last_known_generation, last_known_trans_id,
- ensure_callback, sync_id, defer_decryption):
- defer_decryption = False
-
- self._queue_for_decrypt = defer_decryption \
- and self._sync_db is not None
+ ensure_callback, sync_id):
new_generation = last_known_generation
new_transaction_id = last_known_trans_id
- if self._queue_for_decrypt:
- logger.debug(
- "Soledad sync: will queue received docs for decrypting.")
-
- if defer_decryption:
- self._setup_sync_decr_pool()
-
- # ---------------------------------------------------------------------
- # maybe receive the first document
- # ---------------------------------------------------------------------
-
# we fetch the first document before fetching the rest because we need
# to know the total number of documents to be received, and this
# information comes as metadata to each request.
@@ -85,14 +69,6 @@ class HTTPDocFetcher(object):
new_generation = ngen
new_transaction_id = ntrans
- # ---------------------------------------------------------------------
- # wait for async decryption to finish
- # ---------------------------------------------------------------------
-
- if defer_decryption:
- yield self._sync_decr_pool.deferred
- self._sync_decr_pool.stop()
-
defer.returnValue([new_generation, new_transaction_id])
def _fetch_all(self, last_known_generation,
@@ -126,9 +102,6 @@ class HTTPDocFetcher(object):
new_generation, new_transaction_id, number_of_changes, entries =\
self._parse_received_doc_response(response)
- if self._sync_decr_pool and not self._sync_decr_pool.running:
- self._sync_decr_pool.start(number_of_changes)
-
for doc_id, rev, content, gen, trans_id in entries:
if doc_id is not None:
# decrypt incoming document and insert into local database
@@ -136,31 +109,10 @@ class HTTPDocFetcher(object):
# symmetric decryption of document's contents
# ---------------------------------------------------------
# If arriving content was symmetrically encrypted, we decrypt
- # it. We do it inline if defer_decryption flag is False or no
- # sync_db was defined, otherwise we defer it writing it to the
- # received docs table.
doc = SoledadDocument(doc_id, rev, content)
if is_symmetrically_encrypted(doc):
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
- else:
- # defer_decryption is False or no-sync-db fallback
- doc.set_json(self._crypto.decrypt_doc(doc))
- self._insert_doc_cb(doc, gen, trans_id)
- else:
- # not symmetrically encrypted doc, insert it directly
- # or save it in the decrypted stage.
- if self._queue_for_decrypt:
- self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id,
- idx)
- else:
- self._insert_doc_cb(doc, gen, trans_id)
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
+ doc.set_json(self._crypto.decrypt_doc(doc))
+ self._insert_doc_cb(doc, gen, trans_id)
self._received_docs += 1
user_data = {'uuid': self.uuid, 'userid': self.userid}
_emit_receive_status(user_data, self._received_docs, total)
@@ -212,18 +164,6 @@ class HTTPDocFetcher(object):
return new_generation, new_transaction_id, number_of_changes, \
entries
- def _setup_sync_decr_pool(self):
- """
- Set up the SyncDecrypterPool for deferred decryption.
- """
- if self._sync_decr_pool is None and self._sync_db is not None:
- # initialize syncing queue decryption pool
- self._sync_decr_pool = SyncDecrypterPool(
- self._crypto,
- self._sync_db,
- insert_doc_cb=self._insert_doc_cb,
- source_replica_uid=self.source_replica_uid)
-
def _emit_receive_status(user_data, received_docs, total):
content = {'received': received_docs, 'total': total}
diff --git a/client/src/leap/soledad/client/interfaces.py b/client/src/leap/soledad/client/interfaces.py
index 14b34d24..82927ff4 100644
--- a/client/src/leap/soledad/client/interfaces.py
+++ b/client/src/leap/soledad/client/interfaces.py
@@ -321,7 +321,7 @@ class ISyncableStorage(Interface):
"Property, True if the syncer is syncing.")
token = Attribute("The authentication Token.")
- def sync(self, defer_decryption=True):
+ def sync(self):
"""
Synchronize the local encrypted replica with a remote replica.
@@ -331,11 +331,6 @@ class ISyncableStorage(Interface):
:param url: the url of the target replica to sync with
:type url: str
- :param defer_decryption:
- Whether to defer the decryption process using the intermediate
- database. If False, decryption will be done inline.
- :type defer_decryption: bool
-
:return:
A deferred that will fire with the local generation before the
synchronisation was performed.
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index b198607d..ba341bbf 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -164,7 +164,7 @@ class SQLCipherOptions(object):
:param cipher_page_size: The page size.
:type cipher_page_size: int
:param defer_encryption:
- Whether to defer encryption/decryption of documents, or do it
+ Whether to defer encryption of documents, or do it
inline while syncing.
:type defer_encryption: bool
"""
@@ -480,7 +480,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
raise DatabaseAccessError(str(e))
@defer.inlineCallbacks
- def sync(self, url, creds=None, defer_decryption=True):
+ def sync(self, url, creds=None):
"""
Synchronize documents with remote replica exposed at url.
@@ -495,10 +495,6 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
:param creds: optional dictionary giving credentials to authorize the
operation with the server.
:type creds: dict
- :param defer_decryption:
- Whether to defer the decryption process using the intermediate
- database. If False, decryption will be done inline.
- :type defer_decryption: bool
:return:
A Deferred, that will fire with the local generation (type `int`)
@@ -510,8 +506,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
self.sync_phase = syncer.sync_phase
self.syncer = syncer
self.sync_exchange_phase = syncer.sync_exchange_phase
- local_gen_before_sync = yield syncer.sync(
- defer_decryption=defer_decryption)
+ local_gen_before_sync = yield syncer.sync()
self.received_docs = syncer.received_docs
defer.returnValue(local_gen_before_sync)
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index 4cbd9f2a..d3cfe029 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -56,23 +56,10 @@ class SoledadSynchronizer(Synchronizer):
self.sync_exchange_phase = None
@defer.inlineCallbacks
- def sync(self, defer_decryption=True):
+ def sync(self):
"""
Synchronize documents between source and target.
- Differently from u1db `Synchronizer.sync` method, this one allows to
- pass a `defer_decryption` flag that will postpone the last
- step in the synchronization dance, namely, the setting of the last
- known generation and transaction id for a given remote replica.
-
- This is done to allow the ongoing parallel decryption of the incoming
- docs to proceed without `InvalidGeneration` conflicts.
-
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
:return: A deferred which will fire after the sync has finished with
the local generation before the synchronization was performed.
:rtype: twisted.internet.defer.Deferred
@@ -172,8 +159,7 @@ class SoledadSynchronizer(Synchronizer):
new_gen, new_trans_id = yield sync_target.sync_exchange(
docs_by_generation, self.source._replica_uid,
target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback,
- defer_decryption=defer_decryption)
+ self._insert_doc_from_target, ensure_callback=ensure_callback)
logger.debug("target gen after sync: %d" % new_gen)
logger.debug("target trans_id after sync: %s" % new_trans_id)
if hasattr(self.source, 'commit'):
diff --git a/testing/tests/perf/test_encdecpool.py b/testing/tests/perf/test_encdecpool.py
index 77091a41..8e820b9c 100644
--- a/testing/tests/perf/test_encdecpool.py
+++ b/testing/tests/perf/test_encdecpool.py
@@ -3,7 +3,6 @@ import json
from uuid import uuid4
from twisted.internet.defer import gatherResults
from leap.soledad.client.encdecpool import SyncEncrypterPool
-from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.common.document import SoledadDocument
# FIXME: test load is low due issue #7370, higher values will get out of memory
@@ -36,43 +35,3 @@ def create_encrypt(amount, size):
test_encdecpool_encrypt_100_10k = create_encrypt(100, 10*1000)
test_encdecpool_encrypt_100_100k = create_encrypt(100, 100*1000)
test_encdecpool_encrypt_100_500k = create_encrypt(100, 500*1000)
-
-
-def create_decrypt(amount, size):
- @pytest.mark.benchmark(group="test_pool_decrypt")
- @pytest.inlineCallbacks
- def test(soledad_client, txbenchmark_with_setup, request, payload):
- DOC_CONTENT = {'payload': payload(size)}
- client = soledad_client()
-
- def setup():
- pool = SyncDecrypterPool(
- client._crypto,
- client._sync_db,
- source_replica_uid=client._dbpool.replica_uid,
- insert_doc_cb=lambda x, y, z: False) # ignored
- pool.start(amount)
- request.addfinalizer(pool.stop)
- crypto = client._crypto
- docs = []
- for _ in xrange(amount):
- doc = SoledadDocument(
- doc_id=uuid4().hex, rev='rev',
- json=json.dumps(DOC_CONTENT))
- encrypted_content = json.loads(crypto.encrypt_doc(doc))
- docs.append((doc.doc_id, encrypted_content))
- return pool, docs
-
- def put_and_wait(pool, docs):
- deferreds = [] # fires on completion
- for idx, (doc_id, content) in enumerate(docs, 1):
- deferreds.append(pool.insert_encrypted_received_doc(
- doc_id, 'rev', content, idx, "trans_id", idx))
- return gatherResults(deferreds)
-
- yield txbenchmark_with_setup(setup, put_and_wait)
- return test
-
-test_encdecpool_decrypt_100_10k = create_decrypt(100, 10*1000)
-test_encdecpool_decrypt_100_100k = create_decrypt(100, 100*1000)
-test_encdecpool_decrypt_100_500k = create_decrypt(100, 500*1000)
diff --git a/testing/tests/perf/test_sync.py b/testing/tests/perf/test_sync.py
index 4d42395b..0b48a0b9 100644
--- a/testing/tests/perf/test_sync.py
+++ b/testing/tests/perf/test_sync.py
@@ -23,8 +23,7 @@ def create_upload(uploads, size):
def setup():
return load_up(client, uploads, payload(size))
- yield txbenchmark_with_setup(setup, client.sync,
- defer_decryption=False)
+ yield txbenchmark_with_setup(setup, client.sync)
return test
diff --git a/testing/tests/sync/test_encdecpool.py b/testing/tests/sync/test_encdecpool.py
index 4a32885e..7055a765 100644
--- a/testing/tests/sync/test_encdecpool.py
+++ b/testing/tests/sync/test_encdecpool.py
@@ -1,34 +1,11 @@
# -*- coding: utf-8 -*-
-# test_encdecpool.py
-# Copyright (C) 2015 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Tests for encryption and decryption pool.
-"""
import json
-from random import shuffle
-
-from mock import MagicMock
from twisted.internet.defer import inlineCallbacks
from leap.soledad.client.encdecpool import SyncEncrypterPool
-from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.common.document import SoledadDocument
from test_soledad.util import BaseSoledadTest
-from twisted.internet import defer
DOC_ID = "mydoc"
DOC_REV = "rev"
@@ -69,238 +46,3 @@ class TestSyncEncrypterPool(BaseSoledadTest):
encrypted = yield self._pool.get_encrypted_doc(DOC_ID, DOC_REV)
self.assertIsNotNone(encrypted)
-
-
-class TestSyncDecrypterPool(BaseSoledadTest):
-
- def _insert_doc_cb(self, doc, gen, trans_id):
- """
- Method used to mock the sync's return_doc_cb callback.
- """
- self._inserted_docs.append((doc, gen, trans_id))
-
- def _setup_pool(self, sync_db=None):
- sync_db = sync_db or self._soledad._sync_db
- return SyncDecrypterPool(
- self._soledad._crypto,
- sync_db,
- source_replica_uid=self._soledad._dbpool.replica_uid,
- insert_doc_cb=self._insert_doc_cb)
-
- def setUp(self):
- BaseSoledadTest.setUp(self)
- # setup the pool
- self._pool = self._setup_pool()
- # reset the inserted docs mock
- self._inserted_docs = []
-
- def tearDown(self):
- if self._pool.running:
- self._pool.stop()
- BaseSoledadTest.tearDown(self)
-
- def test_insert_received_doc(self):
- """
- Test that one document added to the pool is inserted using the
- callback.
- """
- self._pool.start(1)
- self._pool.insert_received_doc(
- DOC_ID, DOC_REV, "{}", 1, "trans_id", 1)
-
- def _assert_doc_was_inserted(_):
- self.assertEqual(
- self._inserted_docs,
- [(SoledadDocument(DOC_ID, DOC_REV, "{}"), 1, u"trans_id")])
-
- self._pool.deferred.addCallback(_assert_doc_was_inserted)
- return self._pool.deferred
-
- def test_looping_control(self):
- """
- Start and stop cleanly.
- """
- self._pool.start(10)
- self.assertTrue(self._pool.running)
- self._pool.stop()
- self.assertFalse(self._pool.running)
- self.assertTrue(self._pool.deferred.called)
-
- def test_sync_id_col_is_created_if_non_existing_in_docs_recvd_table(self):
- """
- Test that docs_received table is migrated, and has the sync_id column
- """
- mock_run_query = MagicMock(return_value=defer.succeed(None))
- mock_sync_db = MagicMock()
- mock_sync_db.runQuery = mock_run_query
- pool = self._setup_pool(mock_sync_db)
- d = pool.start(10)
- pool.stop()
-
- def assert_trial_to_create_sync_id_column(_):
- mock_run_query.assert_called_once_with(
- "ALTER TABLE docs_received ADD COLUMN sync_id")
-
- d.addCallback(assert_trial_to_create_sync_id_column)
- return d
-
- def test_insert_received_doc_many(self):
- """
- Test that many documents added to the pool are inserted using the
- callback.
- """
- many = 100
- self._pool.start(many)
-
- # insert many docs in the pool
- for i in xrange(many):
- gen = idx = i + 1
- doc_id = "doc_id: %d" % idx
- rev = "rev: %d" % idx
- content = {'idx': idx}
- trans_id = "trans_id: %d" % idx
- self._pool.insert_received_doc(
- doc_id, rev, content, gen, trans_id, idx)
-
- def _assert_doc_was_inserted(_):
- self.assertEqual(many, len(self._inserted_docs))
- idx = 1
- for doc, gen, trans_id in self._inserted_docs:
- expected_gen = idx
- expected_doc_id = "doc_id: %d" % idx
- expected_rev = "rev: %d" % idx
- expected_content = json.dumps({'idx': idx})
- expected_trans_id = "trans_id: %d" % idx
-
- self.assertEqual(expected_doc_id, doc.doc_id)
- self.assertEqual(expected_rev, doc.rev)
- self.assertEqual(expected_content, json.dumps(doc.content))
- self.assertEqual(expected_gen, gen)
- self.assertEqual(expected_trans_id, trans_id)
-
- idx += 1
-
- self._pool.deferred.addCallback(_assert_doc_was_inserted)
- return self._pool.deferred
-
- def test_insert_encrypted_received_doc(self):
- """
- Test that one encrypted document added to the pool is decrypted and
- inserted using the callback.
- """
- crypto = self._soledad._crypto
- doc = SoledadDocument(
- doc_id=DOC_ID, rev=DOC_REV, json=json.dumps(DOC_CONTENT))
- encrypted_content = json.loads(crypto.encrypt_doc(doc))
-
- # insert the encrypted document in the pool
- self._pool.start(1)
- self._pool.insert_encrypted_received_doc(
- DOC_ID, DOC_REV, encrypted_content, 1, "trans_id", 1)
-
- def _assert_doc_was_decrypted_and_inserted(_):
- self.assertEqual(1, len(self._inserted_docs))
- self.assertEqual(self._inserted_docs, [(doc, 1, u"trans_id")])
-
- self._pool.deferred.addCallback(
- _assert_doc_was_decrypted_and_inserted)
- return self._pool.deferred
-
- @inlineCallbacks
- def test_processing_order(self):
- """
- This test ensures that processing of documents only occur if there is
- a sequence in place.
- """
- crypto = self._soledad._crypto
-
- docs = []
- for i in xrange(1, 10):
- i = str(i)
- doc = SoledadDocument(
- doc_id=DOC_ID + i, rev=DOC_REV + i,
- json=json.dumps(DOC_CONTENT))
- encrypted_content = json.loads(crypto.encrypt_doc(doc))
- docs.append((doc, encrypted_content))
-
- # insert the encrypted document in the pool
- yield self._pool.start(10) # pool is expecting to process 10 docs
- self._pool._loop.stop() # we are processing manually
- # first three arrives, forming a sequence
- for i, (doc, encrypted_content) in enumerate(docs[:3]):
- gen = idx = i + 1
- yield self._pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, encrypted_content, gen, "trans_id", idx)
-
- # last one arrives alone, so it can't be processed
- doc, encrypted_content = docs[-1]
- yield self._pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, encrypted_content, 10, "trans_id", 10)
-
- yield self._pool._decrypt_and_recurse()
-
- self.assertEqual(3, self._pool._processed_docs)
-
- def test_insert_encrypted_received_doc_many(self, many=100):
- """
- Test that many encrypted documents added to the pool are decrypted and
- inserted using the callback.
- """
- crypto = self._soledad._crypto
- self._pool.start(many)
- docs = []
-
- # insert many encrypted docs in the pool
- for i in xrange(many):
- gen = idx = i + 1
- doc_id = "doc_id: %d" % idx
- rev = "rev: %d" % idx
- content = {'idx': idx}
- trans_id = "trans_id: %d" % idx
-
- doc = SoledadDocument(
- doc_id=doc_id, rev=rev, json=json.dumps(content))
-
- encrypted_content = json.loads(crypto.encrypt_doc(doc))
- docs.append((doc_id, rev, encrypted_content, gen,
- trans_id, idx))
- shuffle(docs)
-
- for doc in docs:
- self._pool.insert_encrypted_received_doc(*doc)
-
- def _assert_docs_were_decrypted_and_inserted(_):
- self.assertEqual(many, len(self._inserted_docs))
- idx = 1
- for doc, gen, trans_id in self._inserted_docs:
- expected_gen = idx
- expected_doc_id = "doc_id: %d" % idx
- expected_rev = "rev: %d" % idx
- expected_content = json.dumps({'idx': idx})
- expected_trans_id = "trans_id: %d" % idx
-
- self.assertEqual(expected_doc_id, doc.doc_id)
- self.assertEqual(expected_rev, doc.rev)
- self.assertEqual(expected_content, json.dumps(doc.content))
- self.assertEqual(expected_gen, gen)
- self.assertEqual(expected_trans_id, trans_id)
-
- idx += 1
-
- self._pool.deferred.addCallback(
- _assert_docs_were_decrypted_and_inserted)
- return self._pool.deferred
-
- @inlineCallbacks
- def test_pool_reuse(self):
- """
- The pool is reused between syncs, this test verifies that
- reusing is fine.
- """
- for i in xrange(3):
- yield self.test_insert_encrypted_received_doc_many(5)
- self._inserted_docs = []
- decrypted_docs = yield self._pool._get_docs(encrypted=False)
- # check that decrypted docs staging is clean
- self.assertEquals([], decrypted_docs)
- self._pool.stop()
diff --git a/testing/tests/sync/test_sync.py b/testing/tests/sync/test_sync.py
index 5290003e..a7d0a92b 100644
--- a/testing/tests/sync/test_sync.py
+++ b/testing/tests/sync/test_sync.py
@@ -187,7 +187,7 @@ class TestSoledadDbSync(
self.addCleanup(target.close)
return sync.SoledadSynchronizer(
self.db,
- target).sync(defer_decryption=False)
+ target).sync()
@defer.inlineCallbacks
def test_db_sync(self):
diff --git a/testing/tests/sync/test_sync_deferred.py b/testing/tests/sync/test_sync_deferred.py
index 4948aaf8..482b150c 100644
--- a/testing/tests/sync/test_sync_deferred.py
+++ b/testing/tests/sync/test_sync_deferred.py
@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-Test Leap backend bits: sync with deferred encryption/decryption.
+Test Leap backend bits: sync with deferred encryption.
"""
import time
import os
@@ -22,12 +22,8 @@ import random
import string
import shutil
-from urlparse import urljoin
-
from twisted.internet import defer
-from leap.soledad.common import couch
-
from leap.soledad.client import sync
from leap.soledad.client.sqlcipher import SQLCipherOptions
from leap.soledad.client.sqlcipher import SQLCipherDatabase
@@ -41,9 +37,6 @@ from test_soledad.util import make_soledad_app
from test_soledad.util import soledad_sync_target
-# Just to make clear how this test is different... :)
-DEFER_DECRYPTION = True
-
WAIT_STEP = 1
MAX_WAIT = 10
DBPASS = "pass"
@@ -52,7 +45,7 @@ DBPASS = "pass"
class BaseSoledadDeferredEncTest(SoledadWithCouchServerMixin):
"""
- Another base class for testing the deferred encryption/decryption during
+ Another base class for testing the deferred encryption during
the syncs, using the intermediate database.
"""
defer_sync_encryption = True
@@ -109,7 +102,7 @@ class TestSoledadDbSyncDeferredEncDecr(
"""
Test db.sync remote sync shortcut.
- Case with deferred encryption and decryption: using the intermediate
+ Case with deferred encryption: using the intermediate
syncdb.
"""
@@ -158,7 +151,7 @@ class TestSoledadDbSyncDeferredEncDecr(
self.addCleanup(target.close)
return sync.SoledadSynchronizer(
dbsyncer,
- target).sync(defer_decryption=True)
+ target).sync()
def wait_for_sync(self):
"""
diff --git a/testing/tests/sync/test_sync_mutex.py b/testing/tests/sync/test_sync_mutex.py
index 2626ab2a..2bcb3aec 100644
--- a/testing/tests/sync/test_sync_mutex.py
+++ b/testing/tests/sync/test_sync_mutex.py
@@ -47,7 +47,7 @@ from test_soledad.util import soledad_sync_target
_old_sync = SoledadSynchronizer.sync
-def _timed_sync(self, defer_decryption=True):
+def _timed_sync(self):
t = time.time()
sync_id = uuid.uuid4()
@@ -62,7 +62,7 @@ def _timed_sync(self, defer_decryption=True):
self.source.sync_times[sync_id]['end'] = t
return passthrough
- d = _old_sync(self, defer_decryption=defer_decryption)
+ d = _old_sync(self)
d.addBoth(_store_finish_time)
return d
diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py
index 964468ce..a2935539 100644
--- a/testing/tests/sync/test_sync_target.py
+++ b/testing/tests/sync/test_sync_target.py
@@ -231,8 +231,7 @@ class TestSoledadSyncTarget(
doc = self.make_document('doc-here', 'replica:1', '{"value": "here"}')
new_gen, trans_id = yield remote_target.sync_exchange(
[(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=receive_doc,
- defer_decryption=False)
+ last_known_trans_id=None, insert_doc_cb=receive_doc)
self.assertEqual(1, new_gen)
self.assertGetEncryptedDoc(
db, 'doc-here', 'replica:1', '{"value": "here"}', False)
@@ -285,8 +284,7 @@ class TestSoledadSyncTarget(
'replica',
last_known_generation=0,
last_known_trans_id=None,
- insert_doc_cb=receive_doc,
- defer_decryption=False)
+ insert_doc_cb=receive_doc)
self.assertGetEncryptedDoc(
db, 'doc-here', 'replica:1', '{"value": "here"}',
@@ -298,8 +296,7 @@ class TestSoledadSyncTarget(
trigger_ids = []
new_gen, trans_id = yield remote_target.sync_exchange(
[(doc2, 11, 'T-sud')], 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=receive_doc,
- defer_decryption=False)
+ last_known_trans_id=None, insert_doc_cb=receive_doc)
self.assertGetEncryptedDoc(
db, 'doc-here2', 'replica:1', '{"value": "here2"}',
False)
@@ -331,7 +328,7 @@ class TestSoledadSyncTarget(
new_gen, trans_id = yield remote_target.sync_exchange(
[(doc, 10, 'T-sid')], 'replica', last_known_generation=0,
last_known_trans_id=None, insert_doc_cb=receive_doc,
- ensure_callback=ensure_cb, defer_decryption=False)
+ ensure_callback=ensure_cb)
self.assertEqual(1, new_gen)
db = self.db2
self.assertEqual(1, len(replica_uid_box))
@@ -446,8 +443,7 @@ class SoledadDatabaseSyncTargetTests(
'T-sid')]
new_gen, trans_id = yield self.st.sync_exchange(
docs_by_gen, 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc,
- defer_decryption=False)
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
self.assertGetEncryptedDoc(
self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
self.assertTransactionLog(['doc-id'], self.db)
@@ -471,8 +467,7 @@ class SoledadDatabaseSyncTargetTests(
'doc-id2', 'replica:1', tests.nested_doc), 11, 'T-2')]
new_gen, trans_id = yield self.st.sync_exchange(
docs_by_gen, 'replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc,
- defer_decryption=False)
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
self.assertGetEncryptedDoc(
self.db, 'doc-id', 'replica:1', tests.simple_doc, False)
self.assertGetEncryptedDoc(
@@ -498,8 +493,7 @@ class SoledadDatabaseSyncTargetTests(
self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
new_gen, _ = yield self.st.sync_exchange(
[], 'other-replica', last_known_generation=0,
- last_known_trans_id=None, insert_doc_cb=self.receive_doc,
- defer_decryption=False)
+ last_known_trans_id=None, insert_doc_cb=self.receive_doc)
self.assertTransactionLog([doc.doc_id, doc2.doc_id], self.db)
self.assertEqual(2, new_gen)
self.assertEqual(
@@ -779,10 +773,6 @@ class SoledadDatabaseSyncTargetTests(
yield self.st.record_sync_info('replica', 0, 'T-sid')
self.assertEqual(expected, called)
-
-# Just to make clear how this test is different... :)
-DEFER_DECRYPTION = False
-
WAIT_STEP = 1
MAX_WAIT = 10
DBPASS = "pass"
@@ -890,8 +880,7 @@ class TestSoledadDbSync(
defer_encryption=True)
self.dbsyncer = dbsyncer
return dbsyncer.sync(target_url,
- creds=creds,
- defer_decryption=DEFER_DECRYPTION)
+ creds=creds)
else:
return self._do_sync(self, target_name)