diff options
author | Kali Kaneko <kali@leap.se> | 2014-03-14 02:09:40 -0400 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2014-03-17 12:37:35 -0400 |
commit | d2d3a243b6da313a54c8c498ffcd3f065721ad5a (patch) | |
tree | 2e34698acdbf656e759c945bca96c3e04ca30d6a | |
parent | 1a60f3616efef904917dd77a12170912defc7637 (diff) |
move symmetric decryption of docs to be db-based toofeature/enc-sync-transitional-db
-rw-r--r-- | client/pkg/requirements.pip | 5 | ||||
-rw-r--r-- | client/src/leap/soledad/client/__init__.py | 25 | ||||
-rw-r--r-- | client/src/leap/soledad/client/crypto.py | 277 | ||||
-rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 20 | ||||
-rw-r--r-- | client/src/leap/soledad/client/target.py | 153 |
5 files changed, 418 insertions, 62 deletions
diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip index ff1b4f35..6f2954ab 100644 --- a/client/pkg/requirements.pip +++ b/client/pkg/requirements.pip @@ -3,6 +3,9 @@ simplejson u1db scrypt pycryptopp +cchardet +taskthread +zope.proxy # # leap deps @@ -21,5 +24,3 @@ oauth # pysqlite should not be a dep, see #2945 pysqlite -cchardet -taskthread diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py index 5f1d1a98..116a59e4 100644 --- a/client/src/leap/soledad/client/__init__.py +++ b/client/src/leap/soledad/client/__init__.py @@ -324,7 +324,7 @@ class Soledad(object): self._bootstrap() # might raise BootstrapSequenceError() # initialize syncing queue encryption pool - self._sync_pool = SyncEncrypterPool(self._crypto, self._sync_db) + self._sync_enc_pool = SyncEncrypterPool(self._crypto, self._sync_db) self._sync_watcher = TimerTask(self._encrypt_syncing_docs, delay=10) self._sync_watcher.start() @@ -1145,7 +1145,7 @@ class Soledad(object): if self._db: return self._db.resolve_doc(doc, conflicted_doc_revs) - def sync(self): + def sync(self, decrypt_inline=False): """ Synchronize the local encrypted replica with a remote replica. @@ -1155,11 +1155,15 @@ class Soledad(object): :param url: the url of the target replica to sync with :type url: str - :return: the local generation before the synchronisation was - performed. + :param decrypt_inline: Whether to do the decryption of received + messages inline or not. + :type decrypt_inline: bool + + :return: The local generation before the synchronisation was + performed. :rtype: str """ - #return + print "SYNC: inline? ", decrypt_inline local_gen = None if self._db: # acquire lock before attempt to sync @@ -1176,8 +1180,9 @@ class Soledad(object): local_gen = self._db.sync( urlparse.urljoin( self.server_url, 'user-%s' % self._uuid), - creds=self._creds, autocreate=True) - #signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) + creds=self._creds, autocreate=True, + decrypt_inline=decrypt_inline) + signal(SOLEDAD_DONE_DATA_SYNC, self._uuid) except Exception as exc: logger.error("error during soledad sync") logger.exception(exc) @@ -1388,7 +1393,7 @@ class Soledad(object): return self._passphrase.encode('utf-8') # - # Symmetric encryption / decryption + # Symmetric encryption of syncing docs # def _encrypt_syncing_docs(self): @@ -1396,6 +1401,8 @@ class Soledad(object): Process the syncing queue and send the documents there to be encrypted in the sync db. They will be read by the SoledadSyncTarget during the sync_exchange. + + Called periodical from the TimerTask self._sync_watcher. """ lock = self.encrypting_lock # optional wait flag used to avoid blocking @@ -1406,7 +1413,7 @@ class Soledad(object): try: while not queue.empty(): doc = queue.get_nowait() - self._sync_pool.encrypt_doc(doc) + self._sync_enc_pool.encrypt_doc(doc) except Exception as exc: logger.error("Error while encrypting docs to sync") logger.exception(exc) diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py index 2ada4937..6d1fab37 100644 --- a/client/src/leap/soledad/client/crypto.py +++ b/client/src/leap/soledad/client/crypto.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # crypto.py -# Copyright (C) 2013,2014 LEAP +# Copyright (C) 2013, 2014 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -24,12 +24,15 @@ import hashlib import json import logging import multiprocessing +import threading from pycryptopp.cipher.aes import AES from pycryptopp.cipher.xsalsa20 import XSalsa20 +from zope.proxy import sameProxiedObjects from leap.soledad.common import soledad_assert from leap.soledad.common import soledad_assert_type +from leap.soledad.common.document import SoledadDocument from leap.soledad.common.crypto import ( @@ -346,14 +349,13 @@ def encrypt_docstr(docstr, doc_id, doc_rev, key, secret): }) -# XXX change to docstr... -def decrypt_doc(crypto, doc): +def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret): """ Decrypt C{doc}'s content. Return the JSON string representation of the document's decrypted content. - The content of the document should have the following structure: + The passed doc_dict argument should have the following structure: { ENC_JSON_KEY: '<enc_blob>', @@ -369,52 +371,67 @@ def decrypt_doc(crypto, doc): EncryptionSchemes.SYMKEY and C{enc_method} is EncryptionMethods.AES_256_CTR. - :param crypto: A SoledadCryto instance to perform the encryption. - :type crypto: leap.soledad.crypto.SoledadCrypto - :param doc: The document to be decrypted. - :type doc: SoledadDocument + :param doc_dict: The content of the document to be decrypted. + :type doc_dict: dict + + :param doc_id: The document id. + :type doc_id: str + + :param doc_rev: The document revision. + :type doc_rev: str + + :param key: The key used to encrypt ``data`` (must be 256 bits long). + :type key: str + + :param secret: + :type secret: :return: The JSON serialization of the decrypted content. :rtype: str """ - soledad_assert(doc.is_tombstone() is False) - soledad_assert(ENC_JSON_KEY in doc.content) - soledad_assert(ENC_SCHEME_KEY in doc.content) - soledad_assert(ENC_METHOD_KEY in doc.content) - soledad_assert(MAC_KEY in doc.content) - soledad_assert(MAC_METHOD_KEY in doc.content) + # TODO where should we move these assertions, now that we're passed the + # string? + #soledad_assert(doc.is_tombstone() is False) + + soledad_assert(ENC_JSON_KEY in doc_dict) + soledad_assert(ENC_SCHEME_KEY in doc_dict) + soledad_assert(ENC_METHOD_KEY in doc_dict) + soledad_assert(MAC_KEY in doc_dict) + soledad_assert(MAC_METHOD_KEY in doc_dict) + # verify MAC ciphertext = binascii.a2b_hex( # content is stored as hex. - doc.content[ENC_JSON_KEY]) + doc_dict[ENC_JSON_KEY]) mac = mac_doc( - doc.doc_id, doc.rev, + doc_id, doc_rev, ciphertext, - doc.content[MAC_METHOD_KEY], crypto.secret) + doc_dict[MAC_METHOD_KEY], secret) # we compare mac's hashes to avoid possible timing attacks that might # exploit python's builtin comparison operator behaviour, which fails # immediatelly when non-matching bytes are found. doc_mac_hash = hashlib.sha256( binascii.a2b_hex( # the mac is stored as hex - doc.content[MAC_KEY])).digest() + doc_dict[MAC_KEY])).digest() calculated_mac_hash = hashlib.sha256(mac).digest() if doc_mac_hash != calculated_mac_hash: raise WrongMac('Could not authenticate document\'s contents.') # decrypt doc's content - enc_scheme = doc.content[ENC_SCHEME_KEY] + enc_scheme = doc_dict[ENC_SCHEME_KEY] plainjson = None if enc_scheme == EncryptionSchemes.SYMKEY: - enc_method = doc.content[ENC_METHOD_KEY] + enc_method = doc_dict[ENC_METHOD_KEY] if enc_method == EncryptionMethods.AES_256_CTR: - soledad_assert(ENC_IV_KEY in doc.content) - plainjson = crypto.decrypt_sym( - ciphertext, - crypto.doc_passphrase(doc.doc_id), + soledad_assert(ENC_IV_KEY in doc_dict) + plainjson = decrypt_sym( + ciphertext, key, method=enc_method, - iv=doc.content[ENC_IV_KEY]) + iv=doc_dict[ENC_IV_KEY]) else: raise UnknownEncryptionMethod(enc_method) else: raise UnknownEncryptionScheme(enc_scheme) + + print "PLAIN: ", plainjson return plainjson @@ -451,6 +468,9 @@ class SyncEncryptDecryptPool(object): :param sync_db: a database connection handle :type sync_db: handle + + :param insert_doc_cb: Optional callback for inserting doc. + :type insert_doc_cb: callable """ self._pool = multiprocessing.Pool(self.WORKERS) self._crypto = crypto @@ -492,9 +512,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): def encrypt_doc_cb(self, result): doc_id, doc_rev, content = result - self.insert_encrypted_doc(doc_id, doc_rev, content) + self.insert_encrypted_local_doc(doc_id, doc_rev, content) - def insert_encrypted_doc(self, doc_id, doc_rev, content): + def insert_encrypted_local_doc(self, doc_id, doc_rev, content): """ Insert the contents of the encrypted doc into the local sync database. @@ -512,19 +532,216 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): self._sync_db.commit() +def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret): + decrypted_content = decrypt_doc_dict( + content, doc_id, doc_rev, key, secret) + return doc_id, doc_rev, decrypted_content, gen, trans_id + + +def get_insertable_docs_by_gen(expected, got): + """ + Return a list of documents ready to be inserted. This list is computed + by aligning the expected list with the already gotten docs, and returning + the maximum number of docs that can be processed in the expected order + before finding a gap. + + :param expected: A list of generations to be inserted. + :type expected: list + + :param got: A dictionary whose values are the docs to be inserted. + :type got: dict + """ + ordered = [got.get(i) for i in expected] + if None in ordered: + return ordered[:ordered.index(None)] + else: + return ordered + + class SyncDecrypterPool(SyncEncryptDecryptPool): """ Pool of workers that spawn subprocesses to execute the symmetric decryption of documents that were received. + + The decryption of the received documents is done in two steps: + + 1. All the encrypted docs are collected, together with their generation + and transaction-id + 2. The docs are enqueued for decryption. When completed, they are + inserted following the generation order. """ WORKERS = 10 TABLE_NAME = "docs_received" FIELD_NAMES = "doc_id, rev, content, gen, trans_id" - def decrypt_doc(self, doc_id, rev): + write_encrypted_lock = threading.Lock() + + def __init__(self, *args, **kwargs): + """ + Initialize the decrypter pool, and setup a dict for putting the + results of the decrypted docs until they are picked by the insert + routine that gets them in order. + """ + self._insert_doc_cb = kwargs.pop("insert_doc_cb") + SyncEncryptDecryptPool.__init__(self, *args) + self.decrypted_docs = {} + + def insert_encrypted_received_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert a received message with encrypted content, to be decrypted later + on. + """ + docstr = json.dumps(content) + c = self._sync_db.cursor() + sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % ( + self.TABLE_NAME,) + c.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id)) + self._sync_db.commit() + + def delete_encrypted_received_doc(self, doc_id, doc_rev): + """ + Delete a encrypted received doc after it was inserted into the local + db. + + :param doc_id: Document ID. + :type doc_id: str + :param doc_rev: Document revision. + :type doc_rev: str + """ + c = self._sync_db.cursor() + sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + c.execute(sql_del, (doc_id, doc_rev)) + self._sync_db.commit() + + def decrypt_doc(self, doc_id, rev, source_replica_uid): """ Symmetrically decrypt a document. - :param doc: The document with contents to be encrypted. - :type doc: SoledadDocument + :param doc_id: The ID for the document with contents to be encrypted. + :type doc: str + :param rev: The revision of the document. + :type rev: str + :param source_replica_uid: + :type source_replica_uid: str + """ + self.source_replica_uid = source_replica_uid + if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid), + None): + print self._insert_doc_cb + logger.warning("No insert_doc callback, skipping decryption.") + return + + # XXX move to get_doc function... + c = self._sync_db.cursor() + sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % ( + self.TABLE_NAME,) + c.execute(sql, (doc_id, rev)) + res = c.fetchone() + if res is None: + logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev)) + return + + doc_id, rev, docstr, gen, trans_id = res + content = json.loads(docstr) + key = self._crypto.doc_passphrase(doc_id) + secret = self._crypto.secret + + args = doc_id, rev, content, gen, trans_id, key, secret + + try: + self._pool.apply_async(decrypt_doc_task, args, + callback=self.decrypt_doc_cb) + except Exception as exc: + logger.exception(exc) + + def decrypt_doc_cb(self, result): + """ + Temporarily store the decryption result in a dictionary where it will + be picked by process_decrypted. + + :param result: the result of the decryption routine. + :type result: tuple + """ + doc_id, rev, content, gen, trans_id = result + self.decrypted_docs[gen] = result + + def get_docs_by_generation(self): + """ + Get all documents in the received table from the sync db, + ordered by generation. + + :return: list of doc_id, rev, generation + """ + c = self._sync_db.cursor() + sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % ( + self.TABLE_NAME,) + c.execute(sql) + return c.fetchall() + + def count_received_encrypted_docs(self): + """ + Count how many documents we have in the table for received and + encrypted docs. + + :return: The count of documents. + :rtype: int + """ + c = self._sync_db.cursor() + sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,) + c.execute(sql) + res = c.fetchone() + print "res" + if res is not None: + print ">>>>>>>>>> GOT %s received encrypted docs" % res[0] + return res[0] + else: + return 0 + + def decrypt_received_docs(self, source_replica_uid): """ + Get all the encrypted documents from the sync database and dispatch a + decrypt worker to decrypt each one of them. + """ + docs_by_generation = self.get_docs_by_generation() + for doc_id, rev, gen in docs_by_generation: + self.decrypt_doc(doc_id, rev, source_replica_uid) + + def process_decrypted(self): + """ + Process the already decrypted documents, and insert as many documents + as can be taken from the expected order without finding a gap. + """ + # Acquire the lock to avoid processing while we're still + # getting data from the syncing stream, to avoid InvalidGeneration + # problems. + with self.write_encrypted_lock: + docs = self.get_docs_by_generation() + expected = [gen for doc_id, rev, gen in docs] + docs_to_insert = get_insertable_docs_by_gen( + expected, self.decrypted_docs) + for doc_fields in docs_to_insert: + self.insert_decrypted_local_doc(*doc_fields) + + def insert_decrypted_local_doc(self, doc_id, doc_rev, content, + gen, trans_id): + """ + Insert the decrypted document into the local sqlcipher database. + Makes use of the passed callback `return_doc_cb` passed to the caller + by u1db sync. + """ + print "TRY TO INSERT GEN --->", gen + # could pass source_replica in params for callback chain + insert_fun = self._insert_doc_cb[self.source_replica_uid] + try: + doc = SoledadDocument(doc_id, doc_rev, content) + insert_fun(doc, int(gen), trans_id) + except Exception as exc: + logger.error("Error while inserting decrypted doc into local db") + logger.exception(exc) + else: + # If no errors found, remove it from the local temporary dict + # and from the received database. + self.decrypted_docs.pop(gen) + self.delete_encrypted_received_doc(doc_id, doc_rev) diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index c7cf79a2..4e18847e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -54,6 +54,7 @@ from u1db.sync import Synchronizer from u1db import errors as u1db_errors from leap.soledad.client.target import SoledadSyncTarget +from leap.soledad.client.target import PendingReceivedDocsSyncError from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) @@ -339,7 +340,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): crypto=crypto, raw_key=raw_key, cipher=cipher, kdf_iter=kdf_iter, cipher_page_size=cipher_page_size) - def sync(self, url, creds=None, autocreate=True): + def sync(self, url, creds=None, autocreate=True, decrypt_inline=False): """ Synchronize documents with remote replica exposed at url. @@ -355,12 +356,21 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): :rtype: int """ print "***********************" - print "SQLCIPHER: sync started" + print "SQLCIPHER: sync started. inline?", decrypt_inline if not self.syncer: self._create_syncer(url, creds=creds) + old_decrypt_inline = self.syncer.sync_target.decrypt_inline + print "SETTING TARGET decrypt_inline to ", decrypt_inline + self.syncer.sync_target.set_decrypt_inline(decrypt_inline) + try: res = self.syncer.sync(autocreate=autocreate) + + except PendingReceivedDocsSyncError: + logger.warning("Local sync db is not clear, skipping sync...") + return + except httplib.CannotSendRequest: # raised when you reuse httplib.HTTP object for new request # while you havn't called its getresponse() @@ -371,10 +381,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): self._syncer = None self._create_syncer(url, creds=creds) print "SQLCIPHER: syncer created, about to sync..." + print "SETTING TARGET decrypt_inline to ", decrypt_inline + self.syncer.sync_target.set_decrypt_inline(decrypt_inline) res = self.syncer.sync(autocreate=autocreate) except Exception: logger.error("error SQLITE sync") raise + finally: + # restore the original decrypt inline behav + self.syncer.sync_target.set_decrypt_inline(old_decrypt_inline) + print "SQLCIPHER: sync DONE" return res diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index dc2a0420..9e65b2df 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -22,30 +22,33 @@ import cStringIO import gzip import logging import os +import re import sqlite3 import urllib -import simplejson as json +from collections import defaultdict from time import sleep +import simplejson as json +from taskthread import TimerTask from u1db.remote import utils, http_errors from u1db.errors import BrokenSyncStream from u1db import errors from u1db.remote.http_target import HTTPSyncTarget from u1db.remote.http_client import _encode_query_parameter +from zope.proxy import ProxyBase +from zope.proxy import sameProxiedObjects, setProxiedObject from leap.soledad.common.document import SoledadDocument from leap.soledad.client.auth import TokenBasedAuth -from leap.soledad.client.crypto import is_symmetrically_encrypted, decrypt_doc +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.crypto import encrypt_docstr, decrypt_doc_dict +from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool from leap.common.check import leap_check logger = logging.getLogger(__name__) -# -# Exceptions -# - def _gunzip(data): """ @@ -65,10 +68,14 @@ def _gunzip(data): return data +class PendingReceivedDocsSyncError(Exception): + pass + # # SoledadSyncTarget # + class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): """ A SyncTarget that encrypts data before sending and decrypts data after @@ -80,6 +87,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): written to the main database. """ + # will later keep a reference to the insert-doc callback + # passed to sync_exchange + _insert_doc_cb = defaultdict(lambda: ProxyBase(None)) + # # Modified HTTPSyncTarget methods. # @@ -109,10 +120,32 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto + print "URL : ", url + self.source_replica_uid = re.findall("user-([0-9a-fA-F]+)", url)[0] + print "uid -->", self.source_replica_uid + self._sync_db = None if sync_db_path is not None: self._init_sync_db(sync_db_path) + # whether to bypass the received messages decryption deferral + self._decrypt_inline = False + + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, self._sync_db, + insert_doc_cb=self._insert_doc_cb) + self._sync_watcher = TimerTask( + self._decrypt_syncing_received_docs, delay=10) + self._sync_watcher.start() + + def set_decrypt_inline(self, value): + self._decrypt_inline = value + + @property + def decrypt_inline(self): + return self._decrypt_inline + @staticmethod def connect(url, crypto=None): return SoledadSyncTarget(url, crypto=crypto) @@ -151,31 +184,57 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): raise BrokenSyncStream data = parts[1:-1] comma = False + + queue_for_decrypt = (not self.decrypt_inline or + self._sync_db is None) + if queue_for_decrypt: + self._sync_decr_pool.write_encrypted_lock.acquire() if data: line, comma = utils.check_and_strip_comma(data[0]) res = json.loads(line) if ensure_callback and 'replica_uid' in res: ensure_callback(res['replica_uid']) + + # XXX check that writing_incoming lock is not acquired --------- + for entry in data[1:]: if not comma: # missing in between comma raise BrokenSyncStream line, comma = utils.check_and_strip_comma(entry) entry = json.loads(line) + gen, trans_id = entry['gen'], entry['trans_id'] #------------------------------------------------------------- # symmetric decryption of document's contents #------------------------------------------------------------- - # if arriving content was symmetrically encrypted, we decrypt - # it. + # If arriving content was symmetrically encrypted, we decrypt + # it. We do it inline if decrypt_inline flag is True or no + # sync_db was defined, otherwise we defer it writing it to the + # received docs table. + doc = SoledadDocument( entry['id'], entry['rev'], entry['content']) - if doc.content and ENC_SCHEME_KEY in doc.content: - if doc.content[ENC_SCHEME_KEY] == \ - EncryptionSchemes.SYMKEY: - doc.set_json(decrypt_doc(self._crypto, doc)) + + if is_symmetrically_encrypted(doc): + if queue_for_decrypt: + print "ENQUEUING DECRYPT -----------------------" + self._save_encrypted_received_doc(doc, gen, trans_id) + else: + print "INLINE DECRYPT -------------------------" + # force inline decrypt, or no-db fallback, for tests + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + doc.set_json(decrypt_doc_dict( + doc.content, doc.doc_id, doc.rev, + key, secret)) + # XXX should release lock in the decrypt pool + #------------------------------------------------------------- # end of symmetric decryption #------------------------------------------------------------- - return_doc_cb(doc, entry['gen'], entry['trans_id']) + if not queue_for_decrypt: + return_doc_cb(doc, gen, trans_id) + if queue_for_decrypt: + self._sync_decr_pool.write_encrypted_lock.release() if parts[-1] != ']': try: partdic = json.loads(parts[-1]) @@ -304,6 +363,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :return: The new generation and transaction id of the target replica. :rtype: tuple """ + self.source_replica_uid = source_replica_uid + print "SETTING SOURCE REPLICA UID to", source_replica_uid + # let the decrypter pool access the passed callback to insert docs + print "SETTING PROXY TO ------------>", return_doc_cb + setProxiedObject(self._insert_doc_cb[source_replica_uid], + return_doc_cb) + + if not self.clear_to_sync(): + raise PendingReceivedDocsSyncError + self._ensure_connection() if self._trace_hook: # for tests self._trace_hook('sync_exchange') @@ -340,9 +409,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if not doc.is_tombstone(): if self._sync_db is None: # fallback case, for tests + key = self._crypto.doc_passphrase(doc.doc_id) + secret = self._crypto.secret + doc_json = encrypt_docstr( json.dumps(doc.get_json()), - doc.doc_id, doc.rev, self._crypto.secret) + doc.doc_id, doc.rev, key, secret) else: try: doc_json = self.get_encrypted_doc_from_db( @@ -378,6 +450,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self.delete_encrypted_docs_from_db(synced) data = None + print "SYNC EXCHANGE FINISHED: new generation -> %s" % res['new_generation'] return res['new_generation'], res['new_transaction_id'] # @@ -439,9 +512,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :param doc_rev: The document revision :type doc_rev: str """ + encr = SyncEncrypterPool c = self._sync_db.cursor() - # XXX interpolate table name - sql = ("SELECT content FROM docs_tosync WHERE doc_id=? and rev=?") + sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) c.execute(sql, (doc_id, doc_rev)) res = c.fetchall() if len(res) != 0: @@ -456,10 +530,51 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): to be deleted. :type docs_ids: any iterable of tuples of str """ + encr = SyncEncrypterPool c = self._sync_db.cursor() for doc_id, doc_rev in docs_ids: - # XXX interpolate table name - sql = ("DELETE FROM docs_tosync " - "WHERE doc_id=? and rev=?") + sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % ( + encr.TABLE_NAME,)) c.execute(sql, (doc_id, doc_rev)) self._sync_db.commit() + + def _save_encrypted_received_doc(self, doc, gen, trans_id): + """ + Save an incoming document into the received docs table in the sync db. + + :param doc: The document to save. + :type doc: SoledadDocument + :param gen: The generation. + :type gen: str + :param trans_id: Transacion id. + :type gen: str + """ + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id) + + # + # Symmetric decryption of syncing docs + # + + def clear_to_sync(self): + """ + Return True if sync can proceed (ie, the received db table is empty). + :rtype: bool + """ + return self._sync_decr_pool.count_received_encrypted_docs() == 0 + + def _decrypt_syncing_received_docs(self): + """ + Decrypt the documents received from remote replica and insert them + into the local one. + + Called periodically from TimerTask self._sync_watcher. + """ + if sameProxiedObjects(self._insert_doc_cb.get(self.source_replica_uid), + None): + logger.warning("No insert_doc callback, skipping decryption.") + return + + decrypter = self._sync_decr_pool + decrypter.decrypt_received_docs(self.source_replica_uid) + decrypter.process_decrypted() |