summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-05-19 18:46:53 -0300
committerdrebs <drebs@leap.se>2015-05-20 10:16:46 -0300
commite62dafeba8f08c1f7588e37cf9cd3fb28e79a020 (patch)
tree2f6647e1a3b273aa5579941dbcf3e401c90514b5
parent94cbe24f6c6cd54e14d8d1b14e617c2d52c427fd (diff)
[feature] use twisted.web.client in client sync
This change uses twisted deferreds for the whole syncing process and paves the way to implementing other transport schemes. It removes a lot of threaded code that used locks and was very difficult to maintain, and lets twisted to the dirty work. Furthermore, all blocking network i/o is now handled asynchronously by the twisted. This commit removes the possibility of interrupting a sync, and we should reimplement it using cancellable deferreds if we need it.
-rw-r--r--client/changes/feature_use-twisted-web-for-client-sync1
-rw-r--r--client/src/leap/soledad/client/encdecpool.py11
-rw-r--r--client/src/leap/soledad/client/http_target.py570
-rw-r--r--client/src/leap/soledad/client/sqlcipher.py46
-rw-r--r--client/src/leap/soledad/client/sync.py83
-rw-r--r--client/src/leap/soledad/client/target.py1473
6 files changed, 635 insertions, 1549 deletions
diff --git a/client/changes/feature_use-twisted-web-for-client-sync b/client/changes/feature_use-twisted-web-for-client-sync
new file mode 100644
index 00000000..b4d1d4a4
--- /dev/null
+++ b/client/changes/feature_use-twisted-web-for-client-sync
@@ -0,0 +1 @@
+ o Use twisted.web.client for client sync.
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py
index 0c1f92ea..7c21c30e 100644
--- a/client/src/leap/soledad/client/encdecpool.py
+++ b/client/src/leap/soledad/client/encdecpool.py
@@ -789,12 +789,5 @@ class SyncDecrypterPool(SyncEncryptDecryptPool):
self._exception = e
self._finished.set()
- def wait(self):
- """
- Wait for the decrypt-and-process loop to finish.
- """
- logger.debug("Waiting for asynchronous decryption of incoming documents...")
- self._finished.wait()
- logger.debug("Asynchronous decryption of incoming documents finished.")
- if self._exception:
- raise self._exception
+ def has_finished(self):
+ return self._finished.is_set()
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py
new file mode 100644
index 00000000..041180e6
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target.py
@@ -0,0 +1,570 @@
+# -*- coding: utf-8 -*-
+# target.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+A U1DB backend for encrypting data before sending to server and decrypting
+after receiving.
+"""
+
+import json
+import base64
+import logging
+
+from zope.proxy import setProxiedObject
+from zope.proxy import ProxyBase
+from uuid import uuid4
+from functools import partial
+from collections import defaultdict
+
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.web.client import getPage
+
+from u1db import errors
+from u1db import SyncTarget
+from u1db.remote import utils
+
+from leap.soledad.common.document import SoledadDocument
+
+from leap.soledad.client.crypto import is_symmetrically_encrypted
+from leap.soledad.client.crypto import encrypt_doc
+from leap.soledad.client.crypto import decrypt_doc
+from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
+from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
+from leap.soledad.client.events import signal
+from leap.soledad.client.encdecpool import SyncDecrypterPool
+
+
+logger = logging.getLogger(__name__)
+
+
+class SoledadHTTPSyncTarget(SyncTarget):
+ """
+ A SyncTarget that encrypts data before sending and decrypts data after
+ receiving.
+
+ Normally encryption will have been written to the sync database upon
+ document modification. The sync database is also used to write temporarily
+ the parsed documents that the remote send us, before being decrypted and
+ written to the main database.
+ """
+
+ # will later keep a reference to the insert-doc callback
+ # passed to sync_exchange
+ _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
+
+ def __init__(self, url, source_replica_uid, creds, crypto,
+ sync_db=None, sync_enc_pool=None):
+ """
+ Initialize the sync target.
+
+ :param url: The server sync url.
+ :type url: str
+ :param source_replica_uid: The source replica uid which we use when
+ deferring decryption.
+ :type source_replica_uid: str
+ :param url: The url of the target replica to sync with.
+ :type url: str
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: creds
+ :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
+ document contents when syncing.
+ :type crypto: soledad.crypto.SoledadCrypto
+ :param sync_db: Optional. handler for the db with the symmetric
+ encryption of the syncing documents. If
+ None, encryption will be done in-place,
+ instead of retreiving it from the dedicated
+ database.
+ :type sync_db: Sqlite handler
+ """
+ if url.endswith("/"):
+ url = url[:-1]
+ self._url = str(url) + "/sync-from/" + source_replica_uid
+ self.source_replica_uid = source_replica_uid
+ self._auth_header = None
+ self.set_creds(creds)
+ self._crypto = crypto
+ self._sync_db = sync_db
+ self._sync_enc_pool = sync_enc_pool
+ # asynchronous encryption/decryption attributes
+ self._decryption_callback = None
+ self._sync_decr_pool = None
+
+ def set_creds(self, creds):
+ """
+ Update credentials.
+
+ :param creds: A dictionary containing the uuid and token.
+ :type creds: dict
+ """
+ uuid = creds['token']['uuid']
+ token = creds['token']['token']
+ auth = '%s:%s' % (uuid, token)
+ b64_token = base64.b64encode(auth)
+ self._auth_header = {'Authorization': 'Token %s' % b64_token}
+
+ @property
+ def _defer_encryption(self):
+ return self._sync_enc_pool is not None
+
+ #
+ # SyncTarget API
+ #
+
+ @defer.inlineCallbacks
+ def get_sync_info(self, source_replica_uid):
+ """
+ Return information about known state of remote database.
+
+ Return the replica_uid and the current database generation of the
+ remote database, and its last-seen database generation for the client
+ replica.
+
+ :param source_replica_uid: The client-size replica uid.
+ :type source_replica_uid: str
+
+ :return: A deferred which fires with (target_replica_uid,
+ target_replica_generation, target_trans_id,
+ source_replica_last_known_generation,
+ source_replica_last_known_transaction_id)
+ :rtype: twisted.internet.defer.Deferred
+ """
+ raw = yield getPage(self._url, headers=self._auth_header)
+ res = json.loads(raw)
+ defer.returnValue([
+ res['target_replica_uid'],
+ res['target_replica_generation'],
+ res['target_replica_transaction_id'],
+ res['source_replica_generation'],
+ res['source_transaction_id']
+ ])
+
+ def record_sync_info(
+ self, source_replica_uid, source_replica_generation,
+ source_replica_transaction_id):
+ """
+ Record tip information for another replica.
+
+ After sync_exchange has been processed, the caller will have
+ received new content from this replica. This call allows the
+ source replica instigating the sync to inform us what their
+ generation became after applying the documents we returned.
+
+ This is used to allow future sync operations to not need to repeat data
+ that we just talked about. It also means that if this is called at the
+ wrong time, there can be database records that will never be
+ synchronized.
+
+ :param source_replica_uid: The identifier for the source replica.
+ :type source_replica_uid: str
+ :param source_replica_generation: The database generation for the
+ source replica.
+ :type source_replica_generation: int
+ :param source_replica_transaction_id: The transaction id associated
+ with the source replica
+ generation.
+ :type source_replica_transaction_id: str
+
+ :return: A deferred which fires with the result of the query.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ data = json.dumps({
+ 'generation': source_replica_generation,
+ 'transaction_id': source_replica_transaction_id
+ })
+ headers = self._auth_header.copy()
+ headers.update({'content-type': 'application/json'})
+ return getPage(
+ self._url,
+ method='PUT',
+ headers=headers,
+ postdata=data)
+
+ @defer.inlineCallbacks
+ def sync_exchange(self, docs_by_generation, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ return_doc_cb, ensure_callback=None,
+ defer_decryption=True, sync_id=None):
+ """
+ Find out which documents the remote database does not know about,
+ encrypt and send them. After that, receive documents from the remote
+ database.
+
+ :param docs_by_generations: A list of (doc_id, generation, trans_id)
+ of local documents that were changed since
+ the last local generation the remote
+ replica knows about.
+ :type docs_by_generations: list of tuples
+
+ :param source_replica_uid: The uid of the source replica.
+ :type source_replica_uid: str
+
+ :param last_known_generation: Target's last known generation.
+ :type last_known_generation: int
+
+ :param last_known_trans_id: Target's last known transaction id.
+ :type last_known_trans_id: str
+
+ :param return_doc_cb: A callback for inserting received documents from
+ target. If not overriden, this will call u1db
+ insert_doc_from_target in synchronizer, which
+ implements the TAKE OTHER semantics.
+ :type return_doc_cb: function
+
+ :param ensure_callback: A callback that ensures we know the target
+ replica uid if the target replica was just
+ created.
+ :type ensure_callback: function
+
+ :param defer_decryption: Whether to defer the decryption process using
+ the intermediate database. If False,
+ decryption will be done inline.
+ :type defer_decryption: bool
+
+ :return: A deferred which fires with the new generation and
+ transaction id of the target replica.
+ :rtype: twisted.internet.defer.Deferred
+ """
+
+ self._ensure_callback = ensure_callback
+
+ if sync_id is None:
+ sync_id = str(uuid4())
+ self.source_replica_uid = source_replica_uid
+
+ # let the decrypter pool access the passed callback to insert docs
+ setProxiedObject(self._insert_doc_cb[source_replica_uid],
+ return_doc_cb)
+
+ gen_after_send, trans_id_after_send = yield self._send_docs(
+ docs_by_generation,
+ last_known_generation,
+ last_known_trans_id,
+ sync_id)
+
+ cur_target_gen, cur_target_trans_id = yield self._receive_docs(
+ last_known_generation, last_known_trans_id,
+ return_doc_cb, ensure_callback, sync_id,
+ defer_decryption=defer_decryption)
+
+ # update gen and trans id info in case we just sent and did not
+ # receive docs.
+ if gen_after_send is not None and gen_after_send > cur_target_gen:
+ cur_target_gen = gen_after_send
+ cur_target_trans_id = trans_id_after_send
+
+ defer.returnValue([cur_target_gen, cur_target_trans_id])
+
+ #
+ # methods to send docs
+ #
+
+ def _prepare(self, comma, entries, **dic):
+ entry = comma + '\r\n' + json.dumps(dic)
+ entries.append(entry)
+ return len(entry)
+
+ @defer.inlineCallbacks
+ def _send_docs(self, docs_by_generation, last_known_generation,
+ last_known_trans_id, sync_id):
+
+ if not docs_by_generation:
+ defer.returnValue([None, None])
+
+ headers = self._auth_header.copy()
+ headers.update({'content-type': 'application/x-soledad-sync-put'})
+ # add remote replica metadata to the request
+ first_entries = ['[']
+ self._prepare(
+ '', first_entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ idx = 0
+ total = len(docs_by_generation)
+ for doc, gen, trans_id in docs_by_generation:
+ idx += 1
+ result = yield self._send_one_doc(
+ headers, first_entries, doc,
+ gen, trans_id, total, idx)
+ if self._defer_encryption:
+ self._sync_enc_pool.delete_encrypted_doc(
+ doc.doc_id, doc.rev)
+ signal(SOLEDAD_SYNC_SEND_STATUS,
+ "Soledad sync send status: %d/%d"
+ % (idx, total))
+ response_dict = json.loads(result)[0]
+ gen_after_send = response_dict['new_generation']
+ trans_id_after_send = response_dict['new_transaction_id']
+ defer.returnValue([gen_after_send, trans_id_after_send])
+
+ @defer.inlineCallbacks
+ def _send_one_doc(self, headers, first_entries, doc, gen, trans_id,
+ number_of_docs, doc_idx):
+ entries = first_entries[:]
+ # add the document to the request
+ content = yield self._encrypt_doc(doc)
+ self._prepare(
+ ',', entries,
+ id=doc.doc_id, rev=doc.rev, content=content, gen=gen,
+ trans_id=trans_id, number_of_docs=number_of_docs,
+ doc_idx=doc_idx)
+ entries.append('\r\n]')
+ data = ''.join(entries)
+ result = yield getPage(
+ self._url,
+ method='POST',
+ headers=headers,
+ postdata=data)
+ defer.returnValue(result)
+
+ def _encrypt_doc(self, doc):
+ d = None
+ if doc.is_tombstone():
+ d = defer.succeed(None)
+ elif not self._defer_encryption:
+ # fallback case, for tests
+ d = defer.succeed(encrypt_doc(self._crypto, doc))
+ else:
+
+ def _maybe_encrypt_doc_inline(doc_json):
+ if doc_json is None:
+ # the document is not marked as tombstone, but we got
+ # nothing from the sync db. As it is not encrypted
+ # yet, we force inline encryption.
+ return encrypt_doc(self._crypto, doc)
+ return doc_json
+
+ d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev)
+ d.addCallback(_maybe_encrypt_doc_inline)
+ return d
+
+ #
+ # methods to receive doc
+ #
+
+ @defer.inlineCallbacks
+ def _receive_docs(self, last_known_generation, last_known_trans_id,
+ return_doc_cb, ensure_callback, sync_id,
+ defer_decryption):
+ # we keep a reference to the callback in case we defer the decryption
+ self._return_doc_cb = return_doc_cb
+ self._queue_for_decrypt = defer_decryption \
+ and self._sync_db is not None
+
+ new_generation = last_known_generation
+ new_transaction_id = last_known_trans_id
+
+ if self._queue_for_decrypt:
+ logger.debug(
+ "Soledad sync: will queue received docs for decrypting.")
+
+ if defer_decryption:
+ self._setup_sync_decr_pool()
+
+ headers = self._auth_header.copy()
+ headers.update({'content-type': 'application/x-soledad-sync-get'})
+
+ # maybe get one doc
+ d = self._receive_one_doc(
+ headers, last_known_generation, last_known_trans_id,
+ sync_id, 0)
+ d.addCallback(partial(self._insert_received_doc, 1, 1))
+ number_of_changes, ngen, ntrans = yield d
+
+ if defer_decryption:
+ self._sync_decr_pool.set_docs_to_process(
+ number_of_changes)
+ idx = 1
+
+ # maybe get more documents
+ deferreds = []
+ while idx < number_of_changes:
+ d = self._receive_one_doc(
+ headers, last_known_generation,
+ last_known_trans_id, sync_id, idx)
+ d.addCallback(
+ partial(
+ self._insert_received_doc,
+ idx + 1,
+ number_of_changes))
+ deferreds.append(d)
+ idx += 1
+ results = yield defer.gatherResults(deferreds)
+
+ # get genration and transaction id of target after insertions
+ if deferreds:
+ _, new_generation, new_transaction_id = results.pop()
+
+ # get current target gen and trans id in case no documents were
+ def _shutdown_and_finish(res):
+ self._sync_decr_pool.close()
+ return new_generation, new_transaction_id
+
+ d = defer.Deferred()
+ d.addCallback(_shutdown_and_finish)
+
+ def _wait_or_finish():
+ if not self._sync_decr_pool.has_finished():
+ reactor.callLater(
+ SyncDecrypterPool.DECRYPT_LOOP_PERIOD,
+ _wait_or_finish)
+ else:
+ d.callback(None)
+
+ # decrypt docs in case of deferred decryption
+ if defer_decryption:
+ _wait_or_finish()
+ else:
+ d.callback(None)
+
+ new_generation, new_transaction_id = yield d
+ defer.returnValue([new_generation, new_transaction_id])
+
+ def _receive_one_doc(self, headers, last_known_generation,
+ last_known_trans_id, sync_id, received):
+ entries = ['[']
+ # add remote replica metadata to the request
+ self._prepare(
+ '', entries,
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ sync_id=sync_id,
+ ensure=self._ensure_callback is not None)
+ # inform server of how many documents have already been received
+ self._prepare(
+ ',', entries, received=received)
+ entries.append('\r\n]')
+ # send headers
+ return getPage(
+ self._url,
+ method='POST',
+ headers=headers,
+ postdata=''.join(entries))
+
+ def _insert_received_doc(self, idx, total, response):
+ """
+ Insert a received document into the local replica.
+
+ :param idx: The index count of the current operation.
+ :type idx: int
+ :param total: The total number of operations.
+ :type total: int
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+ """
+ new_generation, new_transaction_id, number_of_changes, doc_id, \
+ rev, content, gen, trans_id = \
+ self._parse_received_doc_response(response)
+ if doc_id is not None:
+ print doc_id
+ # decrypt incoming document and insert into local database
+ # -------------------------------------------------------------
+ # symmetric decryption of document's contents
+ # -------------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt it.
+ # We do it inline if defer_decryption flag is False or no sync_db
+ # was defined, otherwise we defer it writing it to the received
+ # docs table.
+ doc = SoledadDocument(doc_id, rev, content)
+ if is_symmetrically_encrypted(doc):
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_encrypted_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ # defer_decryption is False or no-sync-db fallback
+ doc.set_json(decrypt_doc(self._crypto, doc))
+ self._return_doc_cb(doc, gen, trans_id)
+ else:
+ # not symmetrically encrypted doc, insert it directly
+ # or save it in the decrypted stage.
+ if self._queue_for_decrypt:
+ self._sync_decr_pool.insert_received_doc(
+ doc.doc_id, doc.rev, doc.content, gen, trans_id,
+ idx)
+ else:
+ self._return_doc_cb(doc, gen, trans_id)
+ # -------------------------------------------------------------
+ # end of symmetric decryption
+ # -------------------------------------------------------------
+ msg = "%d/%d" % (idx, total)
+ signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
+ logger.debug("Soledad sync receive status: %s" % msg)
+ return number_of_changes, new_generation, new_transaction_id
+
+ def _parse_received_doc_response(self, response):
+ """
+ Parse the response from the server containing the received document.
+
+ :param response: The body and headers of the response.
+ :type response: tuple(str, dict)
+
+ :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev,
+ content, gen, trans_id)
+ :rtype: tuple
+ """
+ # decode incoming stream
+ parts = response.splitlines()
+ if not parts or parts[0] != '[' or parts[-1] != ']':
+ raise errors.BrokenSyncStream
+ data = parts[1:-1]
+ # decode metadata
+ line, comma = utils.check_and_strip_comma(data[0])
+ metadata = None
+ try:
+ metadata = json.loads(line)
+ new_generation = metadata['new_generation']
+ new_transaction_id = metadata['new_transaction_id']
+ number_of_changes = metadata['number_of_changes']
+ except (json.JSONDecodeError, KeyError):
+ raise errors.BrokenSyncStream
+ # make sure we have replica_uid from fresh new dbs
+ if self._ensure_callback and 'replica_uid' in metadata:
+ self._ensure_callback(metadata['replica_uid'])
+ # parse incoming document info
+ doc_id = None
+ rev = None
+ content = None
+ gen = None
+ trans_id = None
+ if number_of_changes > 0:
+ try:
+ entry = json.loads(data[1])
+ doc_id = entry['id']
+ rev = entry['rev']
+ content = entry['content']
+ gen = entry['gen']
+ trans_id = entry['trans_id']
+ except (IndexError, KeyError):
+ raise errors.BrokenSyncStream
+ return new_generation, new_transaction_id, number_of_changes, \
+ doc_id, rev, content, gen, trans_id
+
+ def _setup_sync_decr_pool(self):
+ """
+ Set up the SyncDecrypterPool for deferred decryption.
+ """
+ if self._sync_decr_pool is None and self._sync_db is not None:
+ # initialize syncing queue decryption pool
+ self._sync_decr_pool = SyncDecrypterPool(
+ self._crypto,
+ self._sync_db,
+ insert_doc_cb=self._insert_doc_cb,
+ source_replica_uid=self.source_replica_uid)
diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py
index 16241621..53afbda8 100644
--- a/client/src/leap/soledad/client/sqlcipher.py
+++ b/client/src/leap/soledad/client/sqlcipher.py
@@ -53,20 +53,17 @@ from u1db.backends import sqlite_backend
from hashlib import sha256
from contextlib import contextmanager
from collections import defaultdict
-from httplib import CannotSendRequest
from functools import partial
from pysqlcipher import dbapi2 as sqlcipher_dbapi2
from twisted.internet import reactor
-from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
-from twisted.python import log
from twisted.enterprise import adbapi
from leap.soledad.client import encdecpool
-from leap.soledad.client.target import SoledadSyncTarget
+from leap.soledad.client.http_target import SoledadHTTPSyncTarget
from leap.soledad.client.sync import SoledadSynchronizer
from leap.soledad.client import pragmas
@@ -590,33 +587,13 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
before the synchronisation was performed.
:rtype: Deferred
"""
- kwargs = {'creds': creds, 'autocreate': autocreate,
- 'defer_decryption': defer_decryption}
- return self._defer_to_sync_threadpool(self._sync, url, **kwargs)
-
- def _sync(self, url, creds=None, autocreate=True, defer_decryption=True):
- res = None
-
# the following context manager blocks until the syncing lock can be
# acquired.
- # TODO review, I think this is no longer needed with a 1-thread
- # threadpool.
-
- log.msg("in _sync")
- self.__url = url
with self._syncer(url, creds=creds) as syncer:
# XXX could mark the critical section here...
- try:
- log.msg('syncer sync...')
- res = syncer.sync(autocreate=autocreate,
- defer_decryption=defer_decryption)
- except CannotSendRequest:
- logger.warning("Connection with sync target couldn't be "
- "established. Resetting connection...")
- # closing the connection it will be recreated in the next try
- syncer.sync_target.close()
- return
- return res
+ return syncer.sync(
+ autocreate=autocreate,
+ defer_decryption=defer_decryption)
def stop_sync(self):
"""
@@ -673,13 +650,14 @@ class SQLCipherU1DBSync(SQLCipherDatabase):
if syncer is None or h != cur_h:
syncer = SoledadSynchronizer(
self,
- SoledadSyncTarget(url,
- # XXX is the replica_uid ready?
- self._replica_uid,
- creds=creds,
- crypto=self._crypto,
- sync_db=self._sync_db,
- sync_enc_pool=self._sync_enc_pool))
+ SoledadHTTPSyncTarget(
+ url,
+ # XXX is the replica_uid ready?
+ self._replica_uid,
+ creds=creds,
+ crypto=self._crypto,
+ sync_db=self._sync_db,
+ sync_enc_pool=self._sync_enc_pool))
self._syncers[url] = (h, syncer)
# in order to reuse the same synchronizer multiple times we have to
# reset its state (i.e. the number of documents received from target
diff --git a/client/src/leap/soledad/client/sync.py b/client/src/leap/soledad/client/sync.py
index d4ca4258..f8f74ce7 100644
--- a/client/src/leap/soledad/client/sync.py
+++ b/client/src/leap/soledad/client/sync.py
@@ -25,9 +25,10 @@ Extend u1db Synchronizer with the ability to:
* Be interrupted and recovered.
"""
import logging
-import traceback
from threading import Lock
+from twisted.internet import defer
+
from u1db import errors
from u1db.sync import Synchronizer
@@ -90,6 +91,7 @@ class SoledadSynchronizer(Synchronizer):
# Synchronizer may be reused later.
self.release_syncing_lock()
+ @defer.inlineCallbacks
def _sync(self, autocreate=False, defer_decryption=True):
"""
Helper function, called from the main `sync` method.
@@ -102,7 +104,7 @@ class SoledadSynchronizer(Synchronizer):
ensure_callback = None
try:
(self.target_replica_uid, target_gen, target_trans_id,
- target_my_gen, target_my_trans_id) = \
+ target_my_gen, target_my_trans_id) = yield \
sync_target.get_sync_info(self.source._replica_uid)
except errors.DatabaseDoesNotExist:
if not autocreate:
@@ -151,15 +153,15 @@ class SoledadSynchronizer(Synchronizer):
self.target_replica_uid)
logger.debug(
"Soledad source sync info:\n"
- " source target gen: %d\n"
- " source target trans_id: %s"
+ " last target gen known to source: %d\n"
+ " last target trans_id known to source: %s"
% (target_last_known_gen, target_last_known_trans_id))
# validate transaction ids
if not changes and target_last_known_gen == target_gen:
if target_trans_id != target_last_known_trans_id:
raise errors.InvalidTransactionId
- return my_gen
+ defer.returnValue(my_gen)
# prepare to send all the changed docs
changed_doc_ids = [doc_id for doc_id, _, _ in changes]
@@ -177,33 +179,26 @@ class SoledadSynchronizer(Synchronizer):
#
# The sync_exchange method may be interrupted, in which case it will
# return a tuple of Nones.
- try:
- new_gen, new_trans_id = sync_target.sync_exchange(
- docs_by_generation, self.source._replica_uid,
- target_last_known_gen, target_last_known_trans_id,
- self._insert_doc_from_target, ensure_callback=ensure_callback,
- defer_decryption=defer_decryption)
- logger.debug(
- "Soledad source sync info after sync exchange:\n"
- " source target gen: %d\n"
- " source target trans_id: %s"
- % (new_gen, new_trans_id))
- info = {
- "target_replica_uid": self.target_replica_uid,
- "new_gen": new_gen,
- "new_trans_id": new_trans_id,
- "my_gen": my_gen
- }
- self._syncing_info = info
- self.complete_sync()
- except Exception as e:
- logger.error("Soledad sync error: %s" % str(e))
- logger.error(traceback.format_exc())
- sync_target.stop()
- finally:
- sync_target.close()
-
- return my_gen
+ new_gen, new_trans_id = yield sync_target.sync_exchange(
+ docs_by_generation, self.source._replica_uid,
+ target_last_known_gen, target_last_known_trans_id,
+ self._insert_doc_from_target, ensure_callback=ensure_callback,
+ defer_decryption=defer_decryption)
+ logger.debug(
+ "Soledad source sync info after sync exchange:\n"
+ " source known target gen: %d\n"
+ " source known target trans_id: %s"
+ % (new_gen, new_trans_id))
+ info = {
+ "target_replica_uid": self.target_replica_uid,
+ "new_gen": new_gen,
+ "new_trans_id": new_trans_id,
+ "my_gen": my_gen
+ }
+ self._syncing_info = info
+ yield self.complete_sync()
+
+ defer.returnValue(my_gen)
def complete_sync(self):
"""
@@ -211,6 +206,9 @@ class SoledadSynchronizer(Synchronizer):
(a) record last known generation and transaction uid for the remote
replica, and
(b) make target aware of our current reached generation.
+
+ :return: A deferred which will fire when the sync has been completed.
+ :rtype: twisted.internet.defer.Deferred
"""
logger.debug("Completing deferred last step in SYNC...")
@@ -221,7 +219,26 @@ class SoledadSynchronizer(Synchronizer):
info["target_replica_uid"], info["new_gen"], info["new_trans_id"])
# if gapless record current reached generation with target
- self._record_sync_info_with_the_target(info["my_gen"])
+ return self._record_sync_info_with_the_target(info["my_gen"])
+
+ def _record_sync_info_with_the_target(self, start_generation):
+ """
+ Store local replica metadata in server.
+
+ :param start_generation: The local generation when the sync was
+ started.
+ :type start_generation: int
+
+ :return: A deferred which will fire when the operation has been
+ completed.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ cur_gen, trans_id = self.source._get_generation_info()
+ if (cur_gen == start_generation + self.num_inserted
+ and self.num_inserted > 0):
+ return self.sync_target.record_sync_info(
+ self.source._replica_uid, cur_gen, trans_id)
+ return defer.succeed(None)
@property
def syncing(self):
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
deleted file mode 100644
index 667aab15..00000000
--- a/client/src/leap/soledad/client/target.py
+++ /dev/null
@@ -1,1473 +0,0 @@
-# -*- coding: utf-8 -*-
-# target.py
-# Copyright (C) 2013, 2014 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-A U1DB backend for encrypting data before sending to server and decrypting
-after receiving.
-"""
-import cStringIO
-import gzip
-import logging
-import re
-import urllib
-import threading
-
-from collections import defaultdict
-from time import sleep
-from uuid import uuid4
-from functools import partial
-
-import simplejson as json
-
-from u1db import errors
-from u1db.remote import utils, http_errors
-from u1db.remote.http_target import HTTPSyncTarget
-from u1db.remote.http_client import _encode_query_parameter, HTTPClientBase
-from zope.proxy import ProxyBase
-from zope.proxy import setProxiedObject
-
-from twisted.internet import defer
-
-from leap.soledad.common.document import SoledadDocument
-from leap.soledad.client.auth import TokenBasedAuth
-from leap.soledad.client.crypto import is_symmetrically_encrypted
-from leap.soledad.client.crypto import encrypt_doc, decrypt_doc
-from leap.soledad.client.encdecpool import SyncDecrypterPool
-from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS
-from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
-from leap.soledad.client.events import signal
-
-
-logger = logging.getLogger(__name__)
-
-
-def _gunzip(data):
- """
- Uncompress data that is gzipped.
-
- :param data: gzipped data
- :type data: basestring
- """
- buffer = cStringIO.StringIO()
- buffer.write(data)
- buffer.seek(0)
- try:
- data = gzip.GzipFile(mode='r', fileobj=buffer).read()
- except Exception:
- logger.warning("Error while decrypting gzipped data")
- buffer.close()
- return data
-
-
-class DocumentSyncerThread(threading.Thread):
- """
- A thread that knowns how to either send or receive a document during the
- sync process.
- """
-
- def __init__(self, doc_syncer, release_method, failed_method,
- idx, total, last_request_lock=None, last_callback_lock=None):
- """
- Initialize a new syncer thread.
-
- :param doc_syncer: A document syncer.
- :type doc_syncer: HTTPDocumentSyncer
- :param release_method: A method to be called when finished running.
- :type release_method: callable(DocumentSyncerThread)
- :param failed_method: A method to be called when we failed.
- :type failed_method: callable(DocumentSyncerThread)
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param last_request_lock: A lock to wait for before actually performing
- the request.
- :type last_request_lock: threading.Lock
- :param last_callback_lock: A lock to wait for before actually running
- the success callback.
- :type last_callback_lock: threading.Lock
- """
- threading.Thread.__init__(self)
- self._doc_syncer = doc_syncer
- self._release_method = release_method
- self._failed_method = failed_method
- self._idx = idx
- self._total = total
- self._last_request_lock = last_request_lock
- self._last_callback_lock = last_callback_lock
- self._response = None
- self._exception = None
- self._result = None
- self._success = False
- self.started = threading.Event()
- # a lock so we can signal when we're finished
- self._request_lock = threading.Lock()
- self._request_lock.acquire()
- self._callback_lock = threading.Lock()
- self._callback_lock.acquire()
- # make thread interruptable
- self._stopped = None
- self._stop_lock = threading.Lock()
-
- def run(self):
- """
- Run the HTTP request and store results.
-
- This method will block and wait for an eventual previous operation to
- finish before actually performing the request. It also traps any
- exception and register any failure with the request.
- """
- self.started.set()
-
- with self._stop_lock:
- if self._stopped is None:
- self._stopped = False
- else:
- return
-
- # eventually wait for the previous thread to finish
- if self._last_request_lock is not None:
- self._last_request_lock.acquire()
-
- # bail out in case we've been interrupted
- if self.stopped is True:
- return
-
- try:
- self._response = self._doc_syncer.do_request()
- self._request_lock.release()
-
- # run success callback
- if self._doc_syncer.success_callback is not None:
-
- # eventually wait for callback lock release
- if self._last_callback_lock is not None:
- self._last_callback_lock.acquire()
-
- # bail out in case we've been interrupted
- if self._stopped is True:
- return
-
- self._result = self._doc_syncer.success_callback(
- self._idx, self._total, self._response)
- self._success = True
- doc_syncer = self._doc_syncer
- self._release_method(self, doc_syncer)
- self._doc_syncer = None
- # let next thread executed its callback
- self._callback_lock.release()
-
- # trap any exception and signal failure
- except Exception as e:
- self._exception = e
- self._success = False
- # run failure callback
- if self._doc_syncer.failure_callback is not None:
-
- # eventually wait for callback lock release
- if self._last_callback_lock is not None:
- self._last_callback_lock.acquire()
-
- # bail out in case we've been interrupted
- if self.stopped is True:
- return
-
- self._doc_syncer.failure_callback(
- self._idx, self._total, self._exception)
-
- self._failed_method()
- # we do not release the callback lock here because we
- # failed and so we don't want other threads to succeed.
-
- @property
- def doc_syncer(self):
- return self._doc_syncer
-
- @property
- def response(self):
- return self._response
-
- @property
- def exception(self):
- return self._exception
-
- @property
- def callback_lock(self):
- return self._callback_lock
-
- @property
- def request_lock(self):
- return self._request_lock
-
- @property
- def success(self):
- return self._success
-
- def stop(self):
- with self._stop_lock:
- self._stopped = True
-
- @property
- def stopped(self):
- with self._stop_lock:
- return self._stopped
-
- @property
- def result(self):
- return self._result
-
-
-class DocumentSyncerPool(object):
- """
- A pool of reusable document syncers.
- """
-
- POOL_SIZE = 10
- """
- The maximum amount of syncer threads running at the same time.
- """
-
- def __init__(self, raw_url, raw_creds, query_string, headers,
- ensure_callback, stop_method):
- """
- Initialize the document syncer pool.
-
- :param raw_url: The complete raw URL for the HTTP request.
- :type raw_url: str
- :param raw_creds: The credentials for the HTTP request.
- :type raw_creds: dict
- :param query_string: The query string for the HTTP request.
- :type query_string: str
- :param headers: The headers for the HTTP request.
- :type headers: dict
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
-
- """
- # save syncer params
- self._raw_url = raw_url
- self._raw_creds = raw_creds
- self._query_string = query_string
- self._headers = headers
- self._ensure_callback = ensure_callback
- self._stop_method = stop_method
- # pool attributes
- self._failures = False
- self._semaphore_pool = threading.BoundedSemaphore(
- DocumentSyncerPool.POOL_SIZE)
- self._pool_access_lock = threading.Lock()
- self._doc_syncers = []
- self._threads = []
-
- def new_syncer_thread(self, idx, total, last_request_lock=None,
- last_callback_lock=None):
- """
- Yield a new document syncer thread.
-
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param last_request_lock: A lock to wait for before actually performing
- the request.
- :type last_request_lock: threading.Lock
- :param last_callback_lock: A lock to wait for before actually running
- the success callback.
- :type last_callback_lock: threading.Lock
- """
- t = None
- # wait for available threads
- self._semaphore_pool.acquire()
- with self._pool_access_lock:
- if self._failures is True:
- return None
- # get a syncer
- doc_syncer = self._get_syncer()
- # we rely on DocumentSyncerThread.run() to release the lock using
- # self.release_syncer so we can launch a new thread.
- t = DocumentSyncerThread(
- doc_syncer, self.release_syncer, self.stop_threads,
- idx, total,
- last_request_lock=last_request_lock,
- last_callback_lock=last_callback_lock)
- self._threads.append(t)
- return t
-
- def _failed(self):
- with self._pool_access_lock:
- self._failures = True
-
- @property
- def failures(self):
- return self._failures
-
- def _get_syncer(self):
- """
- Get a document syncer from the pool.
-
- This method will create a new syncer whenever there is no syncer
- available in the pool.
-
- :return: A syncer.
- :rtype: HTTPDocumentSyncer
- """
- syncer = None
- # get an available syncer or create a new one
- try:
- syncer = self._doc_syncers.pop()
- except IndexError:
- syncer = HTTPDocumentSyncer(
- self._raw_url, self._raw_creds, self._query_string,
- self._headers, self._ensure_callback)
- return syncer
-
- def release_syncer(self, syncer_thread, doc_syncer):
- """
- Return a syncer to the pool after use and check for any failures.
-
- :param syncer: The syncer to be returned to the pool.
- :type syncer: HTTPDocumentSyncer
- """
- with self._pool_access_lock:
- self._doc_syncers.append(doc_syncer)
- if syncer_thread.success is True:
- self._threads.remove(syncer_thread)
- self._semaphore_pool.release()
-
- def stop_threads(self, fail=True):
- """
- Stop all threads in the pool.
-
- :param fail: Whether we are stopping because of a failure.
- :type fail: bool
- """
- # stop sync
- self._stop_method()
- stopped = []
- # stop all threads
- with self._pool_access_lock:
- if fail:
- self._failures = True
- logger.error("sync failed: cancelling sync threads...")
- while self._threads:
- t = self._threads.pop(0)
- t.stop()
- self._doc_syncers.append(t.doc_syncer)
- stopped.append(t)
- # release locks and join
- while stopped:
- t = stopped.pop(0)
- t.request_lock.acquire(False) # just in case
- t.request_lock.release()
- t.callback_lock.acquire(False) # just in case
- t.callback_lock.release()
- # release any blocking semaphores
- for i in xrange(DocumentSyncerPool.POOL_SIZE):
- try:
- self._semaphore_pool.release()
- except ValueError:
- break
- if fail:
- logger.error("Soledad sync: cancelled sync threads.")
-
- def cleanup(self):
- """
- Close and remove any syncers from the pool.
- """
- with self._pool_access_lock:
- while self._doc_syncers:
- syncer = self._doc_syncers.pop()
- syncer.close()
- del syncer
-
-
-class HTTPDocumentSyncer(HTTPClientBase, TokenBasedAuth):
-
- def __init__(self, raw_url, creds, query_string, headers, ensure_callback):
- """
- Initialize the client.
-
- :param raw_url: The raw URL of the target HTTP server.
- :type raw_url: str
- :param creds: Authentication credentials.
- :type creds: dict
- :param query_string: The query string for the HTTP request.
- :type query_string: str
- :param headers: The headers for the HTTP request.
- :type headers: dict
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
- """
- HTTPClientBase.__init__(self, raw_url, creds=creds)
- # info needed to perform the request
- self._query_string = query_string
- self._headers = headers
- self._ensure_callback = ensure_callback
- # the actual request method
- self._request_method = None
- self._success_callback = None
- self._failure_callback = None
-
- def _reset(self):
- """
- Reset this document syncer so we can reuse it.
- """
- self._request_method = None
- self._success_callback = None
- self._failure_callback = None
- self._request_method = None
-
- def set_request_method(self, method, *args, **kwargs):
- """
- Set the actual method to perform the request.
-
- :param method: Either 'get' or 'put'.
- :type method: str
- :param args: Arguments for the request method.
- :type args: list
- :param kwargs: Keyworded arguments for the request method.
- :type kwargs: dict
- """
- self._reset()
- # resolve request method
- if method is 'get':
- self._request_method = self._get_doc
- elif method is 'put':
- self._request_method = self._put_doc
- else:
- raise Exception
- # store request method args
- self._args = args
- self._kwargs = kwargs
-
- def set_success_callback(self, callback):
- self._success_callback = callback
-
- def set_failure_callback(self, callback):
- self._failure_callback = callback
-
- @property
- def success_callback(self):
- return self._success_callback
-
- @property
- def failure_callback(self):
- return self._failure_callback
-
- def do_request(self):
- """
- Actually perform the request.
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- self._ensure_connection()
- args = self._args
- kwargs = self._kwargs
- return self._request_method(*args, **kwargs)
-
- def _request(self, method, url_parts, params=None, body=None,
- content_type=None):
- """
- Perform an HTTP request.
-
- :param method: The HTTP request method.
- :type method: str
- :param url_parts: A list representing the request path.
- :type url_parts: list
- :param params: Parameters for the URL query string.
- :type params: dict
- :param body: The body of the request.
- :type body: str
- :param content-type: The content-type of the request.
- :type content-type: str
-
- :return: The body and headers of the response.
- :rtype: tuple
-
- :raise errors.Unavailable: Raised after a number of unsuccesful
- request attempts.
- :raise Exception: Raised for any other exception ocurring during the
- request.
- """
-
- self._ensure_connection()
- unquoted_url = url_query = self._url.path
- if url_parts:
- if not url_query.endswith('/'):
- url_query += '/'
- unquoted_url = url_query
- url_query += '/'.join(urllib.quote(part, safe='')
- for part in url_parts)
- # oauth performs its own quoting
- unquoted_url += '/'.join(url_parts)
- encoded_params = {}
- if params:
- for key, value in params.items():
- key = unicode(key).encode('utf-8')
- encoded_params[key] = _encode_query_parameter(value)
- url_query += ('?' + urllib.urlencode(encoded_params))
- if body is not None and not isinstance(body, basestring):
- body = json.dumps(body)
- content_type = 'application/json'
- headers = {}
- if content_type:
- headers['content-type'] = content_type
-
- # Patched: We would like to receive gzip pretty please
- # ----------------------------------------------------
- headers['accept-encoding'] = "gzip"
- # ----------------------------------------------------
-
- headers.update(
- self._sign_request(method, unquoted_url, encoded_params))
-
- for delay in self._delays:
- try:
- self._conn.request(method, url_query, body, headers)
- return self._response()
- except errors.Unavailable, e:
- sleep(delay)
- raise e
-
- def _response(self):
- """
- Return the response of the (possibly gzipped) HTTP request.
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- resp = self._conn.getresponse()
- body = resp.read()
- headers = dict(resp.getheaders())
-
- # Patched: We would like to decode gzip
- # ----------------------------------------------------
- encoding = headers.get('content-encoding', '')
- if "gzip" in encoding:
- body = _gunzip(body)
- # ----------------------------------------------------
-
- if resp.status in (200, 201):
- return body, headers
- elif resp.status in http_errors.ERROR_STATUSES:
- try:
- respdic = json.loads(body)
- except ValueError:
- pass
- else:
- self._error(respdic)
- # special case
- if resp.status == 503:
- raise errors.Unavailable(body, headers)
- raise errors.HTTPError(resp.status, body, headers)
-
- def _prepare(self, comma, entries, **dic):
- """
- Prepare an entry to be sent through a syncing POST request.
-
- :param comma: A string to be prepended to the current entry.
- :type comma: str
- :param entries: A list of entries accumulated to be sent on the
- request.
- :type entries: list
- :param dic: The data to be included in this entry.
- :type dic: dict
-
- :return: The size of the prepared entry.
- :rtype: int
- """
- entry = comma + '\r\n' + json.dumps(dic)
- entries.append(entry)
- return len(entry)
-
- def _init_post_request(self, action, content_length):
- """
- Initiate a syncing POST request.
-
- :param url: The syncing URL.
- :type url: str
- :param action: The syncing action, either 'get' or 'receive'.
- :type action: str
- :param headers: The initial headers to be sent on this request.
- :type headers: dict
- :param content_length: The content-length of the request.
- :type content_length: int
- """
- self._conn.putrequest('POST', self._query_string)
- self._conn.putheader(
- 'content-type', 'application/x-soledad-sync-%s' % action)
- for header_name, header_value in self._headers:
- self._conn.putheader(header_name, header_value)
- self._conn.putheader('accept-encoding', 'gzip')
- self._conn.putheader('content-length', str(content_length))
- self._conn.endheaders()
-
- def _get_doc(self, received, sync_id, last_known_generation,
- last_known_trans_id):
- """
- Get a sync document from server by means of a POST request.
-
- :param received: The number of documents already received in the
- current sync session.
- :type received: int
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- # inform server of how many documents have already been received
- size += self._prepare(
- ',', entries, received=received)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request('get', size)
- # get document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- def _put_doc(self, sync_id, last_known_generation, last_known_trans_id,
- id, rev, content, gen, trans_id, number_of_docs, doc_idx):
- """
- Put a sync document on server by means of a POST request.
-
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param id: The document id.
- :type id: str
- :param rev: The document revision.
- :type rev: str
- :param content: The serialized document content.
- :type content: str
- :param gen: The generation of the modification of the document.
- :type gen: int
- :param trans_id: The transaction id of the modification of the
- document.
- :type trans_id: str
- :param number_of_docs: The total amount of documents sent on this sync
- session.
- :type number_of_docs: int
- :param doc_idx: The index of the current document being sent.
- :type doc_idx: int
-
- :return: The body and headers of the response.
- :rtype: tuple
- """
- # prepare to send the document
- entries = ['[']
- size = 1
- # add remote replica metadata to the request
- size += self._prepare(
- '', entries,
- last_known_generation=last_known_generation,
- last_known_trans_id=last_known_trans_id,
- sync_id=sync_id,
- ensure=self._ensure_callback is not None)
- # add the document to the request
- size += self._prepare(
- ',', entries,
- id=id, rev=rev, content=content, gen=gen, trans_id=trans_id,
- number_of_docs=number_of_docs, doc_idx=doc_idx)
- entries.append('\r\n]')
- size += len(entries[-1])
- # send headers
- self._init_post_request('put', size)
- # send document
- for entry in entries:
- self._conn.send(entry)
- return self._response()
-
- def _sign_request(self, method, url_query, params):
- """
- Return an authorization header to be included in the HTTP request.
-
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type param: list
-
- :return: The Authorization header.
- :rtype: list of tuple
- """
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- def set_token_credentials(self, uuid, token):
- """
- Store given credentials so we can sign the request later.
-
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
- """
- TokenBasedAuth.set_token_credentials(self, uuid, token)
-
-
-class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
- """
- A SyncTarget that encrypts data before sending and decrypts data after
- receiving.
-
- Normally encryption will have been written to the sync database upon
- document modification. The sync database is also used to write temporarily
- the parsed documents that the remote send us, before being decrypted and
- written to the main database.
- """
-
- # will later keep a reference to the insert-doc callback
- # passed to sync_exchange
- _insert_doc_cb = defaultdict(lambda: ProxyBase(None))
-
- #
- # Modified HTTPSyncTarget methods.
- #
-
- def __init__(self, url, source_replica_uid=None, creds=None, crypto=None,
- sync_db=None, sync_enc_pool=None):
- """
- Initialize the SoledadSyncTarget.
-
- :param source_replica_uid: The source replica uid which we use when
- deferring decryption.
- :type source_replica_uid: str
- :param url: The url of the target replica to sync with.
- :type url: str
- :param creds: Optional dictionary giving credentials.
- to authorize the operation with the server.
- :type creds: dict
- :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt
- document contents when syncing.
- :type crypto: soledad.crypto.SoledadCrypto
- :param sync_db: Optional. handler for the db with the symmetric
- encryption of the syncing documents. If
- None, encryption will be done in-place,
- instead of retreiving it from the dedicated
- database.
- :type sync_db: Sqlite handler
- """
- HTTPSyncTarget.__init__(self, url, creds)
- self._raw_url = url
- self._raw_creds = creds
- self._crypto = crypto
- self._stopped = True
- self._stop_lock = threading.Lock()
- self.source_replica_uid = source_replica_uid
- self._syncer_pool = None
-
- # asynchronous encryption/decryption attributes
- self._sync_db = sync_db
- self._sync_enc_pool = sync_enc_pool
- self._decryption_callback = None
- self._sync_decr_pool = None
-
- def _setup_sync_decr_pool(self):
- """
- Set up the SyncDecrypterPool for deferred decryption.
- """
- if self._sync_decr_pool is None and self._sync_db is not None:
- # initialize syncing queue decryption pool
- self._sync_decr_pool = SyncDecrypterPool(
- self._crypto,
- self._sync_db,
- insert_doc_cb=self._insert_doc_cb,
- source_replica_uid=self.source_replica_uid)
-
- def _teardown_sync_decr_pool(self):
- """
- Tear down the SyncDecrypterPool.
- """
- self._sync_decr_pool.close()
- self._sync_decr_pool = None
-
- def _get_replica_uid(self, url):
- """
- Return replica uid from the url, or None.
-
- :param url: the replica url
- :type url: str
- """
- replica_uid_match = re.findall("user-([0-9a-fA-F]+)", url)
- return replica_uid_match[0] if len(replica_uid_match) > 0 else None
-
- @staticmethod
- def connect(url, source_replica_uid=None, crypto=None):
- return SoledadSyncTarget(
- url, source_replica_uid=source_replica_uid, crypto=crypto)
-
- def _parse_received_doc_response(self, response):
- """
- Parse the response from the server containing the received document.
-
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- """
- data, _ = response
- # decode incoming stream
- parts = data.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise errors.BrokenSyncStream
- data = parts[1:-1]
- # decode metadata
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- try:
- metadata = json.loads(line)
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- number_of_changes = metadata['number_of_changes']
- except (json.JSONDecodeError, KeyError):
- raise errors.BrokenSyncStream
- # make sure we have replica_uid from fresh new dbs
- if self._ensure_callback and 'replica_uid' in metadata:
- self._ensure_callback(metadata['replica_uid'])
- # parse incoming document info
- doc_id = None
- rev = None
- content = None
- gen = None
- trans_id = None
- if number_of_changes > 0:
- try:
- entry = json.loads(data[1])
- doc_id = entry['id']
- rev = entry['rev']
- content = entry['content']
- gen = entry['gen']
- trans_id = entry['trans_id']
- except (IndexError, KeyError):
- raise errors.BrokenSyncStream
- return new_generation, new_transaction_id, number_of_changes, \
- doc_id, rev, content, gen, trans_id
-
- def _insert_received_doc(self, idx, total, response):
- """
- Insert a received document into the local replica.
-
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- :param response: The body and headers of the response.
- :type response: tuple(str, dict)
- """
- new_generation, new_transaction_id, number_of_changes, doc_id, \
- rev, content, gen, trans_id = \
- self._parse_received_doc_response(response)
- if doc_id is not None:
- # decrypt incoming document and insert into local database
- # -------------------------------------------------------------
- # symmetric decryption of document's contents
- # -------------------------------------------------------------
- # If arriving content was symmetrically encrypted, we decrypt it.
- # We do it inline if defer_decryption flag is False or no sync_db
- # was defined, otherwise we defer it writing it to the received
- # docs table.
- doc = SoledadDocument(doc_id, rev, content)
- if is_symmetrically_encrypted(doc):
- if self._queue_for_decrypt:
- self._enqueue_encrypted_received_doc(
- doc, gen, trans_id, idx, total)
- else:
- # defer_decryption is False or no-sync-db fallback
- doc.set_json(decrypt_doc(self._crypto, doc))
- self._return_doc_cb(doc, gen, trans_id)
- else:
- # not symmetrically encrypted doc, insert it directly
- # or save it in the decrypted stage.
- if self._queue_for_decrypt:
- self._enqueue_received_doc(doc, gen, trans_id, idx, total)
- else:
- self._return_doc_cb(doc, gen, trans_id)
- # -------------------------------------------------------------
- # end of symmetric decryption
- # -------------------------------------------------------------
- msg = "%d/%d" % (idx + 1, total)
- signal(SOLEDAD_SYNC_RECEIVE_STATUS, msg)
- logger.debug("Soledad sync receive status: %s" % msg)
- return number_of_changes, new_generation, new_transaction_id
-
- def _get_remote_docs(self, url, last_known_generation, last_known_trans_id,
- headers, return_doc_cb, ensure_callback, sync_id,
- defer_decryption=False):
- """
- Fetch sync documents from the remote database and insert them in the
- local database.
-
- If an incoming document's encryption scheme is equal to
- EncryptionSchemes.SYMKEY, then this method will decrypt it with
- Soledad's symmetric key.
-
- :param url: The syncing URL.
- :type url: str
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
- :param headers: The headers of the HTTP request.
- :type headers: dict
- :param return_doc_cb: A callback to insert docs from target.
- :type return_doc_cb: callable
- :param ensure_callback: A callback to ensure we have the correct
- target_replica_uid, if it was just created.
- :type ensure_callback: callable
- :param sync_id: The id for the current sync session.
- :type sync_id: str
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :raise BrokenSyncStream: If `data` is malformed.
-
- :return: A dictionary representing the first line of the response got
- from remote replica.
- :rtype: dict
- """
- # we keep a reference to the callback in case we defer the decryption
- self._return_doc_cb = return_doc_cb
- self._queue_for_decrypt = defer_decryption \
- and self._sync_db is not None
-
- new_generation = last_known_generation
- new_transaction_id = last_known_trans_id
-
- if self._queue_for_decrypt:
- logger.debug(
- "Soledad sync: will queue received docs for decrypting.")
-
- idx = 0
- number_of_changes = 1
-
- first_request = True
- last_callback_lock = None
- threads = []
-
- # get incoming documents
- while idx < number_of_changes:
- # bail out if sync process was interrupted
- if self.stopped is True:
- break
-
- # launch a thread to fetch one document from target
- t = self._syncer_pool.new_syncer_thread(
- idx, number_of_changes,
- last_callback_lock=last_callback_lock)
-
- # bail out if any thread failed
- if t is None:
- self.stop(fail=True)
- break
-
- if defer_decryption:
- self._setup_sync_decr_pool()
-
- t.doc_syncer.set_request_method(
- 'get', idx, sync_id, last_known_generation,
- last_known_trans_id)
- t.doc_syncer.set_success_callback(self._insert_received_doc)
-
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while getting document " \
- "%d/%d: %s" \
- % (idx + 1, total, exception)
- logger.warning("%s" % _failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
-
- t.doc_syncer.set_failure_callback(_failure_callback)
- threads.append(t)
- t.start()
- last_callback_lock = t.callback_lock
- idx += 1
-
- # if this is the first request, wait to update the number of
- # changes
- if first_request is True:
- t.join()
- if t.success:
- number_of_changes, _, _ = t.result
- if defer_decryption:
- self._sync_decr_pool.set_docs_to_process(
- number_of_changes)
- else:
- raise t.exception
- first_request = False
-
- # make sure all threads finished and we have up-to-date info
- last_successful_thread = None
- while threads:
- # check if there are failures
- t = threads.pop(0)
- t.join()
- if t.success:
- last_successful_thread = t
- else:
- raise t.exception
-
- # get information about last successful thread
- if last_successful_thread is not None:
- body, _ = last_successful_thread.response
- parsed_body = json.loads(body)
- # get current target gen and trans id in case no documents were
- # transferred
- if len(parsed_body) == 1:
- metadata = parsed_body[0]
- new_generation = metadata['new_generation']
- new_transaction_id = metadata['new_transaction_id']
- # get current target gen and trans id from last transferred
- # document
- else:
- doc_data = parsed_body[1]
- new_generation = doc_data['gen']
- new_transaction_id = doc_data['trans_id']
-
- # decrypt docs in case of deferred decryption
- if defer_decryption:
- self._sync_decr_pool.wait()
- self._teardown_sync_decr_pool()
-
- return new_generation, new_transaction_id
-
- @property
- def _defer_encryption(self):
- return self._sync_enc_pool is not None
-
- @property
- def _defer_decryption(self):
- return self._sync_decr_pool is not None
-
- def sync_exchange(self, docs_by_generations,
- source_replica_uid, last_known_generation,
- last_known_trans_id, return_doc_cb,
- ensure_callback=None, defer_decryption=True,
- sync_id=None):
- """
- Find out which documents the remote database does not know about,
- encrypt and send them.
-
- This does the same as the parent's method but encrypts content before
- syncing.
-
- :param docs_by_generations: A list of (doc_id, generation, trans_id)
- of local documents that were changed since
- the last local generation the remote
- replica knows about.
- :type docs_by_generations: list of tuples
-
- :param source_replica_uid: The uid of the source replica.
- :type source_replica_uid: str
-
- :param last_known_generation: Target's last known generation.
- :type last_known_generation: int
-
- :param last_known_trans_id: Target's last known transaction id.
- :type last_known_trans_id: str
-
- :param return_doc_cb: A callback for inserting received documents from
- target. If not overriden, this will call u1db
- insert_doc_from_target in synchronizer, which
- implements the TAKE OTHER semantics.
- :type return_doc_cb: function
-
- :param ensure_callback: A callback that ensures we know the target
- replica uid if the target replica was just
- created.
- :type ensure_callback: function
-
- :param defer_decryption: Whether to defer the decryption process using
- the intermediate database. If False,
- decryption will be done inline.
- :type defer_decryption: bool
-
- :return: The new generation and transaction id of the target replica.
- :rtype: tuple
- """
- self._ensure_callback = ensure_callback
-
- self.start()
-
- if sync_id is None:
- sync_id = str(uuid4())
- self.source_replica_uid = source_replica_uid
- # let the decrypter pool access the passed callback to insert docs
- setProxiedObject(self._insert_doc_cb[source_replica_uid],
- return_doc_cb)
-
- self._ensure_connection()
- if self._trace_hook: # for tests
- self._trace_hook('sync_exchange')
- url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
- headers = self._sign_request('POST', url, {})
-
- cur_target_gen = last_known_generation
- cur_target_trans_id = last_known_trans_id
-
- # -------------------------------------------------------------------
- # start of send documents to target
- # -------------------------------------------------------------------
- msg = "%d/%d" % (0, len(docs_by_generations))
- signal(SOLEDAD_SYNC_SEND_STATUS, msg)
- logger.debug("Soledad sync send status: %s" % msg)
-
- self._syncer_pool = DocumentSyncerPool(
- self._raw_url, self._raw_creds, url, headers, ensure_callback,
- self.stop_syncer)
- threads = []
- last_callback_lock = None
-
- sent = 0
- total = len(docs_by_generations)
-
- synced = []
- number_of_docs = len(docs_by_generations)
-
- last_request_lock = None
- for doc, gen, trans_id in docs_by_generations:
- # allow for interrupting the sync process
- if self.stopped is True:
- break
-
- # skip non-syncable docs
- if isinstance(doc, SoledadDocument) and not doc.syncable:
- continue
-
- # -------------------------------------------------------------
- # symmetric encryption of document's contents
- # -------------------------------------------------------------
-
- # the following var will hold a deferred because we may try to
- # fetch the encrypted document from the sync db
- d = None
-
- if doc.is_tombstone():
- d = defer.succeed(None)
- elif not self._defer_encryption:
- # fallback case, for tests
- d = defer.succeed(encrypt_doc(self._crypto, doc))
- else:
-
- def _maybe_encrypt_doc_inline(doc_json):
- if doc_json is None:
- # the document is not marked as tombstone, but we got
- # nothing from the sync db. As it is not encrypted
- # yet, we force inline encryption.
- # TODO: implement a queue to deal with these cases.
- return encrypt_doc(self._crypto, doc)
- return doc_json
-
- d = self.get_encrypted_doc_from_db(doc.doc_id, doc.rev)
- d.addCallback(_maybe_encrypt_doc_inline)
- # -------------------------------------------------------------
- # end of symmetric encryption
- # -------------------------------------------------------------
-
- t = self._syncer_pool.new_syncer_thread(
- sent + 1, total, last_request_lock=last_request_lock,
- last_callback_lock=last_callback_lock)
-
- # bail out if creation of any thread failed
- if t is None:
- self.stop(fail=True)
- break
-
- # the following callback will be called when the document's
- # encrypted content is available, either because it was found on
- # the sync db or because it has been encrypted inline.
-
- def _configure_and_start_thread(t, doc_json):
- # set the request method
- t.doc_syncer.set_request_method(
- 'put', sync_id, cur_target_gen, cur_target_trans_id,
- id=doc.doc_id, rev=doc.rev, content=doc_json, gen=gen,
- trans_id=trans_id, number_of_docs=number_of_docs,
- doc_idx=sent + 1)
- # set the success calback
-
- def _success_callback(idx, total, response):
- _success_msg = "Soledad sync send status: %d/%d" \
- % (idx, total)
- signal(SOLEDAD_SYNC_SEND_STATUS, _success_msg)
- logger.debug(_success_msg)
-
- t.doc_syncer.set_success_callback(_success_callback)
-
- # set the failure callback
- def _failure_callback(idx, total, exception):
- _failure_msg = "Soledad sync: error while sending document " \
- "%d/%d: %s" % (idx, total, exception)
- logger.warning("%s" % _failure_msg)
- logger.warning("Soledad sync: failing gracefully, will "
- "recover on next sync.")
-
- t.doc_syncer.set_failure_callback(_failure_callback)
-
- # save thread and append
- t.start()
-
- d.addCallback(partial(_configure_and_start_thread, t))
-
- threads.append((t, doc))
-
- # update lock references so they can be used in next call to
- # syncer_pool.new_syncer_thread() above
- last_callback_lock = t.callback_lock
- last_request_lock = t.request_lock
-
- sent += 1
-
- # make sure all threads finished and we have up-to-date info
- last_successful_thread = None
- while threads:
- # check if there are failures
- t, doc = threads.pop(0)
- t.started.wait()
- t.join()
- if t.success:
- synced.append((doc.doc_id, doc.rev))
- last_successful_thread = t
- else:
- raise t.exception
-
- # delete documents from the sync database
- if self._defer_encryption:
- self._delete_encrypted_docs_from_db(synced)
-
- # get target gen and trans_id after docs
- gen_after_send = None
- trans_id_after_send = None
- if last_successful_thread is not None:
- response_dict = json.loads(last_successful_thread.response[0])[0]
- gen_after_send = response_dict['new_generation']
- trans_id_after_send = response_dict['new_transaction_id']
- # -------------------------------------------------------------------
- # end of send documents to target
- # -------------------------------------------------------------------
-
- # -------------------------------------------------------------------
- # start of fetch documents from target
- # -------------------------------------------------------------------
- defer_decryption = defer_decryption and self._defer_decryption
- if self.stopped is False:
- cur_target_gen, cur_target_trans_id = self._get_remote_docs(
- url,
- last_known_generation, last_known_trans_id, headers,
- return_doc_cb, ensure_callback, sync_id,
- defer_decryption=defer_decryption)
- # -------------------------------------------------------------------
- # end of fetch documents from target
- # -------------------------------------------------------------------
-
- self._syncer_pool.cleanup()
-
- # update gen and trans id info in case we just sent and did not
- # receive docs.
- if gen_after_send is not None and gen_after_send > cur_target_gen:
- cur_target_gen = gen_after_send
- cur_target_trans_id = trans_id_after_send
-
- self.stop(fail=False)
- self._syncer_pool = None
- return cur_target_gen, cur_target_trans_id
-
- def start(self):
- """
- Mark current sync session as running.
- """
- with self._stop_lock:
- self._stopped = False
-
- def stop_syncer(self):
- with self._stop_lock:
- self._stopped = True
-
- def stop(self, fail=False):
- """
- Mark current sync session as stopped.
-
- This will eventually interrupt the sync_exchange() method and return
- enough information to the synchronizer so the sync session can be
- recovered afterwards.
-
- :param fail: Whether we are stopping because of a failure.
- :type fail: bool
- """
- self.stop_syncer()
- if self._syncer_pool:
- self._syncer_pool.stop_threads(fail=fail)
-
- @property
- def stopped(self):
- """
- Return whether this sync session is stopped.
-
- :return: Whether this sync session is stopped.
- :rtype: bool
- """
- with self._stop_lock:
- return self._stopped is True
-
- #
- # Symmetric encryption of syncing docs
- #
-
- def get_encrypted_doc_from_db(self, doc_id, doc_rev):
- """
- Retrieve encrypted document from the database of encrypted docs for
- sync.
-
- :param doc_id: The Document id.
- :type doc_id: str
-
- :param doc_rev: The document revision
- :type doc_rev: str
-
- :return: A deferred which is fired with the document's encrypted
- content or None if the document was not found on the sync db.
- :rtype: twisted.internet.defer.Deferred
- """
- logger.debug("Looking for encrypted document on sync db: %s" % doc_id)
- return self._sync_enc_pool.get_encrypted_doc(doc_id, doc_rev)
-
- def _delete_encrypted_docs_from_db(self, docs):
- """
- Delete several encrypted documents from the database of symmetrically
- encrypted docs to sync.
-
- :param docs: an iterable with (doc_id, doc_rev) for all documents
- to be deleted.
- :type docs: any iterable of tuples of str
- """
- for doc_id, doc_rev in docs:
- logger.debug("Removing encrypted document on sync db: %s"
- % doc_id)
- return self._sync_enc_pool.delete_encrypted_doc(doc_id, doc_rev)
-
- #
- # Symmetric decryption of syncing docs
- #
-
- def _enqueue_encrypted_received_doc(self, doc, gen, trans_id, idx, total):
- """
- Save a symmetrically encrypted incoming document into the received
- docs table in the sync db. A decryption task will pick it up
- from here in turn.
-
- :param doc: The document to save.
- :type doc: SoledadDocument
- :param gen: The generation.
- :type gen: str
- :param trans_id: Transacion id.
-
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- logger.debug("Enqueueing doc for decryption: %d/%d."
- % (idx + 1, total))
- self._sync_decr_pool.insert_encrypted_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
-
- def _enqueue_received_doc(self, doc, gen, trans_id, idx, total):
- """
- Save any incoming document into the received docs table in the sync db.
-
- :param doc: The document to save.
- :type doc: SoledadDocument
- :param gen: The generation.
- :type gen: str
- :param trans_id: Transacion id.
- :type gen: str
- :param idx: The index count of the current operation.
- :type idx: int
- :param total: The total number of operations.
- :type total: int
- """
- logger.debug("Enqueueing doc, no decryption needed: %d/%d."
- % (idx + 1, total))
- self._sync_decr_pool.insert_received_doc(
- doc.doc_id, doc.rev, doc.content, gen, trans_id, idx + 1)
-
- def set_decryption_callback(self, cb):
- """
- Set callback to be called when the decryption finishes.
-
- :param cb: The callback to be set.
- :type cb: callable
- """
- self._decryption_callback = cb
-
- def has_decryption_callback(self):
- """
- Return True if there is a decryption callback set.
- :rtype: bool
- """
- return self._decryption_callback is not None
-
- #
- # Authentication methods
- #
-
- def _sign_request(self, method, url_query, params):
- """
- Return an authorization header to be included in the HTTP request.
-
- :param method: The HTTP method.
- :type method: str
- :param url_query: The URL query string.
- :type url_query: str
- :param params: A list with encoded query parameters.
- :type param: list
-
- :return: The Authorization header.
- :rtype: list of tuple
- """
- return TokenBasedAuth._sign_request(self, method, url_query, params)
-
- def set_token_credentials(self, uuid, token):
- """
- Store given credentials so we can sign the request later.
-
- :param uuid: The user's uuid.
- :type uuid: str
- :param token: The authentication token.
- :type token: str
- """
- TokenBasedAuth.set_token_credentials(self, uuid, token)