summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2014-03-14 02:09:40 -0400
committerKali Kaneko <kali@leap.se>2014-03-17 12:37:35 -0400
commitd2d3a243b6da313a54c8c498ffcd3f065721ad5a (patch)
tree2e34698acdbf656e759c945bca96c3e04ca30d6a
parent1a60f3616efef904917dd77a12170912defc7637 (diff)
move symmetric decryption of docs to be db-based toofeature/enc-sync-transitional-db
-rw-r--r--client/pkg/requirements.pip5
-rw-r--r--client/src/leap/soledad/client/__init__.py25
-rw-r--r--client/src/leap/soledad/client/crypto.py277
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py20
-rw-r--r--client/src/leap/soledad/client/target.py153
5 files changed, 418 insertions, 62 deletions
diff --git a/client/pkg/requirements.pip b/client/pkg/requirements.pip
index ff1b4f35..6f2954ab 100644
--- a/client/pkg/requirements.pip
+++ b/client/pkg/requirements.pip
@@ -3,6 +3,9 @@ simplejson
u1db
scrypt
pycryptopp
+cchardet
+taskthread
+zope.proxy
#
# leap deps
@@ -21,5 +24,3 @@ oauth
# pysqlite should not be a dep, see #2945
pysqlite
-cchardet
-taskthread
diff --git a/client/src/leap/soledad/client/__init__.py b/client/src/leap/soledad/client/__init__.py
index 5f1d1a98..116a59e4 100644
--- a/client/src/leap/soledad/client/__init__.py
+++ b/client/src/leap/soledad/client/__init__.py
@@ -324,7 +324,7 @@ class Soledad(object):
self._bootstrap() # might raise BootstrapSequenceError()
# initialize syncing queue encryption pool
- self._sync_pool = SyncEncrypterPool(self._crypto, self._sync_db)
+ self._sync_enc_pool = SyncEncrypterPool(self._crypto, self._sync_db)
self._sync_watcher = TimerTask(self._encrypt_syncing_docs, delay=10)
self._sync_watcher.start()
@@ -1145,7 +1145,7 @@ class Soledad(object):
if self._db:
return self._db.resolve_doc(doc, conflicted_doc_revs)
- def sync(self):
+ def sync(self, decrypt_inline=False):
"""
Synchronize the local encrypted replica with a remote replica.
@@ -1155,11 +1155,15 @@ class Soledad(object):
:param url: the url of the target replica to sync with
:type url: str
- :return: the local generation before the synchronisation was
- performed.
+ :param decrypt_inline: Whether to do the decryption of received
+ messages inline or not.
+ :type decrypt_inline: bool
+
+ :return: The local generation before the synchronisation was
+ performed.
:rtype: str
"""
- #return
+ print "SYNC: inline? ", decrypt_inline
local_gen = None
if self._db:
# acquire lock before attempt to sync
@@ -1176,8 +1180,9 @@ class Soledad(object):
local_gen = self._db.sync(
urlparse.urljoin(
self.server_url, 'user-%s' % self._uuid),
- creds=self._creds, autocreate=True)
- #signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
+ creds=self._creds, autocreate=True,
+ decrypt_inline=decrypt_inline)
+ signal(SOLEDAD_DONE_DATA_SYNC, self._uuid)
except Exception as exc:
logger.error("error during soledad sync")
logger.exception(exc)
@@ -1388,7 +1393,7 @@ class Soledad(object):
return self._passphrase.encode('utf-8')
#
- # Symmetric encryption / decryption
+ # Symmetric encryption of syncing docs
#
def _encrypt_syncing_docs(self):
@@ -1396,6 +1401,8 @@ class Soledad(object):
Process the syncing queue and send the documents there
to be encrypted in the sync db. They will be read by the
SoledadSyncTarget during the sync_exchange.
+
+ Called periodical from the TimerTask self._sync_watcher.
"""
lock = self.encrypting_lock
# optional wait flag used to avoid blocking
@@ -1406,7 +1413,7 @@ class Soledad(object):
try:
while not queue.empty():
doc = queue.get_nowait()
- self._sync_pool.encrypt_doc(doc)
+ self._sync_enc_pool.encrypt_doc(doc)
except Exception as exc:
logger.error("Error while encrypting docs to sync")
logger.exception(exc)
diff --git a/client/src/leap/soledad/client/crypto.py b/client/src/leap/soledad/client/crypto.py
index 2ada4937..6d1fab37 100644
--- a/client/src/leap/soledad/client/crypto.py
+++ b/client/src/leap/soledad/client/crypto.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# crypto.py
-# Copyright (C) 2013,2014 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -24,12 +24,15 @@ import hashlib
import json
import logging
import multiprocessing
+import threading
from pycryptopp.cipher.aes import AES
from pycryptopp.cipher.xsalsa20 import XSalsa20
+from zope.proxy import sameProxiedObjects
from leap.soledad.common import soledad_assert
from leap.soledad.common import soledad_assert_type
+from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.crypto import (
@@ -346,14 +349,13 @@ def encrypt_docstr(docstr, doc_id, doc_rev, key, secret):
})
-# XXX change to docstr...
-def decrypt_doc(crypto, doc):
+def decrypt_doc_dict(doc_dict, doc_id, doc_rev, key, secret):
"""
Decrypt C{doc}'s content.
Return the JSON string representation of the document's decrypted content.
- The content of the document should have the following structure:
+ The passed doc_dict argument should have the following structure:
{
ENC_JSON_KEY: '<enc_blob>',
@@ -369,52 +371,67 @@ def decrypt_doc(crypto, doc):
EncryptionSchemes.SYMKEY and C{enc_method} is
EncryptionMethods.AES_256_CTR.
- :param crypto: A SoledadCryto instance to perform the encryption.
- :type crypto: leap.soledad.crypto.SoledadCrypto
- :param doc: The document to be decrypted.
- :type doc: SoledadDocument
+ :param doc_dict: The content of the document to be decrypted.
+ :type doc_dict: dict
+
+ :param doc_id: The document id.
+ :type doc_id: str
+
+ :param doc_rev: The document revision.
+ :type doc_rev: str
+
+ :param key: The key used to encrypt ``data`` (must be 256 bits long).
+ :type key: str
+
+ :param secret:
+ :type secret:
:return: The JSON serialization of the decrypted content.
:rtype: str
"""
- soledad_assert(doc.is_tombstone() is False)
- soledad_assert(ENC_JSON_KEY in doc.content)
- soledad_assert(ENC_SCHEME_KEY in doc.content)
- soledad_assert(ENC_METHOD_KEY in doc.content)
- soledad_assert(MAC_KEY in doc.content)
- soledad_assert(MAC_METHOD_KEY in doc.content)
+ # TODO where should we move these assertions, now that we're passed the
+ # string?
+ #soledad_assert(doc.is_tombstone() is False)
+
+ soledad_assert(ENC_JSON_KEY in doc_dict)
+ soledad_assert(ENC_SCHEME_KEY in doc_dict)
+ soledad_assert(ENC_METHOD_KEY in doc_dict)
+ soledad_assert(MAC_KEY in doc_dict)
+ soledad_assert(MAC_METHOD_KEY in doc_dict)
+
# verify MAC
ciphertext = binascii.a2b_hex( # content is stored as hex.
- doc.content[ENC_JSON_KEY])
+ doc_dict[ENC_JSON_KEY])
mac = mac_doc(
- doc.doc_id, doc.rev,
+ doc_id, doc_rev,
ciphertext,
- doc.content[MAC_METHOD_KEY], crypto.secret)
+ doc_dict[MAC_METHOD_KEY], secret)
# we compare mac's hashes to avoid possible timing attacks that might
# exploit python's builtin comparison operator behaviour, which fails
# immediatelly when non-matching bytes are found.
doc_mac_hash = hashlib.sha256(
binascii.a2b_hex( # the mac is stored as hex
- doc.content[MAC_KEY])).digest()
+ doc_dict[MAC_KEY])).digest()
calculated_mac_hash = hashlib.sha256(mac).digest()
if doc_mac_hash != calculated_mac_hash:
raise WrongMac('Could not authenticate document\'s contents.')
# decrypt doc's content
- enc_scheme = doc.content[ENC_SCHEME_KEY]
+ enc_scheme = doc_dict[ENC_SCHEME_KEY]
plainjson = None
if enc_scheme == EncryptionSchemes.SYMKEY:
- enc_method = doc.content[ENC_METHOD_KEY]
+ enc_method = doc_dict[ENC_METHOD_KEY]
if enc_method == EncryptionMethods.AES_256_CTR:
- soledad_assert(ENC_IV_KEY in doc.content)
- plainjson = crypto.decrypt_sym(
- ciphertext,
- crypto.doc_passphrase(doc.doc_id),
+ soledad_assert(ENC_IV_KEY in doc_dict)
+ plainjson = decrypt_sym(
+ ciphertext, key,
method=enc_method,
- iv=doc.content[ENC_IV_KEY])
+ iv=doc_dict[ENC_IV_KEY])
else:
raise UnknownEncryptionMethod(enc_method)
else:
raise UnknownEncryptionScheme(enc_scheme)
+
+ print "PLAIN: ", plainjson
return plainjson
@@ -451,6 +468,9 @@ class SyncEncryptDecryptPool(object):
:param sync_db: a database connection handle
:type sync_db: handle
+
+ :param insert_doc_cb: Optional callback for inserting doc.
+ :type insert_doc_cb: callable
"""
self._pool = multiprocessing.Pool(self.WORKERS)
self._crypto = crypto
@@ -492,9 +512,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
def encrypt_doc_cb(self, result):
doc_id, doc_rev, content = result
- self.insert_encrypted_doc(doc_id, doc_rev, content)
+ self.insert_encrypted_local_doc(doc_id, doc_rev, content)
- def insert_encrypted_doc(self, doc_id, doc_rev, content):
+ def insert_encrypted_local_doc(self, doc_id, doc_rev, content):
"""
Insert the contents of the encrypted doc into the local sync
database.
@@ -512,19 +532,216 @@ class SyncEncrypterPool(SyncEncryptDecryptPool):
self._sync_db.commit()
+def decrypt_doc_task(doc_id, doc_rev, content, gen, trans_id, key, secret):
+ decrypted_content = decrypt_doc_dict(
+ content, doc_id, doc_rev, key, secret)
+ return doc_id, doc_rev, decrypted_content, gen, trans_id
+
+
+def get_insertable_docs_by_gen(expected, got):
+ """
+ Return a list of documents ready to be inserted. This list is computed
+ by aligning the expected list with the already gotten docs, and returning
+ the maximum number of docs that can be processed in the expected order
+ before finding a gap.
+
+ :param expected: A list of generations to be inserted.
+ :type expected: list
+
+ :param got: A dictionary whose values are the docs to be inserted.
+ :type got: dict
+ """
+ ordered = [got.get(i) for i in expected]
+ if None in ordered:
+ return ordered[:ordered.index(None)]
+ else:
+ return ordered
+
+
class SyncDecrypterPool(SyncEncryptDecryptPool):
"""
Pool of workers that spawn subprocesses to execute the symmetric decryption
of documents that were received.
+
+ The decryption of the received documents is done in two steps:
+
+ 1. All the encrypted docs are collected, together with their generation
+ and transaction-id
+ 2. The docs are enqueued for decryption. When completed, they are
+ inserted following the generation order.
"""
WORKERS = 10
TABLE_NAME = "docs_received"
FIELD_NAMES = "doc_id, rev, content, gen, trans_id"
- def decrypt_doc(self, doc_id, rev):
+ write_encrypted_lock = threading.Lock()
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initialize the decrypter pool, and setup a dict for putting the
+ results of the decrypted docs until they are picked by the insert
+ routine that gets them in order.
+ """
+ self._insert_doc_cb = kwargs.pop("insert_doc_cb")
+ SyncEncryptDecryptPool.__init__(self, *args)
+ self.decrypted_docs = {}
+
+ def insert_encrypted_received_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert a received message with encrypted content, to be decrypted later
+ on.
+ """
+ docstr = json.dumps(content)
+ c = self._sync_db.cursor()
+ sql_ins = "INSERT INTO '%s' VALUES (?, ?, ?, ?, ?)" % (
+ self.TABLE_NAME,)
+ c.execute(sql_ins, (doc_id, doc_rev, docstr, gen, trans_id))
+ self._sync_db.commit()
+
+ def delete_encrypted_received_doc(self, doc_id, doc_rev):
+ """
+ Delete a encrypted received doc after it was inserted into the local
+ db.
+
+ :param doc_id: Document ID.
+ :type doc_id: str
+ :param doc_rev: Document revision.
+ :type doc_rev: str
+ """
+ c = self._sync_db.cursor()
+ sql_del = "DELETE FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ c.execute(sql_del, (doc_id, doc_rev))
+ self._sync_db.commit()
+
+ def decrypt_doc(self, doc_id, rev, source_replica_uid):
"""
Symmetrically decrypt a document.
- :param doc: The document with contents to be encrypted.
- :type doc: SoledadDocument
+ :param doc_id: The ID for the document with contents to be encrypted.
+ :type doc: str
+ :param rev: The revision of the document.
+ :type rev: str
+ :param source_replica_uid:
+ :type source_replica_uid: str
+ """
+ self.source_replica_uid = source_replica_uid
+ if sameProxiedObjects(self._insert_doc_cb.get(source_replica_uid),
+ None):
+ print self._insert_doc_cb
+ logger.warning("No insert_doc callback, skipping decryption.")
+ return
+
+ # XXX move to get_doc function...
+ c = self._sync_db.cursor()
+ sql = "SELECT * FROM '%s' WHERE doc_id=? AND rev=?" % (
+ self.TABLE_NAME,)
+ c.execute(sql, (doc_id, rev))
+ res = c.fetchone()
+ if res is None:
+ logger.debug("Doc %s:%s does not exist in sync db" % (doc_id, rev))
+ return
+
+ doc_id, rev, docstr, gen, trans_id = res
+ content = json.loads(docstr)
+ key = self._crypto.doc_passphrase(doc_id)
+ secret = self._crypto.secret
+
+ args = doc_id, rev, content, gen, trans_id, key, secret
+
+ try:
+ self._pool.apply_async(decrypt_doc_task, args,
+ callback=self.decrypt_doc_cb)
+ except Exception as exc:
+ logger.exception(exc)
+
+ def decrypt_doc_cb(self, result):
+ """
+ Temporarily store the decryption result in a dictionary where it will
+ be picked by process_decrypted.
+
+ :param result: the result of the decryption routine.
+ :type result: tuple
+ """
+ doc_id, rev, content, gen, trans_id = result
+ self.decrypted_docs[gen] = result
+
+ def get_docs_by_generation(self):
+ """
+ Get all documents in the received table from the sync db,
+ ordered by generation.
+
+ :return: list of doc_id, rev, generation
+ """
+ c = self._sync_db.cursor()
+ sql = "SELECT doc_id, rev, gen FROM %s ORDER BY gen" % (
+ self.TABLE_NAME,)
+ c.execute(sql)
+ return c.fetchall()
+
+ def count_received_encrypted_docs(self):
+ """
+ Count how many documents we have in the table for received and
+ encrypted docs.
+
+ :return: The count of documents.
+ :rtype: int
+ """
+ c = self._sync_db.cursor()
+ sql = "SELECT COUNT(*) FROM %s" % (self.TABLE_NAME,)
+ c.execute(sql)
+ res = c.fetchone()
+ print "res"
+ if res is not None:
+ print ">>>>>>>>>> GOT %s received encrypted docs" % res[0]
+ return res[0]
+ else:
+ return 0
+
+ def decrypt_received_docs(self, source_replica_uid):
"""
+ Get all the encrypted documents from the sync database and dispatch a
+ decrypt worker to decrypt each one of them.
+ """
+ docs_by_generation = self.get_docs_by_generation()
+ for doc_id, rev, gen in docs_by_generation:
+ self.decrypt_doc(doc_id, rev, source_replica_uid)
+
+ def process_decrypted(self):
+ """
+ Process the already decrypted documents, and insert as many documents
+ as can be taken from the expected order without finding a gap.
+ """
+ # Acquire the lock to avoid processing while we're still
+ # getting data from the syncing stream, to avoid InvalidGeneration
+ # problems.
+ with self.write_encrypted_lock:
+ docs = self.get_docs_by_generation()
+ expected = [gen for doc_id, rev, gen in docs]
+ docs_to_insert = get_insertable_docs_by_gen(
+ expected, self.decrypted_docs)
+ for doc_fields in docs_to_insert:
+ self.insert_decrypted_local_doc(*doc_fields)
+
+ def insert_decrypted_local_doc(self, doc_id, doc_rev, content,
+ gen, trans_id):
+ """
+ Insert the decrypted document into the local sqlcipher database.
+ Makes use of the passed callback `return_doc_cb` passed to the caller
+ by u1db sync.
+ """
+ print "TRY TO INSERT GEN --->", gen
+ # could pass source_replica in params for callback chain
+ insert_fun = self._insert_doc_cb[self.source_replica_uid]
+ try:
+ doc = SoledadDocument(doc_id, doc_rev, content)
+ insert_fun(doc, int(gen), trans_id)
+ except Exception as exc:
+ logger.error("Error while inserting decrypted doc into local db")
+ logger.exception(exc)
+ else:
+ # If no errors found, remove it from the local temporary dict
+ # and from the received database.
+ self.decrypted_docs.pop(gen)
+ self.delete_encrypted_received_doc(doc_id, doc_rev)
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index c7cf79a2..4e18847e 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -54,6 +54,7 @@ from u1db.sync import Synchronizer
from u1db import errors as u1db_errors
from leap.soledad.client.target import SoledadSyncTarget
+from leap.soledad.client.target import PendingReceivedDocsSyncError
from leap.soledad.common.document import SoledadDocument
logger = logging.getLogger(__name__)
@@ -339,7 +340,7 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
crypto=crypto, raw_key=raw_key, cipher=cipher,
kdf_iter=kdf_iter, cipher_page_size=cipher_page_size)
- def sync(self, url, creds=None, autocreate=True):
+ def sync(self, url, creds=None, autocreate=True, decrypt_inline=False):
"""
Synchronize documents with remote replica exposed at url.
@@ -355,12 +356,21 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
:rtype: int
"""
print "***********************"
- print "SQLCIPHER: sync started"
+ print "SQLCIPHER: sync started. inline?", decrypt_inline
if not self.syncer:
self._create_syncer(url, creds=creds)
+ old_decrypt_inline = self.syncer.sync_target.decrypt_inline
+ print "SETTING TARGET decrypt_inline to ", decrypt_inline
+ self.syncer.sync_target.set_decrypt_inline(decrypt_inline)
+
try:
res = self.syncer.sync(autocreate=autocreate)
+
+ except PendingReceivedDocsSyncError:
+ logger.warning("Local sync db is not clear, skipping sync...")
+ return
+
except httplib.CannotSendRequest:
# raised when you reuse httplib.HTTP object for new request
# while you havn't called its getresponse()
@@ -371,10 +381,16 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
self._syncer = None
self._create_syncer(url, creds=creds)
print "SQLCIPHER: syncer created, about to sync..."
+ print "SETTING TARGET decrypt_inline to ", decrypt_inline
+ self.syncer.sync_target.set_decrypt_inline(decrypt_inline)
res = self.syncer.sync(autocreate=autocreate)
except Exception:
logger.error("error SQLITE sync")
raise
+ finally:
+ # restore the original decrypt inline behav
+ self.syncer.sync_target.set_decrypt_inline(old_decrypt_inline)
+
print "SQLCIPHER: sync DONE"
return res
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index dc2a0420..9e65b2df 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -22,30 +22,33 @@ import cStringIO
import gzip
import logging
import os
+import re
import sqlite3
import urllib
-import simplejson as json
+from collections import defaultdict
from time import sleep
+import simplejson as json
+from taskthread import TimerTask
from u1db.remote import utils, http_errors
from u1db.errors import BrokenSyncStream
from u1db import errors
from u1db.remote.http_target import HTTPSyncTarget
from u1db.remote.http_client import _encode_query_parameter
+from zope.proxy import ProxyBase
+from zope.proxy import sameProxiedObjects, setProxiedObject
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
-from leap.soledad.client.crypto import is_symmetrically_encrypted, decrypt_doc
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.crypto import encrypt_docstr, decrypt_doc_dict
+from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
from leap.common.check import leap_check
logger = logging.getLogger(__name__)
-#
-# Exceptions
-#
-
def _gunzip(data):
"""
@@ -65,10 +68,14 @@ def _gunzip(data):
return data
+class PendingReceivedDocsSyncError(Exception):
+ pass
+
#
# SoledadSyncTarget
#
+
class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
A SyncTarget that encrypts data before sending and decrypts data after
@@ -80,6 +87,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
written to the main database.
"""
+ # will later keep a reference to the insert-doc callback
+ # passed to sync_exchange
+ _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
+
#
# Modified HTTPSyncTarget methods.
#
@@ -109,10 +120,32 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
HTTPSyncTarget.__init__(self, url, creds)
self._crypto = crypto
+ print "URL : ", url
+ self.source_replica_uid = re.findall("user-([0-9a-fA-F]+)", url)[0]
+ print "uid -->", self.source_replica_uid
+
self._sync_db = None
if sync_db_path is not None:
self._init_sync_db(sync_db_path)
+ # whether to bypass the received messages decryption deferral
+ self._decrypt_inline = False
+
+ # initialize syncing queue decryption pool
+ self._sync_decr_pool = SyncDecrypterPool(
+ self._crypto, self._sync_db,
+ insert_doc_cb=self._insert_doc_cb)
+ self._sync_watcher = TimerTask(
+ self._decrypt_syncing_received_docs, delay=10)
+ self._sync_watcher.start()
+
+ def set_decrypt_inline(self, value):
+ self._decrypt_inline = value
+
+ @property
+ def decrypt_inline(self):
+ return self._decrypt_inline
+
@staticmethod
def connect(url, crypto=None):
return SoledadSyncTarget(url, crypto=crypto)
@@ -151,31 +184,57 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
raise BrokenSyncStream
data = parts[1:-1]
comma = False
+
+ queue_for_decrypt = (not self.decrypt_inline or
+ self._sync_db is None)
+ if queue_for_decrypt:
+ self._sync_decr_pool.write_encrypted_lock.acquire()
if data:
line, comma = utils.check_and_strip_comma(data[0])
res = json.loads(line)
if ensure_callback and 'replica_uid' in res:
ensure_callback(res['replica_uid'])
+
+ # XXX check that writing_incoming lock is not acquired ---------
+
for entry in data[1:]:
if not comma: # missing in between comma
raise BrokenSyncStream
line, comma = utils.check_and_strip_comma(entry)
entry = json.loads(line)
+ gen, trans_id = entry['gen'], entry['trans_id']
#-------------------------------------------------------------
# symmetric decryption of document's contents
#-------------------------------------------------------------
- # if arriving content was symmetrically encrypted, we decrypt
- # it.
+ # If arriving content was symmetrically encrypted, we decrypt
+ # it. We do it inline if decrypt_inline flag is True or no
+ # sync_db was defined, otherwise we defer it writing it to the
+ # received docs table.
+
doc = SoledadDocument(
entry['id'], entry['rev'], entry['content'])
- if doc.content and ENC_SCHEME_KEY in doc.content:
- if doc.content[ENC_SCHEME_KEY] == \
- EncryptionSchemes.SYMKEY:
- doc.set_json(decrypt_doc(self._crypto, doc))
+
+ if is_symmetrically_encrypted(doc):
+ if queue_for_decrypt:
+ print "ENQUEUING DECRYPT -----------------------"
+ self._save_encrypted_received_doc(doc, gen, trans_id)
+ else:
+ print "INLINE DECRYPT -------------------------"
+ # force inline decrypt, or no-db fallback, for tests
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ doc.set_json(decrypt_doc_dict(
+ doc.content, doc.doc_id, doc.rev,
+ key, secret))
+ # XXX should release lock in the decrypt pool
+
#-------------------------------------------------------------
# end of symmetric decryption
#-------------------------------------------------------------
- return_doc_cb(doc, entry['gen'], entry['trans_id'])
+ if not queue_for_decrypt:
+ return_doc_cb(doc, gen, trans_id)
+ if queue_for_decrypt:
+ self._sync_decr_pool.write_encrypted_lock.release()
if parts[-1] != ']':
try:
partdic = json.loads(parts[-1])
@@ -304,6 +363,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
+ self.source_replica_uid = source_replica_uid
+ print "SETTING SOURCE REPLICA UID to", source_replica_uid
+ # let the decrypter pool access the passed callback to insert docs
+ print "SETTING PROXY TO ------------>", return_doc_cb
+ setProxiedObject(self._insert_doc_cb[source_replica_uid],
+ return_doc_cb)
+
+ if not self.clear_to_sync():
+ raise PendingReceivedDocsSyncError
+
self._ensure_connection()
if self._trace_hook: # for tests
self._trace_hook('sync_exchange')
@@ -340,9 +409,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if not doc.is_tombstone():
if self._sync_db is None:
# fallback case, for tests
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+
doc_json = encrypt_docstr(
json.dumps(doc.get_json()),
- doc.doc_id, doc.rev, self._crypto.secret)
+ doc.doc_id, doc.rev, key, secret)
else:
try:
doc_json = self.get_encrypted_doc_from_db(
@@ -378,6 +450,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self.delete_encrypted_docs_from_db(synced)
data = None
+ print "SYNC EXCHANGE FINISHED: new generation -> %s" % res['new_generation']
return res['new_generation'], res['new_transaction_id']
#
@@ -439,9 +512,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param doc_rev: The document revision
:type doc_rev: str
"""
+ encr = SyncEncrypterPool
c = self._sync_db.cursor()
- # XXX interpolate table name
- sql = ("SELECT content FROM docs_tosync WHERE doc_id=? and rev=?")
+ sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
c.execute(sql, (doc_id, doc_rev))
res = c.fetchall()
if len(res) != 0:
@@ -456,10 +530,51 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
to be deleted.
:type docs_ids: any iterable of tuples of str
"""
+ encr = SyncEncrypterPool
c = self._sync_db.cursor()
for doc_id, doc_rev in docs_ids:
- # XXX interpolate table name
- sql = ("DELETE FROM docs_tosync "
- "WHERE doc_id=? and rev=?")
+ sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
c.execute(sql, (doc_id, doc_rev))
self._sync_db.commit()
+
+ def _save_encrypted_received_doc(self, doc, gen, trans_id):
+ """
+ Save an incoming document into the received docs table in the sync db.
+
+ :param doc: The document to save.
+ :type doc: SoledadDocument
+ :param gen: The generation.
+ :type gen: str
+ :param trans_id: Transacion id.
+ :type gen: str
+ """
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id)
+
+ #
+ # Symmetric decryption of syncing docs
+ #
+
+ def clear_to_sync(self):
+ """
+ Return True if sync can proceed (ie, the received db table is empty).
+ :rtype: bool
+ """
+ return self._sync_decr_pool.count_received_encrypted_docs() == 0
+
+ def _decrypt_syncing_received_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ Called periodically from TimerTask self._sync_watcher.
+ """
+ if sameProxiedObjects(self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ logger.warning("No insert_doc callback, skipping decryption.")
+ return
+
+ decrypter = self._sync_decr_pool
+ decrypter.decrypt_received_docs(self.source_replica_uid)
+ decrypter.process_decrypted()