summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/target.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r--client/src/leap/soledad/client/target.py153
1 files changed, 134 insertions, 19 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index dc2a0420..9e65b2df 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -22,30 +22,33 @@ import cStringIO
import gzip
import logging
import os
+import re
import sqlite3
import urllib
-import simplejson as json
+from collections import defaultdict
from time import sleep
+import simplejson as json
+from taskthread import TimerTask
from u1db.remote import utils, http_errors
from u1db.errors import BrokenSyncStream
from u1db import errors
from u1db.remote.http_target import HTTPSyncTarget
from u1db.remote.http_client import _encode_query_parameter
+from zope.proxy import ProxyBase
+from zope.proxy import sameProxiedObjects, setProxiedObject
from leap.soledad.common.document import SoledadDocument
from leap.soledad.client.auth import TokenBasedAuth
-from leap.soledad.client.crypto import is_symmetrically_encrypted, decrypt_doc
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.crypto import encrypt_docstr, decrypt_doc_dict
+from leap.soledad.client.crypto import SyncEncrypterPool, SyncDecrypterPool
from leap.common.check import leap_check
logger = logging.getLogger(__name__)
-#
-# Exceptions
-#
-
def _gunzip(data):
"""
@@ -65,10 +68,14 @@ def _gunzip(data):
return data
+class PendingReceivedDocsSyncError(Exception):
+ pass
+
#
# SoledadSyncTarget
#
+
class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
A SyncTarget that encrypts data before sending and decrypts data after
@@ -80,6 +87,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
written to the main database.
"""
+ # will later keep a reference to the insert-doc callback
+ # passed to sync_exchange
+ _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
+
#
# Modified HTTPSyncTarget methods.
#
@@ -109,10 +120,32 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
HTTPSyncTarget.__init__(self, url, creds)
self._crypto = crypto
+ print "URL : ", url
+ self.source_replica_uid = re.findall("user-([0-9a-fA-F]+)", url)[0]
+ print "uid -->", self.source_replica_uid
+
self._sync_db = None
if sync_db_path is not None:
self._init_sync_db(sync_db_path)
+ # whether to bypass the received messages decryption deferral
+ self._decrypt_inline = False
+
+ # initialize syncing queue decryption pool
+ self._sync_decr_pool = SyncDecrypterPool(
+ self._crypto, self._sync_db,
+ insert_doc_cb=self._insert_doc_cb)
+ self._sync_watcher = TimerTask(
+ self._decrypt_syncing_received_docs, delay=10)
+ self._sync_watcher.start()
+
+ def set_decrypt_inline(self, value):
+ self._decrypt_inline = value
+
+ @property
+ def decrypt_inline(self):
+ return self._decrypt_inline
+
@staticmethod
def connect(url, crypto=None):
return SoledadSyncTarget(url, crypto=crypto)
@@ -151,31 +184,57 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
raise BrokenSyncStream
data = parts[1:-1]
comma = False
+
+ queue_for_decrypt = (not self.decrypt_inline or
+ self._sync_db is None)
+ if queue_for_decrypt:
+ self._sync_decr_pool.write_encrypted_lock.acquire()
if data:
line, comma = utils.check_and_strip_comma(data[0])
res = json.loads(line)
if ensure_callback and 'replica_uid' in res:
ensure_callback(res['replica_uid'])
+
+ # XXX check that writing_incoming lock is not acquired ---------
+
for entry in data[1:]:
if not comma: # missing in between comma
raise BrokenSyncStream
line, comma = utils.check_and_strip_comma(entry)
entry = json.loads(line)
+ gen, trans_id = entry['gen'], entry['trans_id']
#-------------------------------------------------------------
# symmetric decryption of document's contents
#-------------------------------------------------------------
- # if arriving content was symmetrically encrypted, we decrypt
- # it.
+ # If arriving content was symmetrically encrypted, we decrypt
+ # it. We do it inline if decrypt_inline flag is True or no
+ # sync_db was defined, otherwise we defer it writing it to the
+ # received docs table.
+
doc = SoledadDocument(
entry['id'], entry['rev'], entry['content'])
- if doc.content and ENC_SCHEME_KEY in doc.content:
- if doc.content[ENC_SCHEME_KEY] == \
- EncryptionSchemes.SYMKEY:
- doc.set_json(decrypt_doc(self._crypto, doc))
+
+ if is_symmetrically_encrypted(doc):
+ if queue_for_decrypt:
+ print "ENQUEUING DECRYPT -----------------------"
+ self._save_encrypted_received_doc(doc, gen, trans_id)
+ else:
+ print "INLINE DECRYPT -------------------------"
+ # force inline decrypt, or no-db fallback, for tests
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+ doc.set_json(decrypt_doc_dict(
+ doc.content, doc.doc_id, doc.rev,
+ key, secret))
+ # XXX should release lock in the decrypt pool
+
#-------------------------------------------------------------
# end of symmetric decryption
#-------------------------------------------------------------
- return_doc_cb(doc, entry['gen'], entry['trans_id'])
+ if not queue_for_decrypt:
+ return_doc_cb(doc, gen, trans_id)
+ if queue_for_decrypt:
+ self._sync_decr_pool.write_encrypted_lock.release()
if parts[-1] != ']':
try:
partdic = json.loads(parts[-1])
@@ -304,6 +363,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
+ self.source_replica_uid = source_replica_uid
+ print "SETTING SOURCE REPLICA UID to", source_replica_uid
+ # let the decrypter pool access the passed callback to insert docs
+ print "SETTING PROXY TO ------------>", return_doc_cb
+ setProxiedObject(self._insert_doc_cb[source_replica_uid],
+ return_doc_cb)
+
+ if not self.clear_to_sync():
+ raise PendingReceivedDocsSyncError
+
self._ensure_connection()
if self._trace_hook: # for tests
self._trace_hook('sync_exchange')
@@ -340,9 +409,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
if not doc.is_tombstone():
if self._sync_db is None:
# fallback case, for tests
+ key = self._crypto.doc_passphrase(doc.doc_id)
+ secret = self._crypto.secret
+
doc_json = encrypt_docstr(
json.dumps(doc.get_json()),
- doc.doc_id, doc.rev, self._crypto.secret)
+ doc.doc_id, doc.rev, key, secret)
else:
try:
doc_json = self.get_encrypted_doc_from_db(
@@ -378,6 +450,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self.delete_encrypted_docs_from_db(synced)
data = None
+ print "SYNC EXCHANGE FINISHED: new generation -> %s" % res['new_generation']
return res['new_generation'], res['new_transaction_id']
#
@@ -439,9 +512,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:param doc_rev: The document revision
:type doc_rev: str
"""
+ encr = SyncEncrypterPool
c = self._sync_db.cursor()
- # XXX interpolate table name
- sql = ("SELECT content FROM docs_tosync WHERE doc_id=? and rev=?")
+ sql = ("SELECT content FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
c.execute(sql, (doc_id, doc_rev))
res = c.fetchall()
if len(res) != 0:
@@ -456,10 +530,51 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
to be deleted.
:type docs_ids: any iterable of tuples of str
"""
+ encr = SyncEncrypterPool
c = self._sync_db.cursor()
for doc_id, doc_rev in docs_ids:
- # XXX interpolate table name
- sql = ("DELETE FROM docs_tosync "
- "WHERE doc_id=? and rev=?")
+ sql = ("DELETE FROM %s WHERE doc_id=? and rev=?" % (
+ encr.TABLE_NAME,))
c.execute(sql, (doc_id, doc_rev))
self._sync_db.commit()
+
+ def _save_encrypted_received_doc(self, doc, gen, trans_id):
+ """
+ Save an incoming document into the received docs table in the sync db.
+
+ :param doc: The document to save.
+ :type doc: SoledadDocument
+ :param gen: The generation.
+ :type gen: str
+ :param trans_id: Transacion id.
+ :type gen: str
+ """
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id)
+
+ #
+ # Symmetric decryption of syncing docs
+ #
+
+ def clear_to_sync(self):
+ """
+ Return True if sync can proceed (ie, the received db table is empty).
+ :rtype: bool
+ """
+ return self._sync_decr_pool.count_received_encrypted_docs() == 0
+
+ def _decrypt_syncing_received_docs(self):
+ """
+ Decrypt the documents received from remote replica and insert them
+ into the local one.
+
+ Called periodically from TimerTask self._sync_watcher.
+ """
+ if sameProxiedObjects(self._insert_doc_cb.get(self.source_replica_uid),
+ None):
+ logger.warning("No insert_doc callback, skipping decryption.")
+ return
+
+ decrypter = self._sync_decr_pool
+ decrypter.decrypt_received_docs(self.source_replica_uid)
+ decrypter.process_decrypted()