diff options
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r-- | client/src/leap/soledad/client/target.py | 153 |
1 files changed, 134 insertions, 19 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index dc2a0420..9e65b2df 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -22,30 +22,33 @@ import cStringIO import gzip import logging import os +import re import sqlite3 import urllib -import simplejson as json +from collections import defaultdict from time import sleep +import simplejson as json +from taskthread import TimerTask from u1db.remote import utils, http_errors from u1db.errors import BrokenSyncStream from u1db import errors from u1db.remote.http_target import HTTPSyncTarget from u1db.remote.http_client import _encode_query_parameter +from zope.proxy import ProxyBase +from zope.proxy import sameProxiedObjects, setProxiedObject from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import is_symmetrically_encrypted, decrypt_doc +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_docstr, decrypt_doc_dict +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool from leap.common.check import leap_check logger = logging.getLogger(__name__) -# -# Exceptions -# - def _gunzip(data): """ @@ -65,10 +68,14 @@ def _gunzip(data): return data +class PendingReceivedDocsSyncError(Exception): + pass + # # SoledadSyncTarget # + class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ A SyncTarget that encrypts data before sending and decrypts data after @@ -80,6 +87,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): written to the main database. """ + # will later keep a reference to the insert-doc callback + # passed to sync_exchange + _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) + # # Modified HTTPSyncTarget methods. # @@ -109,10 +120,32 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto + print "URL : ", url + self.source_replica_uid = re.findall("user-([0-9a-fA-F]+)", url)[0] + print "uid -->", self.source_replica_uid + self._sync_db = None if sync_db_path is not None: self._init_sync_db(sync_db_path) + # whether to bypass the received messages decryption deferral + self._decrypt_inline = False + + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, self._sync_db, + insert_doc_cb=self._insert_doc_cb) + self._sync_watcher = TimerTask( + self._decrypt_syncing_received_docs, delay=10) + self._sync_watcher.start() + + def set_decrypt_inline(self, value): + self._decrypt_inline = value + + @property + def decrypt_inline(self): + return self._decrypt_inline + @staticmethod def connect(url, crypto=None): return SoledadSyncTarget(url, crypto=crypto) @@ -151,31 +184,57 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): raise BrokenSyncStream data = parts[1:-1] comma = False + + queue_for_decrypt = (not self.decrypt_inline or + self._sync_db is None) + if queue_for_decrypt: + self._sync_decr_pool.write_encrypted_lock.acquire() if data: line, comma = utils.check_and_strip_comma(data[0]) res = json.loads(line) if ensure_callback and 'replica_uid' in res: ensure_callback(res['replica_uid']) + + # XXX check that writing_incoming lock is not acquired --------- + for entry in data[1:]: if not comma: # missing in between comma raise BrokenSyncStream line, comma = utils.check_and_strip_comma(entry) entry = json.loads(line) + gen, trans_id = entry['gen'], entry['trans_id'] #------------------------------------------------------------- # symmetric decryption of document's contents #------------------------------------------------------------- - # if arriving content was symmetrically encrypted, we decrypt - # it. + # If arriving content was symmetrically encrypted, we decrypt + # it. We do it inline if decrypt_inline flag is True or no + # sync_db was defined, otherwise we defer it writing it to the + # received docs table. + doc = SoledadDocument( entry['id'], entry['rev'], entry['content']) - if doc.content and ENC_SCHEME_KEY in doc.content: - if doc.content[ENC_SCHEME_KEY] == \ - EncryptionSchemes.SYMKEY: - doc.set_json(decrypt_doc(self._crypto, doc)) + + if is_symmetrically_encrypted(doc): + if queue_for_decrypt: + print "ENQUEUING DECRYPT -----------------------" + self._save_encrypted_received_doc(doc, gen, trans_id) + else: + print "INLINE DECRYPT -------------------------" + # force inline decrypt, or no-db fallback, for tests + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + doc.set_json(decrypt_doc_dict( + doc.content, doc.doc_id, doc.rev, + key, secret)) + # XXX should release lock in the decrypt pool + #------------------------------------------------------------- # end of symmetric decryption #------------------------------------------------------------- - return_doc_cb(doc, entry['gen'], entry['trans_id']) + if not queue_for_decrypt: + return_doc_cb(doc, gen, trans_id) + if queue_for_decrypt: + self._sync_decr_pool.write_encrypted_lock.release() if parts[-1] != ']': try: partdic = json.loads(parts[-1]) @@ -304,6 +363,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :return: The new generation and transaction id of the target replica. :rtype: tuple """ + self.source_replica_uid = source_replica_uid + print "SETTING SOURCE REPLICA UID to", source_replica_uid + # let the decrypter pool access the passed callback to insert docs + print "SETTING PROXY TO ------------>", return_doc_cb + setProxiedObject(self._insert_doc_cb[source_replica_uid], + return_doc_cb) + + if not self.clear_to_sync(): + raise PendingReceivedDocsSyncError + self._ensure_connection() if self._trace_hook: # for tests self._trace_hook('sync_exchange') @@ -340,9 +409,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if not doc.is_tombstone(): if self._sync_db is None: # fallback case, for tests + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + doc_json = encrypt_docstr( json.dumps(doc.get_json()), - doc.doc_id, doc.rev, self._crypto.secret) + doc.doc_id, doc.rev, key, secret) else: try: doc_json = self.get_encrypted_doc_from_db( @@ -378,6 +450,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.delete_encrypted_docs_from_db(synced) data = None + print "SYNC EXCHANGE FINISHED: new generation -> %s" % res['new_generation'] return res['new_generation'], res['new_transaction_id'] # @@ -439,9 +512,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param doc_rev: The document revision :type doc_rev: str """ + encr = SyncEncrypterPool c = self._sync_db.cursor() - # XXX interpolate table name - sql = ("SELECT content FROM docs_tosync WHERE doc_id=? and rev=?") + sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) c.execute(sql, (doc_id, doc_rev)) res = c.fetchall() if len(res) != 0: @@ -456,10 +530,51 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): to be deleted. :type docs_ids: any iterable of tuples of str """ + encr = SyncEncrypterPool c = self._sync_db.cursor() for doc_id, doc_rev in docs_ids: - # XXX interpolate table name - sql = ("DELETE FROM docs_tosync " - "WHERE doc_id=? and rev=?") + sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) c.execute(sql, (doc_id, doc_rev)) self._sync_db.commit() + + def _save_encrypted_received_doc(self, doc, gen, trans_id): + """ + Save an incoming document into the received docs table in the sync db. + + :param doc: The document to save. + :type doc: SoledadDocument + :param gen: The generation. + :type gen: str + :param trans_id: Transacion id. + :type gen: str + """ + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id) + + # + # Symmetric decryption of syncing docs + # + + def clear_to_sync(self): + """ + Return True if sync can proceed (ie, the received db table is empty). + :rtype: bool + """ + return self._sync_decr_pool.count_received_encrypted_docs() == 0 + + def _decrypt_syncing_received_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + Called periodically from TimerTask self._sync_watcher. + """ + if sameProxiedObjects(self._insert_doc_cb.get(self.source_replica_uid), + None): + logger.warning("No insert_doc callback, skipping decryption.") + return + + decrypter = self._sync_decr_pool + decrypter.decrypt_received_docs(self.source_replica_uid) + decrypter.process_decrypted() |