summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/target.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r--client/src/leap/soledad/client/target.py217
1 files changed, 120 insertions, 97 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index f2415218..667aab15 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -28,6 +28,7 @@ import threading
from collections import defaultdict
from time import sleep
from uuid import uuid4
+from functools import partial
import simplejson as json
@@ -38,12 +39,12 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
from zope.proxy import ProxyBase
from zope.proxy import setProxiedObject
+from twisted.internet import defer
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
from leap.soledad.client.crypto import is_symmetrically_encrypted
from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
-from leap.soledad.client.encdecpool import SyncEncrypterPool
from leap.soledad.client.encdecpool import SyncDecrypterPool
from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
@@ -111,6 +112,7 @@ class DocumentSyncerThread(threading.Thread):
self._exception = None
self._result = None
self._success = False
+ self.started = threading.Event()
# a lock so we can signal when we're finished
self._request_lock = threading.Lock()
self._request_lock.acquire()
@@ -128,6 +130,8 @@ class DocumentSyncerThread(threading.Thread):
finish before actually performing the request. It also traps any
exception and register any failure with the request.
"""
+ self.started.set()
+
with self._stop_lock:
if self._stopped is None:
self._stopped = False
@@ -756,7 +760,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
#
def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None):
+ sync_db=None, sync_enc_pool=None):
"""
Initialize the SoledadSyncTarget.
@@ -787,8 +791,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self.source_replica_uid = source_replica_uid
self._syncer_pool = None
- # deferred decryption attributes
+ # asynchronous encryption/decryption attributes
self._sync_db = sync_db
+ self._sync_enc_pool = sync_enc_pool
self._decryption_callback = None
self._sync_decr_pool = None
@@ -796,7 +801,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
Set up the SyncDecrypterPool for deferred decryption.
"""
- if self._sync_decr_pool is None:
+ if self._sync_decr_pool is None and self._sync_db is not None:
# initialize syncing queue decryption pool
self._sync_decr_pool = SyncDecrypterPool(
self._crypto,
@@ -1018,7 +1023,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
t.join()
if t.success:
number_of_changes, _, _ = t.result
- if defer_decryption and number_of_changes:
+ if defer_decryption:
self._sync_decr_pool.set_docs_to_process(
number_of_changes)
else:
@@ -1060,6 +1065,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
return new_generation, new_transaction_id
+ @property
+ def _defer_encryption(self):
+ return self._sync_enc_pool is not None
+
+ @property
+ def _defer_decryption(self):
+ return self._sync_decr_pool is not None
+
def sync_exchange(self, docs_by_generations,
source_replica_uid, last_known_generation,
last_known_trans_id, return_doc_cb,
@@ -1126,17 +1139,19 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
- # send docs
+ # -------------------------------------------------------------------
+ # start of send documents to target
+ # -------------------------------------------------------------------
msg = "%d/%d" % (0, len(docs_by_generations))
signal(SOLEDAD_SYNC_SEND_STATUS, msg)
logger.debug("Soledad sync send status: %s" % msg)
- defer_encryption = self._sync_db is not None
self._syncer_pool = DocumentSyncerPool(
self._raw_url, self._raw_creds, url, headers, ensure_callback,
self.stop_syncer)
threads = []
last_callback_lock = None
+
sent = 0
total = len(docs_by_generations)
@@ -1156,66 +1171,78 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
# -------------------------------------------------------------
# symmetric encryption of document's contents
# -------------------------------------------------------------
- doc_json = doc.get_json()
- if not doc.is_tombstone():
- if not defer_encryption:
- # fallback case, for tests
- doc_json = encrypt_doc(self._crypto, doc)
- else:
- try:
- doc_json = self.get_encrypted_doc_from_db(
- doc.doc_id, doc.rev)
- except Exception as exc:
- logger.error("Error while getting "
- "encrypted doc from db")
- logger.exception(exc)
- continue
+
+ # the following var will hold a deferred because we may try to
+ # fetch the encrypted document from the sync db
+ d = None
+
+ if doc.is_tombstone():
+ d = defer.succeed(None)
+ elif not self._defer_encryption:
+ # fallback case, for tests
+ d = defer.succeed(encrypt_doc(self._crypto, doc))
+ else:
+
+ def _maybe_encrypt_doc_inline(doc_json):
if doc_json is None:
- # Not marked as tombstone, but we got nothing
- # from the sync db. As it is not encrypted yet, we
- # force inline encryption.
+ # the document is not marked as tombstone, but we got
+ # nothing from the sync db. As it is not encrypted
+ # yet, we force inline encryption.
# TODO: implement a queue to deal with these cases.
- doc_json = encrypt_doc(self._crypto, doc)
+ return encrypt_doc(self._crypto, doc)
+ return doc_json
+
+ d = self.get_encrypted_doc_from_db(doc.doc_id, doc.rev)
+ d.addCallback(_maybe_encrypt_doc_inline)
# -------------------------------------------------------------
# end of symmetric encryption
# -------------------------------------------------------------
+
t = self._syncer_pool.new_syncer_thread(
sent + 1, total, last_request_lock=last_request_lock,
last_callback_lock=last_callback_lock)
- # bail out if any thread failed
+ # bail out if creation of any thread failed
if t is None:
self.stop(fail=True)
break
- # set the request method
- t.doc_syncer.set_request_method(
- 'put', sync_id, cur_target_gen, cur_target_trans_id,
- id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs,
- doc_idx=sent + 1)
- # set the success calback
+ # the following callback will be called when the document's
+ # encrypted content is available, either because it was found on
+ # the sync db or because it has been encrypted inline.
- def _success_callback(idx, total, response):
- _success_msg = "Soledad sync send status: %d/%d" \
- % (idx, total)
- signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
- logger.debug(_success_msg)
+ def _configure_and_start_thread(t, doc_json):
+ # set the request method
+ t.doc_syncer.set_request_method(
+ 'put', sync_id, cur_target_gen, cur_target_trans_id,
+ id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
+ trans_id=trans_id, number_of_docs=number_of_docs,
+ doc_idx=sent + 1)
+ # set the success calback
- t.doc_syncer.set_success_callback(_success_callback)
+ def _success_callback(idx, total, response):
+ _success_msg = "Soledad sync send status: %d/%d" \
+ % (idx, total)
+ signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
+ logger.debug(_success_msg)
- # set the failure callback
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while sending document " \
- "%d/%d: %s" % (idx, total, exception)
- logger.warning("%s" % _failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
+ t.doc_syncer.set_success_callback(_success_callback)
- t.doc_syncer.set_failure_callback(_failure_callback)
+ # set the failure callback
+ def _failure_callback(idx, total, exception):
+ _failure_msg = "Soledad sync: error while sending document " \
+ "%d/%d: %s" % (idx, total, exception)
+ logger.warning("%s" % _failure_msg)
+ logger.warning("Soledad sync: failing gracefully, will "
+ "recover on next sync.")
+
+ t.doc_syncer.set_failure_callback(_failure_callback)
+
+ # save thread and append
+ t.start()
+
+ d.addCallback(partial(_configure_and_start_thread, t))
- # save thread and append
- t.start()
threads.append((t, doc))
# update lock references so they can be used in next call to
@@ -1230,6 +1257,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
while threads:
# check if there are failures
t, doc = threads.pop(0)
+ t.started.wait()
t.join()
if t.success:
synced.append((doc.doc_id, doc.rev))
@@ -1238,8 +1266,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
raise t.exception
# delete documents from the sync database
- if defer_encryption:
- self.delete_encrypted_docs_from_db(synced)
+ if self._defer_encryption:
+ self._delete_encrypted_docs_from_db(synced)
# get target gen and trans_id after docs
gen_after_send = None
@@ -1248,16 +1276,23 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
response_dict = json.loads(last_successful_thread.response[0])[0]
gen_after_send = response_dict['new_generation']
trans_id_after_send = response_dict['new_transaction_id']
-
- # get docs from target
- if self._sync_db is None:
- defer_decryption = False
+ # -------------------------------------------------------------------
+ # end of send documents to target
+ # -------------------------------------------------------------------
+
+ # -------------------------------------------------------------------
+ # start of fetch documents from target
+ # -------------------------------------------------------------------
+ defer_decryption = defer_decryption and self._defer_decryption
if self.stopped is False:
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
return_doc_cb, ensure_callback, sync_id,
defer_decryption=defer_decryption)
+ # -------------------------------------------------------------------
+ # end of fetch documents from target
+ # -------------------------------------------------------------------
self._syncer_pool.cleanup()
@@ -1308,6 +1343,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
with self._stop_lock:
return self._stopped is True
+ #
+ # Symmetric encryption of syncing docs
+ #
+
def get_encrypted_doc_from_db(self, doc_id, doc_rev):
"""
Retrieve encrypted document from the database of encrypted docs for
@@ -1318,33 +1357,31 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param doc_rev: The document revision
:type doc_rev: str
+
+ :return: A deferred which is fired with the document's encrypted
+ content or None if the document was not found on the sync db.
+ :rtype: twisted.internet.defer.Deferred
"""
- encr = SyncEncrypterPool
- sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- res = self._fetchall(sql, (doc_id, doc_rev))
- if res:
- val = res.pop()
- return val[0]
- else:
- # no doc found
- return None
+ logger.debug("Looking for encrypted document on sync db: %s" % doc_id)
+ return self._sync_enc_pool.get_encrypted_doc(doc_id, doc_rev)
- def delete_encrypted_docs_from_db(self, docs_ids):
+ def _delete_encrypted_docs_from_db(self, docs):
"""
Delete several encrypted documents from the database of symmetrically
encrypted docs to sync.
- :param docs_ids: an iterable with (doc_id, doc_rev) for all documents
- to be deleted.
- :type docs_ids: any iterable of tuples of str
+ :param docs: an iterable with (doc_id, doc_rev) for all documents
+ to be deleted.
+ :type docs: any iterable of tuples of str
"""
- if docs_ids:
- encr = SyncEncrypterPool
- for doc_id, doc_rev in docs_ids:
- sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
- encr.TABLE_NAME,))
- self._sync_db.execute(sql, (doc_id, doc_rev))
+ for doc_id, doc_rev in docs:
+ logger.debug("Removing encrypted document on sync db: %s"
+ % doc_id)
+ return self._sync_enc_pool.delete_encrypted_doc(doc_id, doc_rev)
+
+ #
+ # Symmetric decryption of syncing docs
+ #
def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
"""
@@ -1357,15 +1394,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param gen: The generation.
:type gen: str
:param trans_id: Transacion id.
- :type gen: str
+
:param idx: The index count of the current operation.
:type idx: int
:param total: The total number of operations.
:type total: int
"""
- logger.debug(
- "Enqueueing doc for decryption: %d/%d."
- % (idx + 1, total))
+ logger.debug("Enqueueing doc for decryption: %d/%d."
+ % (idx + 1, total))
self._sync_decr_pool.insert_encrypted_received_doc(
doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
@@ -1384,16 +1420,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param total: The total number of operations.
:type total: int
"""
- logger.debug(
- "Enqueueing doc, no decryption needed: %d/%d."
- % (idx + 1, total))
+ logger.debug("Enqueueing doc, no decryption needed: %d/%d."
+ % (idx + 1, total))
self._sync_decr_pool.insert_received_doc(
doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
- #
- # Symmetric decryption of syncing docs
- #
-
def set_decryption_callback(self, cb):
"""
Set callback to be called when the decryption finishes.
@@ -1410,11 +1441,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
return self._decryption_callback is not None
- def has_syncdb(self):
- """
- Return True if we have an initialized syncdb.
- """
- return self._sync_db is not None
+ #
+ # Authentication methods
+ #
def _sign_request(self, method, url_query, params):
"""
@@ -1442,9 +1471,3 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:type token: str
"""
TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _fetchall(self, *args, **kwargs):
- with self._sync_db:
- c = self._sync_db.cursor()
- c.execute(*args, **kwargs)
- return c.fetchall()