diff options
Diffstat (limited to 'client/src/leap/soledad/client/encdecpool.py')
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 175 |
1 files changed, 151 insertions, 24 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 0466ec5d..0c1f92ea 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -85,6 +85,36 @@ class SyncEncryptDecryptPool(object): logger.debug("Terminating %s" % (self.__class__.__name__,)) self._pool.terminate() + def _runOperation(self, query, *args): + """ + Run an operation on the sync db. + + :param query: The query to be executed. + :type query: str + :param args: A list of query arguments. + :type args: list + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + return self._sync_db.runOperation(query, *args) + + def _runQuery(self, query, *args): + """ + Run a query on the sync db. + + :param query: The query to be executed. + :type query: str + :param args: A list of query arguments. + :type args: list + + :return: A deferred that will fire with the results of the database + query. + :rtype: twisted.internet.defer.Deferred + """ + return self._sync_db.runQuery(query, *args) + def encrypt_doc_task(doc_id, doc_rev, content, key, secret): """ @@ -119,7 +149,50 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): TABLE_NAME = "docs_tosync" FIELD_NAMES = "doc_id PRIMARY KEY, rev, content" - def encrypt_doc(self, doc, workers=True): + ENCRYPT_LOOP_PERIOD = 0.5 + + def __init__(self, *args, **kwargs): + """ + Initialize the sync encrypter pool. + """ + SyncEncryptDecryptPool.__init__(self, *args, **kwargs) + + self._stopped = False + self._sync_queue = multiprocessing.Queue() + + # start the encryption loop + self._deferred_loop = deferToThread(self._encrypt_docs_loop) + self._deferred_loop.addCallback( + lambda _: logger.debug("Finished encrypter thread.")) + + def enqueue_doc_for_encryption(self, doc): + """ + Enqueue a document for encryption. + + :param doc: The document to be encrypted. + :type doc: SoledadDocument + """ + try: + self.sync_queue.put_nowait(doc) + except multiprocessing.Queue.Full: + # do not asynchronously encrypt this file if the queue is full + pass + + def _encrypt_docs_loop(self): + """ + Process the syncing queue and send the documents there to be encrypted + in the sync db. They will be read by the SoledadSyncTarget during the + sync_exchange. + """ + logger.debug("Starting encrypter thread.") + while not self._stopped: + try: + doc = self._sync_queue.get(True, self.ENCRYPT_LOOP_PERIOD) + self._encrypt_doc(doc) + except multiprocessing.Queue.Empty: + pass + + def _encrypt_doc(self, doc, workers=True): """ Symmetrically encrypt a document. @@ -136,20 +209,20 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): secret = self._crypto.secret args = doc.doc_id, doc.rev, docstr, key, secret - try: - if workers: - res = self._pool.apply_async( - encrypt_doc_task, args, - callback=self.encrypt_doc_cb) - else: - # encrypt inline + if workers: + # encrypt asynchronously + self._pool.apply_async( + encrypt_doc_task, args, + callback=self._encrypt_doc_cb) + else: + # encrypt inline + try: res = encrypt_doc_task(*args) - self.encrypt_doc_cb(res) + self._encrypt_doc_cb(res) + except Exception as exc: + logger.exception(exc) - except Exception as exc: - logger.exception(exc) - - def encrypt_doc_cb(self, result): + def _encrypt_doc_cb(self, result): """ Insert results of encryption routine into the local sync database. @@ -158,9 +231,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type result: tuple(str, str, str) """ doc_id, doc_rev, content = result - return self.insert_encrypted_local_doc(doc_id, doc_rev, content) + return self._insert_encrypted_local_doc(doc_id, doc_rev, content) - def insert_encrypted_local_doc(self, doc_id, doc_rev, content): + def _insert_encrypted_local_doc(self, doc_id, doc_rev, content): """ Insert the contents of the encrypted doc into the local sync database. @@ -174,7 +247,58 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): """ query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?)" \ % (self.TABLE_NAME,) - return self._sync_db.runOperation(query, (doc_id, doc_rev, content)) + return self._runOperation(query, (doc_id, doc_rev, content)) + + @defer.inlineCallbacks + def get_encrypted_doc(self, doc_id, doc_rev): + """ + Get an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + + :return: A deferred that will fire with the encrypted content of the + document or None if the document was not found in the sync + db. + :rtype: twisted.internet.defer.Deferred + """ + logger.debug("Trying to get encrypted doc from sync db: %s" % doc_id) + query = "SELECT content FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + result = yield self._runQuery(query, (doc_id, doc_rev)) + if result: + val = result.pop() + defer.returnValue(val[0]) + defer.returnValue(None) + + def delete_encrypted_doc(self, doc_id, doc_rev): + """ + Delete an encrypted document from the sync db. + + :param doc_id: The id of the document. + :type doc_id: str + :param doc_rev: The revision of the document. + :type doc_rev: str + + :return: A deferred that will fire when the operation in the database + has finished. + :rtype: twisted.internet.defer.Deferred + """ + query = "DELETE FROM %s WHERE doc_id=? and rev=?" \ + % self.TABLE_NAME + self._runOperation(query, (doc_id, doc_rev)) + + def close(self): + """ + Close the encrypter pool. + """ + self._stopped = True + self._sync_queue.close() + q = self._sync_queue + del q + self._sync_queue = None def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret, @@ -275,9 +399,10 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): d.addCallback(lambda _: self._empty_db.set()) # start the decryption loop - self._deferred_loop = deferToThread(self._decrypt_and_process_docs) + self._deferred_loop = deferToThread( + self._decrypt_and_process_docs_loop) self._deferred_loop.addCallback( - lambda _: logger.debug("Finished decryptor thread.")) + lambda _: logger.debug("Finished decrypter thread.")) def set_docs_to_process(self, docs_to_process): """ @@ -317,7 +442,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): docstr = json.dumps(content) query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME - return self._sync_db.runOperation( + return self._runOperation( query, (doc_id, doc_rev, docstr, gen, trans_id, 1, idx)) def insert_received_doc( @@ -348,7 +473,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): content = json.dumps(content) query = "INSERT OR REPLACE INTO '%s' VALUES (?, ?, ?, ?, ?, ?, ?)" \ % self.TABLE_NAME - return self._sync_db.runOperation( + return self._runOperation( query, (doc_id, doc_rev, content, gen, trans_id, 0, idx)) def _delete_received_doc(self, doc_id): @@ -364,7 +489,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ query = "DELETE FROM '%s' WHERE doc_id=?" \ % self.TABLE_NAME - return self._sync_db.runOperation(query, (doc_id,)) + return self._runOperation(query, (doc_id,)) def _decrypt_doc(self, doc_id, rev, content, gen, trans_id, idx, workers=True): @@ -474,7 +599,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): if encrypted is not None: query += " WHERE encrypted = %d" % int(encrypted) query += " ORDER BY %s %s" % (order_by, order) - return self._sync_db.runQuery(query) + return self._runQuery(query) @defer.inlineCallbacks def _get_insertable_docs(self): @@ -612,7 +737,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): :rtype: twisted.internet.defer.Deferred """ query = "DELETE FROM %s WHERE 1" % (self.TABLE_NAME,) - return self._sync_db.runOperation(query) + return self._runOperation(query) def _raise_if_async_fails(self): """ @@ -627,7 +752,7 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): # re-raise the exception raised by the remote call res.get() - def _decrypt_and_process_docs(self): + def _decrypt_and_process_docs_loop(self): """ Decrypt the documents received from remote replica and insert them into the local one. @@ -668,6 +793,8 @@ class SyncDecrypterPool(SyncEncryptDecryptPool): """ Wait for the decrypt-and-process loop to finish. """ + logger.debug("Waiting for asynchronous decryption of incoming documents...") self._finished.wait() + logger.debug("Asynchronous decryption of incoming documents finished.") if self._exception: raise self._exception |