summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/encdecpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r--client/src/leap/soledad/client/encdecpool.py175
1 files changed, 151 insertions, 24 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 0466ec5d..0c1f92ea 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -85,6 +85,36 @@ class SyncEncryptDecryptPool(object):
logger.debug("Terminating %s" % (self.__class__.__name__,))
self._pool.terminate()
+ def _runOperation(self, query, *args):
+ """
+ Run an operation on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runOperation(query, *args)
+
+ def _runQuery(self, query, *args):
+ """
+ Run a query on the sync db.
+
+ :param query: The query to be executed.
+ :type query: str
+ :param args: A list of query arguments.
+ :type args: list
+
+ :return: A deferred that will fire with the results of the database
+ query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ return self._sync_db.runQuery(query, *args)
+
def encrypt_doc_task(doc_id, doc_rev, content, key, secret):
"""
@@ -119,7 +149,50 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
TABLE_NAME = "docs_tosync"
FIELD_NAMES = "doc_id PRIMARY KEY, rev, content"
- def encrypt_doc(self, doc, workers=True):
+ ENCRYPT_LOOP_PERIOD = 0.5
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the sync encrypter pool.
+ """
+ SyncEncryptDecryptPool.__init__(self, *args, **kwargs)
+
+ self._stopped = False
+ self._sync_queue = multiprocessing.Queue()
+
+ # start the encryption loop
+ self._deferred_loop = deferToThread(self._encrypt_docs_loop)
+ self._deferred_loop.addCallback(
+ lambda _: logger.debug("Finished encrypter thread."))
+
+ def enqueue_doc_for_encryption(self, doc):
+ """
+ Enqueue a document for encryption.
+
+ :param doc: The document to be encrypted.
+ :type doc: SoledadDocument
+ """
+ try:
+ self.sync_queue.put_nowait(doc)
+ except multiprocessing.Queue.Full:
+ # do not asynchronously encrypt this file if the queue is full
+ pass
+
+ def _encrypt_docs_loop(self):
+ """
+ Process the syncing queue and send the documents there to be encrypted
+ in the sync db. They will be read by the SoledadSyncTarget during the
+ sync_exchange.
+ """
+ logger.debug("Starting encrypter thread.")
+ while not self._stopped:
+ try:
+ doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD)
+ self._encrypt_doc(doc)
+ except multiprocessing.Queue.Empty:
+ pass
+
+ def _encrypt_doc(self, doc, workers=True):
"""
Symmetrically encrypt a document.
@@ -136,20 +209,20 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
secret = self._crypto.secret
args = doc.doc_id, doc.rev, docstr, key, secret
- try:
- if workers:
- res = self._pool.apply_async(
- encrypt_doc_task, args,
- callback=self.encrypt_doc_cb)
- else:
- # encrypt inline
+ if workers:
+ # encrypt asynchronously
+ self._pool.apply_async(
+ encrypt_doc_task, args,
+ callback=self._encrypt_doc_cb)
+ else:
+ # encrypt inline
+ try:
res = encrypt_doc_task(*args)
- self.encrypt_doc_cb(res)
+ self._encrypt_doc_cb(res)
+ except Exception as exc:
+ logger.exception(exc)
- except Exception as exc:
- logger.exception(exc)
-
- def encrypt_doc_cb(self, result):
+ def _encrypt_doc_cb(self, result):
"""
Insert results of encryption routine into the local sync database.
@@ -158,9 +231,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
:type result: tuple(str, str, str)
"""
doc_id, doc_rev, content = result
- return self.insert_encrypted_local_doc(doc_id, doc_rev, content)
+ return self._insert_encrypted_local_doc(doc_id, doc_rev, content)
- def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
+ def _insert_encrypted_local_doc(self, doc_id, doc_rev, content):
"""
Insert the contents of the encrypted doc into the local sync
database.
@@ -174,7 +247,58 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
"""
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \
% (self.TABLE_NAME,)
- return self._sync_db.runOperation(query, (doc_id, doc_rev, content))
+ return self._runOperation(query, (doc_id, doc_rev, content))
+
+ @defer.inlineCallbacks
+ def get_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Get an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire with the encrypted content of the
+ document or None if the document was not found in the sync
+ db.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id)
+ query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ result = yield self._runQuery(query, (doc_id, doc_rev))
+ if result:
+ val = result.pop()
+ defer.returnValue(val[0])
+ defer.returnValue(None)
+
+ def delete_encrypted_doc(self, doc_id, doc_rev):
+ """
+ Delete an encrypted document from the sync db.
+
+ :param doc_id: The id of the document.
+ :type doc_id: str
+ :param doc_rev: The revision of the document.
+ :type doc_rev: str
+
+ :return: A deferred that will fire when the operation in the database
+ has finished.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ query = "DELETE FROM %s WHERE doc_id=? and rev=?" \
+ % self.TABLE_NAME
+ self._runOperation(query, (doc_id, doc_rev))
+
+ def close(self):
+ """
+ Close the encrypter pool.
+ """
+ self._stopped = True
+ self._sync_queue.close()
+ q = self._sync_queue
+ del q
+ self._sync_queue = None
def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret,
@@ -275,9 +399,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
d.addCallback(lambda _: self._empty_db.set())
# start the decryption loop
- self._deferred_loop = deferToThread(self._decrypt_and_process_docs)
+ self._deferred_loop = deferToThread(
+ self._decrypt_and_process_docs_loop)
self._deferred_loop.addCallback(
- lambda _: logger.debug("Finished decryptor thread."))
+ lambda _: logger.debug("Finished decrypter thread."))
def set_docs_to_process(self, docs_to_process):
"""
@@ -317,7 +442,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
docstr = json.dumps(content)
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
% self.TABLE_NAME
- return self._sync_db.runOperation(
+ return self._runOperation(
query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx))
def insert_received_doc(
@@ -348,7 +473,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
content = json.dumps(content)
query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \
% self.TABLE_NAME
- return self._sync_db.runOperation(
+ return self._runOperation(
query, (doc_id, doc_rev, content, gen, trans_id, 0, idx))
def _delete_received_doc(self, doc_id):
@@ -364,7 +489,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
query = "DELETE FROM '%s' WHERE doc_id=?" \
% self.TABLE_NAME
- return self._sync_db.runOperation(query, (doc_id,))
+ return self._runOperation(query, (doc_id,))
def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx,
workers=True):
@@ -474,7 +599,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
if encrypted is not None:
query += " WHERE encrypted = %d" % int(encrypted)
query += " ORDER BY %s %s" % (order_by, order)
- return self._sync_db.runQuery(query)
+ return self._runQuery(query)
@defer.inlineCallbacks
def _get_insertable_docs(self):
@@ -612,7 +737,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
:rtype: twisted.internet.defer.Deferred
"""
query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,)
- return self._sync_db.runOperation(query)
+ return self._runOperation(query)
def _raise_if_async_fails(self):
"""
@@ -627,7 +752,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
# re-raise the exception raised by the remote call
res.get()
- def _decrypt_and_process_docs(self):
+ def _decrypt_and_process_docs_loop(self):
"""
Decrypt the documents received from remote replica and insert them
into the local one.
@@ -668,6 +793,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
Wait for the decrypt-and-process loop to finish.
"""
+ logger.debug("Waiting for asynchronous decryption of incoming documents...")
self._finished.wait()
+ logger.debug("Asynchronous decryption of incoming documents finished.")
if self._exception:
raise self._exception