diff options
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r-- | client/src/leap/soledad/client/target.py | 217 |
1 files changed, 120 insertions, 97 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index f2415218..667aab15 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -28,6 +28,7 @@ import threading from collections import defaultdict from time import sleep from uuid import uuid4 +from functools import partial import simplejson as json @@ -38,12 +39,12 @@ from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase from zope.proxy import ProxyBase from zope.proxy import setProxiedObject +from twisted.internet import defer from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.crypto import encrypt_doc, decrypt_doc -from leap.soledad.client.encdecpool import SyncEncrypterPool from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS @@ -111,6 +112,7 @@ class DocumentSyncerThread(threading.Thread): self._exception = None self._result = None self._success = False + self.started = threading.Event() # a lock so we can signal when we're finished self._request_lock = threading.Lock() self._request_lock.acquire() @@ -128,6 +130,8 @@ class DocumentSyncerThread(threading.Thread): finish before actually performing the request. It also traps any exception and register any failure with the request. """ + self.started.set() + with self._stop_lock: if self._stopped is None: self._stopped = False @@ -756,7 +760,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # def __init__(self, url, source_replica_uid=None, creds=None, crypto=None, - sync_db=None): + sync_db=None, sync_enc_pool=None): """ Initialize the SoledadSyncTarget. @@ -787,8 +791,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.source_replica_uid = source_replica_uid self._syncer_pool = None - # deferred decryption attributes + # asynchronous encryption/decryption attributes self._sync_db = sync_db + self._sync_enc_pool = sync_enc_pool self._decryption_callback = None self._sync_decr_pool = None @@ -796,7 +801,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ Set up the SyncDecrypterPool for deferred decryption. """ - if self._sync_decr_pool is None: + if self._sync_decr_pool is None and self._sync_db is not None: # initialize syncing queue decryption pool self._sync_decr_pool = SyncDecrypterPool( self._crypto, @@ -1018,7 +1023,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): t.join() if t.success: number_of_changes, _, _ = t.result - if defer_decryption and number_of_changes: + if defer_decryption: self._sync_decr_pool.set_docs_to_process( number_of_changes) else: @@ -1060,6 +1065,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): return new_generation, new_transaction_id + @property + def _defer_encryption(self): + return self._sync_enc_pool is not None + + @property + def _defer_decryption(self): + return self._sync_decr_pool is not None + def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, return_doc_cb, @@ -1126,17 +1139,19 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id - # send docs + # ------------------------------------------------------------------- + # start of send documents to target + # ------------------------------------------------------------------- msg = "%d/%d" % (0, len(docs_by_generations)) signal(SOLEDAD_SYNC_SEND_STATUS, msg) logger.debug("Soledad sync send status: %s" % msg) - defer_encryption = self._sync_db is not None self._syncer_pool = DocumentSyncerPool( self._raw_url, self._raw_creds, url, headers, ensure_callback, self.stop_syncer) threads = [] last_callback_lock = None + sent = 0 total = len(docs_by_generations) @@ -1156,66 +1171,78 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # ------------------------------------------------------------- # symmetric encryption of document's contents # ------------------------------------------------------------- - doc_json = doc.get_json() - if not doc.is_tombstone(): - if not defer_encryption: - # fallback case, for tests - doc_json = encrypt_doc(self._crypto, doc) - else: - try: - doc_json = self.get_encrypted_doc_from_db( - doc.doc_id, doc.rev) - except Exception as exc: - logger.error("Error while getting " - "encrypted doc from db") - logger.exception(exc) - continue + + # the following var will hold a deferred because we may try to + # fetch the encrypted document from the sync db + d = None + + if doc.is_tombstone(): + d = defer.succeed(None) + elif not self._defer_encryption: + # fallback case, for tests + d = defer.succeed(encrypt_doc(self._crypto, doc)) + else: + + def _maybe_encrypt_doc_inline(doc_json): if doc_json is None: - # Not marked as tombstone, but we got nothing - # from the sync db. As it is not encrypted yet, we - # force inline encryption. + # the document is not marked as tombstone, but we got + # nothing from the sync db. As it is not encrypted + # yet, we force inline encryption. # TODO: implement a queue to deal with these cases. - doc_json = encrypt_doc(self._crypto, doc) + return encrypt_doc(self._crypto, doc) + return doc_json + + d = self.get_encrypted_doc_from_db(doc.doc_id, doc.rev) + d.addCallback(_maybe_encrypt_doc_inline) # ------------------------------------------------------------- # end of symmetric encryption # ------------------------------------------------------------- + t = self._syncer_pool.new_syncer_thread( sent + 1, total, last_request_lock=last_request_lock, last_callback_lock=last_callback_lock) - # bail out if any thread failed + # bail out if creation of any thread failed if t is None: self.stop(fail=True) break - # set the request method - t.doc_syncer.set_request_method( - 'put', sync_id, cur_target_gen, cur_target_trans_id, - id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=sent + 1) - # set the success calback + # the following callback will be called when the document's + # encrypted content is available, either because it was found on + # the sync db or because it has been encrypted inline. - def _success_callback(idx, total, response): - _success_msg = "Soledad sync send status: %d/%d" \ - % (idx, total) - signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) - logger.debug(_success_msg) + def _configure_and_start_thread(t, doc_json): + # set the request method + t.doc_syncer.set_request_method( + 'put', sync_id, cur_target_gen, cur_target_trans_id, + id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs, + doc_idx=sent + 1) + # set the success calback - t.doc_syncer.set_success_callback(_success_callback) + def _success_callback(idx, total, response): + _success_msg = "Soledad sync send status: %d/%d" \ + % (idx, total) + signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg) + logger.debug(_success_msg) - # set the failure callback - def _failure_callback(idx, total, exception): - _failure_msg = "Soledad sync: error while sending document " \ - "%d/%d: %s" % (idx, total, exception) - logger.warning("%s" % _failure_msg) - logger.warning("Soledad sync: failing gracefully, will " - "recover on next sync.") + t.doc_syncer.set_success_callback(_success_callback) - t.doc_syncer.set_failure_callback(_failure_callback) + # set the failure callback + def _failure_callback(idx, total, exception): + _failure_msg = "Soledad sync: error while sending document " \ + "%d/%d: %s" % (idx, total, exception) + logger.warning("%s" % _failure_msg) + logger.warning("Soledad sync: failing gracefully, will " + "recover on next sync.") + + t.doc_syncer.set_failure_callback(_failure_callback) + + # save thread and append + t.start() + + d.addCallback(partial(_configure_and_start_thread, t)) - # save thread and append - t.start() threads.append((t, doc)) # update lock references so they can be used in next call to @@ -1230,6 +1257,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): while threads: # check if there are failures t, doc = threads.pop(0) + t.started.wait() t.join() if t.success: synced.append((doc.doc_id, doc.rev)) @@ -1238,8 +1266,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): raise t.exception # delete documents from the sync database - if defer_encryption: - self.delete_encrypted_docs_from_db(synced) + if self._defer_encryption: + self._delete_encrypted_docs_from_db(synced) # get target gen and trans_id after docs gen_after_send = None @@ -1248,16 +1276,23 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): response_dict = json.loads(last_successful_thread.response[0])[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] - - # get docs from target - if self._sync_db is None: - defer_decryption = False + # ------------------------------------------------------------------- + # end of send documents to target + # ------------------------------------------------------------------- + + # ------------------------------------------------------------------- + # start of fetch documents from target + # ------------------------------------------------------------------- + defer_decryption = defer_decryption and self._defer_decryption if self.stopped is False: cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, return_doc_cb, ensure_callback, sync_id, defer_decryption=defer_decryption) + # ------------------------------------------------------------------- + # end of fetch documents from target + # ------------------------------------------------------------------- self._syncer_pool.cleanup() @@ -1308,6 +1343,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): with self._stop_lock: return self._stopped is True + # + # Symmetric encryption of syncing docs + # + def get_encrypted_doc_from_db(self, doc_id, doc_rev): """ Retrieve encrypted document from the database of encrypted docs for @@ -1318,33 +1357,31 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param doc_rev: The document revision :type doc_rev: str + + :return: A deferred which is fired with the document's encrypted + content or None if the document was not found on the sync db. + :rtype: twisted.internet.defer.Deferred """ - encr = SyncEncrypterPool - sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( - encr.TABLE_NAME,)) - res = self._fetchall(sql, (doc_id, doc_rev)) - if res: - val = res.pop() - return val[0] - else: - # no doc found - return None + logger.debug("Looking for encrypted document on sync db: %s" % doc_id) + return self._sync_enc_pool.get_encrypted_doc(doc_id, doc_rev) - def delete_encrypted_docs_from_db(self, docs_ids): + def _delete_encrypted_docs_from_db(self, docs): """ Delete several encrypted documents from the database of symmetrically encrypted docs to sync. - :param docs_ids: an iterable with (doc_id, doc_rev) for all documents - to be deleted. - :type docs_ids: any iterable of tuples of str + :param docs: an iterable with (doc_id, doc_rev) for all documents + to be deleted. + :type docs: any iterable of tuples of str """ - if docs_ids: - encr = SyncEncrypterPool - for doc_id, doc_rev in docs_ids: - sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( - encr.TABLE_NAME,)) - self._sync_db.execute(sql, (doc_id, doc_rev)) + for doc_id, doc_rev in docs: + logger.debug("Removing encrypted document on sync db: %s" + % doc_id) + return self._sync_enc_pool.delete_encrypted_doc(doc_id, doc_rev) + + # + # Symmetric decryption of syncing docs + # def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total): """ @@ -1357,15 +1394,14 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param gen: The generation. :type gen: str :param trans_id: Transacion id. - :type gen: str + :param idx: The index count of the current operation. :type idx: int :param total: The total number of operations. :type total: int """ - logger.debug( - "Enqueueing doc for decryption: %d/%d." - % (idx + 1, total)) + logger.debug("Enqueueing doc for decryption: %d/%d." + % (idx + 1, total)) self._sync_decr_pool.insert_encrypted_received_doc( doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) @@ -1384,16 +1420,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param total: The total number of operations. :type total: int """ - logger.debug( - "Enqueueing doc, no decryption needed: %d/%d." - % (idx + 1, total)) + logger.debug("Enqueueing doc, no decryption needed: %d/%d." + % (idx + 1, total)) self._sync_decr_pool.insert_received_doc( doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1) - # - # Symmetric decryption of syncing docs - # - def set_decryption_callback(self, cb): """ Set callback to be called when the decryption finishes. @@ -1410,11 +1441,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ return self._decryption_callback is not None - def has_syncdb(self): - """ - Return True if we have an initialized syncdb. - """ - return self._sync_db is not None + # + # Authentication methods + # def _sign_request(self, method, url_query, params): """ @@ -1442,9 +1471,3 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :type token: str """ TokenBasedAuth.set_token_credentials(self, uuid, token) - - def _fetchall(self, *args, **kwargs): - with self._sync_db: - c = self._sync_db.cursor() - c.execute(*args, **kwargs) - return c.fetchall() |