summaryrefslogtreecommitdiff
path: root/client/src/leap
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-05-07 14:49:40 -0300
committerdrebs <drebs@leap.se>2015-05-20 10:16:46 -0300
commiteae4468d99029006cc36a021e82350a0f62f7006 (patch)
treedeb6b03134dbb6e7f61f4c339e4d621be885131e /client/src/leap
parent3a7ddacd06fd57afb10cc3d7083c2aa196c9328f (diff)
[bug] fix order of insertion of decrypted docs
This commit actually does some different things: * When doing asynchronous decryption of incoming documents in soledad client during a sync, there was the possibility that a document corresponding to a newer generation would be decrypted and inserted in the local database before a document corresponding to an older generation. When this happened, the metadata about the target database (i.e. its locally-known generation) would be first updated to the newer generation, and then an attempt to insert a document corresponding to an older generation would cause the infamous InvalidGeneration error. To fix that we use the sync-index information that is contained in the sync stream to correctly find the insertable docs to be inserted in the local database, thus avoiding the problem described above. * Refactor the sync encrypt/decrypt pool to its own file. * Fix the use of twisted adbapi with multiprocessing. Closes: #6757.
Diffstat (limited to 'client/src/leap')
-rw-r--r--client/src/leap/soledad/client/crypto.py552
-rw-r--r--client/src/leap/soledad/client/encdecpool.py673
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py8
-rw-r--r--client/src/leap/soledad/client/target.py64
4 files changed, 700 insertions, 597 deletions
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index dd40b198..bdbaa8e0 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -23,21 +23,13 @@ import hmac
import hashlib
import json
import logging
-import multiprocessing
-import threading
-import time
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
-from zope.proxy import sameProxiedObjects
-
-from twisted.internet import defer
-from twisted.internet.threads import deferToThread
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
from leap.soledad.common import crypto
-from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
@@ -515,547 +507,3 @@ def is_symmetrically_encrypted(doc):
== crypto.EncryptionSchemes.SYMKEY:
return True
return False
-
-
-#
-# Encrypt/decrypt pools of workers
-#
-
-class SyncEncryptDecryptPool(object):
- """
- Base class for encrypter/decrypter pools.
- """
- WORKERS = multiprocessing.cpu_count()
-
- def __init__(self, crypto, sync_db):
- """
- Initialize the pool of encryption-workers.
-
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
-
- :param sync_db: A database connection handle
- :type sync_db: pysqlcipher.dbapi2.Connection
-
- :param write_lock: a write lock for controlling concurrent access
- to the sync_db
- :type write_lock: threading.Lock
- """
- self._pool = multiprocessing.Pool(self.WORKERS)
- self._crypto = crypto
- self._sync_db = sync_db
-
- def close(self):
- """
- Cleanly close the pool of workers.
- """
- logger.debug("Closing %s" % (self.__class__.__name__,))
- self._pool.close()
- try:
- self._pool.join()
- except Exception:
- pass
-
- def terminate(self):
- """
- Terminate the pool of workers.
- """
- logger.debug("Terminating %s" % (self.__class__.__name__,))
- self._pool.terminate()
-
-
-def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
- """
- Encrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad storage secret (used for MAC auth).
- :type secret: str
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- encrypted_content = encrypt_docstr(
- content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, encrypted_content
-
-
-class SyncEncrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric encryption
- of documents to be synced.
- """
- # TODO implement throttling to reduce cpu usage??
- WORKERS = multiprocessing.cpu_count()
- TABLE_NAME = "docs_tosync"
- FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
-
- def encrypt_doc(self, doc, workers=True):
- """
- Symmetrically encrypt a document.
-
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
- soledad_assert(self._crypto is not None, "need a crypto object")
- docstr = doc.get_json()
- key = self._crypto.doc_passphrase(doc.doc_id)
- secret = self._crypto.secret
- args = doc.doc_id, doc.rev, docstr, key, secret
-
- try:
- if workers:
- res = self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self.encrypt_doc_cb)
- else:
- # encrypt inline
- res = encrypt_doc_task(*args)
- self.encrypt_doc_cb(res)
-
- except Exception as exc:
- logger.exception(exc)
-
- def encrypt_doc_cb(self, result):
- """
- Insert results of encryption routine into the local sync database.
-
- :param result: A tuple containing the doc id, revision and encrypted
- content.
- :type result: tuple(str, str, str)
- """
- doc_id, doc_rev, content = result
- return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
-
- @defer.inlineCallbacks
- def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
- """
- Insert the contents of the encrypted doc into the local sync
- database.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param content: The encrypted document.
- :type content: str
- """
- # FIXME --- callback should complete immediately since otherwise the
- # thread which handles the results will get blocked
- # Right now we're blocking the dispatcher with the writes to sqlite.
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
- % (self.TABLE_NAME,)
- yield self._sync_db.runQuery(query, (doc_id, doc_rev, content))
-
-
-def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
- """
- Decrypt the content of the given document.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The encrypted content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification of
- that document.
- :type trans_id: str
- :param key: The encryption key.
- :type key: str
- :param secret: The Soledad storage secret (used for MAC auth).
- :type secret: str
-
- :return: A tuple containing the doc id, revision and encrypted content.
- :rtype: tuple(str, str, str)
- """
- decrypted_content = decrypt_doc_dict(
- content, doc_id, doc_rev, key, secret)
- return doc_id, doc_rev, decrypted_content, gen, trans_id
-
-
-class SyncDecrypterPool(SyncEncryptDecryptPool):
- """
- Pool of workers that spawn subprocesses to execute the symmetric decryption
- of documents that were received.
-
- The decryption of the received documents is done in two steps:
-
- 1. All the encrypted docs are collected, together with their generation
- and transaction-id
- 2. The docs are enqueued for decryption. When completed, they are
- inserted following the generation order.
- """
- # TODO implement throttling to reduce cpu usage??
- TABLE_NAME = "docs_received"
- FIELD_NAMES = "doc_id PRIMARY_KEY, rev, content, gen, trans_id, encrypted"
-
- """
- Period of recurrence of the periodic decrypting task, in seconds.
- """
- DECRYPT_LOOP_PERIOD = 0.5
-
- def __init__(self, *args, **kwargs):
- """
- Initialize the decrypter pool, and setup a dict for putting the
- results of the decrypted docs until they are picked by the insert
- routine that gets them in order.
-
- :param insert_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type insert_doc_cb: function
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- """
- self._insert_doc_cb = kwargs.pop("insert_doc_cb")
- self.source_replica_uid = kwargs.pop("source_replica_uid")
- SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
- self._async_results = []
-
- self._stopped = threading.Event()
- self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
- self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished decryptor thread."))
-
- @defer.inlineCallbacks
- def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
- gen, trans_id):
- """
- Insert a received message with encrypted content, to be decrypted later
- on.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param content: the Content of the document
- :type content: str
- :param gen: the Document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- """
- docstr = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
- self.TABLE_NAME,)
- yield self._sync_db.runQuery(
- query,
- (doc_id, doc_rev, docstr, gen, trans_id, 1))
-
- @defer.inlineCallbacks
- def insert_received_doc(self, doc_id, doc_rev, content, gen, trans_id):
- """
- Insert a document that is not symmetrically encrypted.
- We store it in the staging area (the decrypted_docs dictionary) to be
- picked up in order as the preceding documents are decrypted.
-
- :param doc_id: The Document ID.
- :type doc_id: str
- :param doc_rev: The Document Revision
- :param doc_rev: str
- :param content: the Content of the document
- :type content: str
- :param gen: the Document Generation
- :type gen: int
- :param trans_id: Transaction ID
- :type trans_id: str
- """
- if not isinstance(content, str):
- content = json.dumps(content)
- query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?)" % (
- self.TABLE_NAME,)
- yield self._sync_db.runQuery(
- query,
- (doc_id, doc_rev, content, gen, trans_id, 0))
-
- @defer.inlineCallbacks
- def delete_received_doc(self, doc_id, doc_rev):
- """
- Delete a received doc after it was inserted into the local db.
-
- :param doc_id: Document ID.
- :type doc_id: str
- :param doc_rev: Document revision.
- :type doc_rev: str
- """
- sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
- self.TABLE_NAME,)
- yield self._sync_db.runQuery(sql_del, (doc_id, doc_rev))
-
- def _decrypt_doc(self, doc_id, rev, content, gen, trans_id,
- source_replica_uid, workers=True):
- """
- Symmetrically decrypt a document.
-
- :param doc_id: The ID for the document with contents to be encrypted.
- :type doc: str
- :param rev: The revision of the document.
- :type rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- :param source_replica_uid:
- :type source_replica_uid: str
-
- :param workers: Whether to defer the decryption to the multiprocess
- pool of workers. Useful for debugging purposes.
- :type workers: bool
- """
- self.source_replica_uid = source_replica_uid
-
- # insert_doc_cb is a proxy object that gets updated with the right
- # insert function only when the sync_target invokes the sync_exchange
- # method. so, if we don't still have a non-empty callback, we refuse
- # to proceed.
- if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
- None):
- logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
- return
-
- soledad_assert(self._crypto is not None, "need a crypto object")
-
- if len(content) == 0:
- # not encrypted payload
- return
-
- content = json.loads(content)
- key = self._crypto.doc_passphrase(doc_id)
- secret = self._crypto.secret
- args = doc_id, rev, content, gen, trans_id, key, secret
-
- if workers:
- # save the async result object so we can inspect it for failures
- self._async_results.append(self._pool.apply_async(
- decrypt_doc_task, args,
- callback=self._decrypt_doc_cb))
- else:
- # decrypt inline
- res = decrypt_doc_task(*args)
- self._decrypt_doc_cb(res)
-
- def _decrypt_doc_cb(self, result):
- """
- Store the decryption result in the sync db from where it will later be
- picked by _process_decrypted.
-
- :param result: A tuple containing the doc id, revision and encrypted
- content.
- :type result: tuple(str, str, str)
- """
- doc_id, rev, content, gen, trans_id = result
- logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
- % (doc_id, rev, gen, trans_id))
- return self.insert_received_doc(doc_id, rev, content, gen, trans_id)
-
- def get_docs_by_generation(self, encrypted=None):
- """
- Get all documents in the received table from the sync db,
- ordered by generation.
-
- :param encrypted: If not None, only return documents with encrypted
- field equal to given parameter.
- :type encrypted: bool or None
-
- :return: list of doc_id, rev, generation, gen, trans_id
- :rtype: list
- """
- sql = "SELECT doc_id, rev, content, gen, trans_id, encrypted FROM %s" \
- % self.TABLE_NAME
- if encrypted is not None:
- sql += " WHERE encrypted = %d" % int(encrypted)
- sql += " ORDER BY gen ASC"
- return self._fetchall(sql)
-
- @defer.inlineCallbacks
- def get_insertable_docs_by_gen(self):
- """
- Return a list of non-encrypted documents ready to be inserted.
- """
- # here, we compare the list of all available docs with the list of
- # decrypted docs and find the longest common prefix between these two
- # lists. Note that the order of lists fetch matters: if instead we
- # first fetch the list of decrypted docs and then the list of all
- # docs, then some document might have been decrypted between these two
- # calls, and if it is just the right doc then it might not be caught
- # by the next loop.
- all_docs = yield self.get_docs_by_generation()
- decrypted_docs = yield self.get_docs_by_generation(encrypted=False)
- insertable = []
- for doc_id, rev, _, gen, trans_id, encrypted in all_docs:
- for next_doc_id, _, next_content, _, _, _ in decrypted_docs:
- if doc_id == next_doc_id:
- content = next_content
- insertable.append((doc_id, rev, content, gen, trans_id))
- else:
- break
- defer.returnValue(insertable)
-
- @defer.inlineCallbacks
- def _count_docs_in_sync_db(self, encrypted=None):
- """
- Count how many documents we have in the table for received docs.
-
- :param encrypted: If not None, return count of documents with
- encrypted field equal to given parameter.
- :type encrypted: bool or None
-
- :return: The count of documents.
- :rtype: int
- """
- query = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
- if encrypted is not None:
- query += " WHERE encrypted = %d" % int(encrypted)
- res = yield self._sync_db.runQuery(query)
- if res:
- val = res.pop()
- defer.returnValue(val[0])
- else:
- defer.returnValue(0)
-
- @defer.inlineCallbacks
- def _decrypt_received_docs(self):
- """
- Get all the encrypted documents from the sync database and dispatch a
- decrypt worker to decrypt each one of them.
- """
- self._raise_in_case_of_failed_async_calls()
- docs_by_generation = yield self.get_docs_by_generation(encrypted=True)
- for doc_id, rev, content, gen, trans_id, _ in docs_by_generation:
- self._decrypt_doc(
- doc_id, rev, content, gen, trans_id, self.source_replica_uid)
-
- @defer.inlineCallbacks
- def _process_decrypted(self):
- """
- Process the already decrypted documents, and insert as many documents
- as can be taken from the expected order without finding a gap.
-
- :return: Whether we have processed all the pending docs.
- :rtype: bool
- """
- # Acquire the lock to avoid processing while we're still
- # getting data from the syncing stream, to avoid InvalidGeneration
- # problems.
- insertable = yield self.get_insertable_docs_by_gen()
- for doc_fields in insertable:
- yield self.insert_decrypted_local_doc(*doc_fields)
-
- @defer.inlineCallbacks
- def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
- gen, trans_id):
- """
- Insert the decrypted document into the local sqlcipher database.
- Makes use of the passed callback `return_doc_cb` passed to the caller
- by u1db sync.
-
- :param doc_id: The document id.
- :type doc_id: str
- :param doc_rev: The document revision.
- :type doc_rev: str
- :param content: The serialized content of the document.
- :type content: str
- :param gen: The generation corresponding to the modification of that
- document.
- :type gen: int
- :param trans_id: The transaction id corresponding to the modification
- of that document.
- :type trans_id: str
- """
- # could pass source_replica in params for callback chain
- insert_fun = self._insert_doc_cb[self.source_replica_uid]
- logger.debug("Sync decrypter pool: inserting doc in local db: "
- "%s:%s %s" % (doc_id, doc_rev, gen))
-
- # convert deleted documents to avoid error on document creation
- if content == 'null':
- content = None
- doc = SoledadDocument(doc_id, doc_rev, content)
- gen = int(gen)
- insert_fun(doc, gen, trans_id)
-
- # If no errors found, remove it from the received database.
- yield self.delete_received_doc(doc_id, doc_rev)
-
- @defer.inlineCallbacks
- def empty(self):
- """
- Empty the received docs table of the sync database.
- """
- sql = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- yield self._sync_db.runQuery(sql)
-
- @defer.inlineCallbacks
- def _fetchall(self, *args, **kwargs):
- results = yield self._sync_db.runQuery(*args, **kwargs)
- defer.returnValue(results)
-
- def _raise_in_case_of_failed_async_calls(self):
- """
- Re-raise any exception raised by an async call.
-
- :raise Exception: Raised if an async call has raised an exception.
- """
- for res in self._async_results:
- if res.ready():
- if not res.successful():
- # re-raise the exception raised by the remote call
- res.get()
-
- def _stop_decr_loop(self):
- """
- """
- self._stopped.set()
-
- def close(self):
- """
- """
- self._stop_decr_loop()
- SyncEncryptDecryptPool.close(self)
-
- def _decrypt_and_process_docs(self):
- """
- Decrypt the documents received from remote replica and insert them
- into the local one.
-
- Called periodically from LoopingCall self._sync_loop.
- """
- while not self._stopped.is_set():
- if sameProxiedObjects(
- self._insert_doc_cb.get(self.source_replica_uid),
- None):
- continue
- self._decrypt_received_docs()
- self._process_decrypted()
- time.sleep(self.DECRYPT_LOOP_PERIOD)
-
- def wait(self):
- while not self.clear_to_sync():
- time.sleep(self.DECRYPT_LOOP_PERIOD)
-
- @defer.inlineCallbacks
- def clear_to_sync(self):
- count = yield self._count_docs_in_sync_db()
- defer.returnValue(count == 0)
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
new file mode 100644
index 00000000..0466ec5d
--- /dev/null
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -0,0 +1,673 @@
+# -*- coding: utf-8 -*-
+# encdecpool.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A pool of encryption/decryption concurrent and parallel workers for using
+during synchronization.
+"""
+
+
+import multiprocessing
+import threading
+import time
+import json
+import logging
+
+from zope.proxy import sameProxiedObjects
+
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+
+from leap.soledad.common.document import SoledadDocument
+from leap.soledad.common import soledad_assert
+
+from leap.soledad.client.crypto import encrypt_docstr
+from leap.soledad.client.crypto import decrypt_doc_dict
+
+
+logger = logging.getLogger(__name__)
+
+
+#
+# Encrypt/decrypt pools of workers
+#
+
+class SyncEncryptDecryptPool(object):
+ """
+ Base class for encrypter/decrypter pools.
+ """
+ WORKERS = multiprocessing.cpu_count()
+
+ def __init__(self, crypto, sync_db):
+ """
+ Initialize the pool of encryption-workers.
+
+ :param crypto: A SoledadCryto instance to perform the encryption.
+ :type crypto: leap.soledad.crypto.SoledadCrypto
+
+ :param sync_db: A database connection handle
+ :type sync_db: pysqlcipher.dbapi2.Connection
+ """
+ self._pool = multiprocessing.Pool(self.WORKERS)
+ self._crypto = crypto
+ self._sync_db = sync_db
+
+ def close(self):
+ """
+ Cleanly close the pool of workers.
+ """
+ logger.debug("Closing %s" % (self.__class__.__name__,))
+ self._pool.close()
+ try:
+ self._pool.join()
+ except Exception:
+ pass
+
+ def terminate(self):
+ """
+ Terminate the pool of workers.
+ """
+ logger.debug("Terminating %s" % (self.__class__.__name__,))
+ self._pool.terminate()
+
+
+def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
+ """
+ Encrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad storage secret (used for MAC auth).
+ :type secret: str
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ encrypted_content = encrypt_docstr(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, encrypted_content
+
+
+class SyncEncrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric encryption
+ of documents to be synced.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ WORKERS = multiprocessing.cpu_count()
+ TABLE_NAME = "docs_tosync"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
+
+ def encrypt_doc(self, doc, workers=True):
+ """
+ Symmetrically encrypt a document.
+
+ :param doc: The document with contents to be encrypted.
+ :type doc: SoledadDocument
+
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+ """
+ soledad_assert(self._crypto is not None, "need a crypto object")
+ docstr = doc.get_json()
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ args = doc.doc_id, doc.rev, docstr, key, secret
+
+ try:
+ if workers:
+ res = self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self.encrypt_doc_cb)
+ else:
+ # encrypt inline
+ res = encrypt_doc_task(*args)
+ self.encrypt_doc_cb(res)
+
+ except Exception as exc:
+ logger.exception(exc)
+
+ def encrypt_doc_cb(self, result):
+ """
+ Insert results of encryption routine into the local sync database.
+
+ :param result: A tuple containing the doc id, revision and encrypted
+ content.
+ :type result: tuple(str, str, str)
+ """
+ doc_id, doc_rev, content = result
+ return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+
+ def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ """
+ Insert the contents of the encrypted doc into the local sync
+ database.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ """
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
+ % (self.TABLE_NAME,)
+ return self._sync_db.runOperation(query, (doc_id, doc_rev, content))
+
+
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
+ idx):
+ """
+ Decrypt the content of the given document.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The encrypted content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification of
+ that document.
+ :type trans_id: str
+ :param key: The encryption key.
+ :type key: str
+ :param secret: The Soledad storage secret (used for MAC auth).
+ :type secret: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A tuple containing the doc id, revision and encrypted content.
+ :rtype: tuple(str, str, str)
+ """
+ decrypted_content = decrypt_doc_dict(content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id, idx
+
+
+class SyncDecrypterPool(SyncEncryptDecryptPool):
+ """
+ Pool of workers that spawn subprocesses to execute the symmetric decryption
+ of documents that were received.
+
+ The decryption of the received documents is done in two steps:
+
+ 1. Encrypted documents are stored in the sync db by the actual soledad
+ sync loop.
+ 2. The soledad sync loop tells us how many documents we should expect
+ to process.
+ 3. We start a decrypt-and-process loop:
+
+ a. Encrypted documents are fetched.
+ b. Encrypted documents are decrypted.
+ c. The longest possible list of decrypted documents are inserted
+ in the soledad db (this depends on which documents have already
+ arrived and which documents have already been decrypte, because
+ the order of insertion in the local soledad db matters).
+ d. Processed documents are deleted from the database.
+
+ 4. When we have processed as many documents as we should, the loop
+ finishes.
+ """
+ # TODO implement throttling to reduce cpu usage??
+ TABLE_NAME = "docs_received"
+ FIELD_NAMES = "doc_id PRIMARY KEY, rev, content, gen, " \
+ "trans_id, encrypted, idx"
+
+ """
+ Period of recurrence of the periodic decrypting task, in seconds.
+ """
+ DECRYPT_LOOP_PERIOD = 0.5
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+
+ :param insert_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type insert_doc_cb: function
+ :param source_replica_uid: The source replica uid, used to find the
+ correct callback for inserting documents.
+ :type source_replica_uid: str
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ self.source_replica_uid = kwargs.pop("source_replica_uid")
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+ self._last_inserted_idx = 0
+ self._docs_to_process = None
+ self._processed_docs = 0
+
+ self._async_results = []
+ self._exception = None
+ self._finished = threading.Event()
+
+ # clear the database before starting the sync
+ self._empty_db = threading.Event()
+ d = self._empty()
+ d.addCallback(lambda _: self._empty_db.set())
+
+ # start the decryption loop
+ self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished decryptor thread."))
+
+ def set_docs_to_process(self, docs_to_process):
+ """
+ Set the number of documents we expect to process.
+
+ This should be called by the during the sync exchange process as soon
+ as we know how many documents are arriving from the server.
+
+ :param docs_to_process: The number of documents to process.
+ :type docs_to_process: int
+ """
+ self._docs_to_process = docs_to_process
+
+ def insert_encrypted_received_doc(
+ self, doc_id, doc_rev, content, gen, trans_id, idx):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ docstr = json.dumps(content)
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ % self.TABLE_NAME
+ return self._sync_db.runOperation(
+ query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))
+
+ def insert_received_doc(
+ self, doc_id, doc_rev, content, gen, trans_id, idx):
+ """
+ Insert a document that is not symmetrically encrypted.
+ We store it in the staging area (the decrypted_docs dictionary) to be
+ picked up in order as the preceding documents are decrypted.
+
+ :param doc_id: The Document ID.
+ :type doc_id: str
+ :param doc_rev: The Document Revision
+ :param doc_rev: str
+ :param content: the Content of the document
+ :type content: str
+ :param gen: the Document Generation
+ :type gen: int
+ :param trans_id: Transaction ID
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ if not isinstance(content, str):
+ content = json.dumps(content)
+ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
+ % self.TABLE_NAME
+ return self._sync_db.runOperation(
+ query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
+
+ def _delete_received_doc(self, doc_id):
+ """
+ Delete a received doc after it was inserted into the local db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM '%s' WHERE doc_id=?" \
+ % self.TABLE_NAME
+ return self._sync_db.runOperation(query, (doc_id,))
+
+ def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx,
+ workers=True):
+ """
+ Symmetrically decrypt a document and store in the sync db.
+
+ :param doc_id: The ID for the document with contents to be encrypted.
+ :type doc: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ :param idx: The index of this document in the current sync process.
+ :type idx: int
+ :param workers: Whether to defer the decryption to the multiprocess
+ pool of workers. Useful for debugging purposes.
+ :type workers: bool
+
+ :return: A deferred that will fire after the document hasa been
+ decrypted and inserted in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # insert_doc_cb is a proxy object that gets updated with the right
+ # insert function only when the sync_target invokes the sync_exchange
+ # method. so, if we don't still have a non-empty callback, we refuse
+ # to proceed.
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ logger.debug("Sync decrypter pool: no insert_doc_cb() yet.")
+ return
+
+ soledad_assert(self._crypto is not None, "need a crypto object")
+
+ content = json.loads(content)
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+ args = doc_id, rev, content, gen, trans_id, key, secret, idx
+
+ if workers:
+ # when using multiprocessing, we need to wait for all parallel
+ # processing to finish before continuing with the
+ # decrypt-and-process loop. We do this by using an extra deferred
+ # that will be fired by the multiprocessing callback when it has
+ # finished processing.
+ d1 = defer.Deferred()
+
+ def _multiprocessing_callback(result):
+ d2 = self._decrypt_doc_cb(result)
+ d2.addCallback(lambda defres: d1.callback(defres))
+
+ # save the async result object so we can inspect it for failures
+ self._async_results.append(
+ self._pool.apply_async(
+ decrypt_doc_task, args,
+ callback=_multiprocessing_callback))
+
+ return d1
+ else:
+ # decrypt inline
+ res = decrypt_doc_task(*args)
+ return self._decrypt_doc_cb(res)
+
+ def _decrypt_doc_cb(self, result):
+ """
+ Store the decryption result in the sync db from where it will later be
+ picked by _process_decrypted.
+
+ :param result: A tuple containing the document's id, revision,
+ content, generation, transaction id and sync index.
+ :type result: tuple(str, str, str, int, str, int)
+
+ :return: A deferred that will fire after the document has been
+ inserted in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ doc_id, rev, content, gen, trans_id, idx = result
+ logger.debug("Sync decrypter pool: decrypted doc %s: %s %s %s"
+ % (doc_id, rev, gen, trans_id))
+ return self.insert_received_doc(
+ doc_id, rev, content, gen, trans_id, idx)
+
+ def _get_docs(self, encrypted=None, order_by='idx', order='ASC'):
+ """
+ Get documents from the received docs table in the sync db.
+
+ :param encrypted: If not None, only return documents with encrypted
+ field equal to given parameter.
+ :type encrypted: bool or None
+ :param order_by: The name of the field to order results.
+ :type order_by: str
+ :param order: Whether the order should be ASC or DESC.
+ :type order: str
+
+ :return: A deferred that will fire with the results of the database
+ query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "SELECT doc_id, rev, content, gen, trans_id, encrypted, " \
+ "idx FROM %s" % self.TABLE_NAME
+ if encrypted is not None:
+ query += " WHERE encrypted = %d" % int(encrypted)
+ query += " ORDER BY %s %s" % (order_by, order)
+ return self._sync_db.runQuery(query)
+
+ @defer.inlineCallbacks
+ def _get_insertable_docs(self):
+ """
+ Return a list of non-encrypted documents ready to be inserted.
+
+ :return: A deferred that will fire with the list of insertable
+ documents.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ # here, we fetch the list of decrypted documents and compare with the
+ # index of the last succesfully processed document.
+ decrypted_docs = yield self._get_docs(encrypted=False)
+ insertable = []
+ last_idx = self._last_inserted_idx
+ for doc_id, rev, content, gen, trans_id, encrypted, idx in \
+ decrypted_docs:
+ # XXX for some reason, a document might not have been deleted from
+ # the database. This is a bug. In this point, already
+ # processed documents should have been removed from the sync
+ # database and we should not have to skip them here. We need
+ # to find out why this is happening, fix, and remove the
+ # skipping below.
+ if (idx < last_idx + 1):
+ continue
+ if (idx != last_idx + 1):
+ break
+ insertable.append((doc_id, rev, content, gen, trans_id, idx))
+ last_idx += 1
+ defer.returnValue(insertable)
+
+ def _decrypt_received_docs(self):
+ """
+ Get all the encrypted documents from the sync database and dispatch a
+ decrypt worker to decrypt each one of them.
+
+ :return: A deferred that will fire after all documents have been
+ decrypted and inserted back in the sync db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ def _callback(received_docs):
+ deferreds = []
+ for doc_id, rev, content, gen, trans_id, _, idx in received_docs:
+ deferreds.append(
+ self._decrypt_doc(
+ doc_id, rev, content, gen, trans_id, idx))
+ return defer.gatherResults(deferreds)
+
+ d = self._get_docs(encrypted=True)
+ d.addCallback(_callback)
+ return d
+
+ def _process_decrypted(self):
+ """
+ Fetch as many decrypted documents as can be taken from the expected
+ order and insert them in the database.
+
+ :return: A deferred that will fire with the list of inserted
+ documents.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ def _callback(insertable):
+ for doc_fields in insertable:
+ self._insert_decrypted_local_doc(*doc_fields)
+ return insertable
+
+ d = self._get_insertable_docs()
+ d.addCallback(_callback)
+ return d
+
+ def _delete_processed_docs(self, inserted):
+ """
+ Delete from the sync db documents that have been processed.
+
+ :param inserted: List of documents inserted in the previous process
+ step.
+ :type inserted: list
+
+ :return: A list of deferreds that will fire when each operation in the
+ database has finished.
+ :rtype: twisted.internet.defer.DeferredList
+ """
+ deferreds = []
+ for doc_id, doc_rev, _, _, _, _ in inserted:
+ deferreds.append(
+ self._delete_received_doc(doc_id))
+ if not deferreds:
+ return defer.succeed(None)
+ return defer.gatherResults(deferreds)
+
+ def _insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id, idx):
+ """
+ Insert the decrypted document into the local sqlcipher database.
+ Makes use of the passed callback `return_doc_cb` passed to the caller
+ by u1db sync.
+
+ :param doc_id: The document id.
+ :type doc_id: str
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+ :param content: The serialized content of the document.
+ :type content: str
+ :param gen: The generation corresponding to the modification of that
+ document.
+ :type gen: int
+ :param trans_id: The transaction id corresponding to the modification
+ of that document.
+ :type trans_id: str
+ """
+ # could pass source_replica in params for callback chain
+ insert_fun = self._insert_doc_cb[self.source_replica_uid]
+ logger.debug("Sync decrypter pool: inserting doc in local db: "
+ "%s:%s %s" % (doc_id, doc_rev, gen))
+
+ # convert deleted documents to avoid error on document creation
+ if content == 'null':
+ content = None
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ gen = int(gen)
+ insert_fun(doc, gen, trans_id)
+
+ # store info about processed docs
+ self._last_inserted_idx = idx
+ self._processed_docs += 1
+
+ def _empty(self):
+ """
+ Empty the received docs table of the sync database.
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
+ return self._sync_db.runOperation(query)
+
+ def _raise_if_async_fails(self):
+ """
+ Raise any exception raised by a multiprocessing async decryption
+ call.
+
+ :raise Exception: Raised if an async call has raised an exception.
+ """
+ for res in self._async_results:
+ if res.ready():
+ if not res.successful():
+ # re-raise the exception raised by the remote call
+ res.get()
+
+ def _decrypt_and_process_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ This method runs in its own thread, so sleeping will not interfere
+ with the main thread.
+ """
+ try:
+ # wait for database to be emptied
+ self._empty_db.wait()
+ # wait until we know how many documents we need to process
+ while self._docs_to_process is None:
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+ # because all database operations are asynchronous, we use an event to
+ # make sure we don't start the next loop before the current one has
+ # finished.
+ event = threading.Event()
+ # loop until we have processes as many docs as the number of changes
+ while self._processed_docs < self._docs_to_process:
+ if sameProxiedObjects(
+ self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ continue
+ event.clear()
+ d = self._decrypt_received_docs()
+ d.addCallback(lambda _: self._raise_if_async_fails())
+ d.addCallback(lambda _: self._process_decrypted())
+ d.addCallback(self._delete_processed_docs)
+ d.addCallback(lambda _: event.set())
+ event.wait()
+ # sleep a bit to give time for some decryption work
+ time.sleep(self.DECRYPT_LOOP_PERIOD)
+ except Exception as e:
+ self._exception = e
+ self._finished.set()
+
+ def wait(self):
+ """
+ Wait for the decrypt-and-process loop to finish.
+ """
+ self._finished.wait()
+ if self._exception:
+ raise self._exception
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 4f7ecd1b..d3b3d01b 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -66,7 +66,7 @@ from twisted.python.threadpool import ThreadPool
from twisted.python import log
from twisted.enterprise import adbapi
-from leap.soledad.client import crypto
+from leap.soledad.client import encdecpool
from leap.soledad.client.target import SoledadSyncTarget
from leap.soledad.client.target import PendingReceivedDocsSyncError
from leap.soledad.client.sync import SoledadSynchronizer
@@ -489,7 +489,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
if defer_encryption:
# initialize syncing queue encryption pool
- self._sync_enc_pool = crypto.SyncEncrypterPool(
+ self._sync_enc_pool = encdecpool.SyncEncrypterPool(
self._crypto, self._sync_db)
# -----------------------------------------------------------------
@@ -578,8 +578,8 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
:rtype: tuple of strings
"""
maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"
- encr = crypto.SyncEncrypterPool
- decr = crypto.SyncDecrypterPool
+ encr = encdecpool.SyncEncrypterPool
+ decr = encdecpool.SyncDecrypterPool
sql_encr_table_query = (maybe_create % (
encr.TABLE_NAME, encr.FIELD_NAMES))
sql_decr_table_query = (maybe_create % (
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 06cef1ee..17ce718f 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -43,7 +43,8 @@ from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted
from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
-from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
+from leap.soledad.client.encdecpool import SyncEncrypterPool
+from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import signal
@@ -787,9 +788,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._crypto = crypto
self._stopped = True
self._stop_lock = threading.Lock()
- self._sync_exchange_lock = threading.Lock()
self.source_replica_uid = source_replica_uid
- self._defer_decryption = False
self._syncer_pool = None
# deferred decryption attributes
@@ -813,9 +812,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
Tear down the SyncDecrypterPool.
"""
- if self._sync_decr_pool is not None:
- self._sync_decr_pool.close()
- self._sync_decr_pool = None
+ self._sync_decr_pool.close()
+ self._sync_decr_pool = None
def _get_replica_uid(self, url):
"""
@@ -903,7 +901,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
doc = SoledadDocument(doc_id, rev, content)
if is_symmetrically_encrypted(doc):
if self._queue_for_decrypt:
- self._save_encrypted_received_doc(
+ self._enqueue_encrypted_received_doc(
doc, gen, trans_id, idx, total)
else:
# defer_decryption is False or no-sync-db fallback
@@ -913,7 +911,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# not symmetrically encrypted doc, insert it directly
# or save it in the decrypted stage.
if self._queue_for_decrypt:
- self._save_received_doc(doc, gen, trans_id, idx, total)
+ self._enqueue_received_doc(doc, gen, trans_id, idx, total)
else:
self._return_doc_cb(doc, gen, trans_id)
# -------------------------------------------------------------
@@ -996,6 +994,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self.stop(fail=True)
break
+ if defer_decryption:
+ self._setup_sync_decr_pool()
+
t.doc_syncer.set_request_method(
'get', idx, sync_id, last_known_generation,
last_known_trans_id)
@@ -1021,6 +1022,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
number_of_changes, _, _ = t.result
+ if defer_decryption and number_of_changes:
+ self._sync_decr_pool.set_docs_to_process(
+ number_of_changes)
else:
raise t.exception
first_request = False
@@ -1053,6 +1057,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
new_generation = doc_data['gen']
new_transaction_id = doc_data['trans_id']
+ # decrypt docs in case of deferred decryption
+ if defer_decryption:
+ self._sync_decr_pool.wait()
+ self._teardown_sync_decr_pool()
+
return new_generation, new_transaction_id
def sync_exchange(self, docs_by_generations,
@@ -1103,14 +1112,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
self._ensure_callback = ensure_callback
- if defer_decryption and self._sync_db is not None:
- self._sync_exchange_lock.acquire()
- self._setup_sync_decr_pool()
- self._defer_decryption = True
- else:
- # fall back
- defer_decryption = False
-
self.start()
if sync_id is None:
@@ -1120,10 +1121,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
setProxiedObject(self._insert_doc_cb[source_replica_uid],
return_doc_cb)
- # empty the database before starting a new sync
- if defer_decryption is True and not self.clear_to_sync():
- self._sync_decr_pool.empty()
-
self._ensure_connection()
if self._trace_hook: # for tests
self._trace_hook('sync_exchange')
@@ -1257,6 +1254,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
trans_id_after_send = response_dict['new_transaction_id']
# get docs from target
+ if self._sync_db is None:
+ defer_decryption = False
if self.stopped is False:
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
@@ -1266,12 +1265,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._syncer_pool.cleanup()
- # decrypt docs in case of deferred decryption
- if defer_decryption:
- self._sync_decr_pool.wait()
- self._teardown_sync_decr_pool()
- self._sync_exchange_lock.release()
-
# update gen and trans id info in case we just sent and did not
# receive docs.
if gen_after_send is not None and gen_after_send > cur_target_gen:
@@ -1357,7 +1350,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
encr.TABLE_NAME,))
self._sync_db.execute(sql, (doc_id, doc_rev))
- def _save_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
+ def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
"""
Save a symmetrically encrypted incoming document into the received
docs table in the sync db. A decryption task will pick it up
@@ -1378,9 +1371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"Enqueueing doc for decryption: %d/%d."
% (idx + 1, total))
self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
+ doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
- def _save_received_doc(self, doc, gen, trans_id, idx, total):
+ def _enqueue_received_doc(self, doc, gen, trans_id, idx, total):
"""
Save any incoming document into the received docs table in the sync db.
@@ -1399,23 +1392,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"Enqueueing doc, no decryption needed: %d/%d."
% (idx + 1, total))
self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id)
+ doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
#
# Symmetric decryption of syncing docs
#
- def clear_to_sync(self):
- """
- Return whether sync can proceed (ie, the received db table is empty).
-
- :return: Whether sync can proceed.
- :rtype: bool
- """
- if self._sync_decr_pool:
- return self._sync_decr_pool.clear_to_sync()
- return True
-
def set_decryption_callback(self, cb):
"""
Set callback to be called when the decryption finishes.