diff options
-rw-r--r-- | client/src/leap/soledad/client/http_target.py | 732 |
1 files changed, 0 insertions, 732 deletions
diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py deleted file mode 100644 index 046af089..00000000 --- a/client/src/leap/soledad/client/http_target.py +++ /dev/null @@ -1,732 +0,0 @@ -# -*- coding: utf-8 -*- -# http_target.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -""" -A U1DB backend for encrypting data before sending to server and decrypting -after receiving. -""" - - -import json -import base64 -import logging -import warnings - -from uuid import uuid4 - -from twisted.internet import defer -from twisted.web.error import Error -from twisted.web.client import _ReadBodyProtocol -from twisted.web.client import PartialDownloadError -from twisted.web._newclient import ResponseDone -from twisted.web._newclient import PotentialDataLoss - -from u1db import errors -from u1db import SyncTarget -from u1db.remote import utils -from u1db.remote import http_errors - -from leap.common.http import HTTPClient - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.errors import InvalidAuthTokenError - -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS -from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS -from leap.soledad.client.events import emit -from leap.soledad.client.encdecpool import SyncDecrypterPool - - -logger = logging.getLogger(__name__) - - -# we want to make sure that HTTP errors will raise appropriate u1db errors, -# that is, fire errbacks with the appropriate failures, in the context of -# twisted. Because of that, we redefine the http body reader used by the HTTP -# client below. - -class ReadBodyProtocol(_ReadBodyProtocol): - - def __init__(self, response, deferred): - """ - Initialize the protocol, additionally storing the response headers. - """ - _ReadBodyProtocol.__init__( - self, response.code, response.phrase, deferred) - self.headers = response.headers - - # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks - def _error(self, respdic): - descr = respdic.get("error") - exc_cls = errors.wire_description_to_exc.get(descr) - if exc_cls is not None: - message = respdic.get("message") - self.deferred.errback(exc_cls(message)) - # ---8<--- end of snippet from u1db.remote.http_client - - def connectionLost(self, reason): - """ - Deliver the accumulated response bytes to the waiting L{Deferred}, if - the response body has been completely received without error. - """ - if reason.check(ResponseDone): - - body = b''.join(self.dataBuffer) - - # ---8<--- snippet from u1db.remote.http_client - if self.status in (200, 201): - self.deferred.callback(body) - elif self.status in http_errors.ERROR_STATUSES: - try: - respdic = json.loads(body) - except ValueError: - self.deferred.errback( - errors.HTTPError(self.status, body, self.headers)) - else: - self._error(respdic) - # special cases - elif self.status == 503: - self.deferred.errback(errors.Unavailable(body, self.headers)) - else: - self.deferred.errback( - errors.HTTPError(self.status, body, self.headers)) - # ---8<--- end of snippet from u1db.remote.http_client - - elif reason.check(PotentialDataLoss): - self.deferred.errback( - PartialDownloadError(self.status, self.message, - b''.join(self.dataBuffer))) - else: - self.deferred.errback(reason) - - -def readBody(response): - """ - Get the body of an L{IResponse} and return it as a byte string. - - This is a helper function for clients that don't want to incrementally - receive the body of an HTTP response. - - @param response: The HTTP response for which the body will be read. - @type response: L{IResponse} provider - - @return: A L{Deferred} which will fire with the body of the response. - Cancelling it will close the connection to the server immediately. - """ - def cancel(deferred): - """ - Cancel a L{readBody} call, close the connection to the HTTP server - immediately, if it is still open. - - @param deferred: The cancelled L{defer.Deferred}. - """ - abort = getAbort() - if abort is not None: - abort() - - d = defer.Deferred(cancel) - protocol = ReadBodyProtocol(response, d) - - def getAbort(): - return getattr(protocol.transport, 'abortConnection', None) - - response.deliverBody(protocol) - - if protocol.transport is not None and getAbort() is None: - warnings.warn( - 'Using readBody with a transport that does not have an ' - 'abortConnection method', - category=DeprecationWarning, - stacklevel=2) - - return d - - -class SoledadHTTPSyncTarget(SyncTarget): - - """ - A SyncTarget that encrypts data before sending and decrypts data after - receiving. - - Normally encryption will have been written to the sync database upon - document modification. The sync database is also used to write temporarily - the parsed documents that the remote send us, before being decrypted and - written to the main database. - """ - - def __init__(self, url, source_replica_uid, creds, crypto, cert_file, - sync_db=None, sync_enc_pool=None): - """ - Initialize the sync target. - - :param url: The server sync url. - :type url: str - :param source_replica_uid: The source replica uid which we use when - deferring decryption. - :type source_replica_uid: str - :param creds: A dictionary containing the uuid and token. - :type creds: creds - :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. - :type crypto: soledad.crypto.SoledadCrypto - :param cert_file: Path to the certificate of the ca used to validate - the SSL certificate used by the remote soledad - server. - :type cert_file: str - :param sync_db: Optional. handler for the db with the symmetric - encryption of the syncing documents. If - None, encryption will be done in-place, - instead of retreiving it from the dedicated - database. - :type sync_db: Sqlite handler - :param sync_enc_pool: The encryption pool to use to defer encryption. - If None is passed the encryption will not be - deferred. - :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool - """ - if url.endswith("/"): - url = url[:-1] - self._url = str(url) + "/sync-from/" + str(source_replica_uid) - self.source_replica_uid = source_replica_uid - self._auth_header = None - self.set_creds(creds) - self._crypto = crypto - self._sync_db = sync_db - self._sync_enc_pool = sync_enc_pool - self._insert_doc_cb = None - # asynchronous encryption/decryption attributes - self._decryption_callback = None - self._sync_decr_pool = None - self._http = HTTPClient(cert_file) - - def close(self): - self._http.close() - - def set_creds(self, creds): - """ - Update credentials. - - :param creds: A dictionary containing the uuid and token. - :type creds: dict - """ - uuid = creds['token']['uuid'] - token = creds['token']['token'] - auth = '%s:%s' % (uuid, token) - b64_token = base64.b64encode(auth) - self._auth_header = {'Authorization': ['Token %s' % b64_token]} - - @property - def _base_header(self): - return self._auth_header.copy() if self._auth_header else {} - - @property - def _defer_encryption(self): - return self._sync_enc_pool is not None - - # - # SyncTarget API - # - - @defer.inlineCallbacks - def get_sync_info(self, source_replica_uid): - """ - Return information about known state of remote database. - - Return the replica_uid and the current database generation of the - remote database, and its last-seen database generation for the client - replica. - - :param source_replica_uid: The client-size replica uid. - :type source_replica_uid: str - - :return: A deferred which fires with (target_replica_uid, - target_replica_generation, target_trans_id, - source_replica_last_known_generation, - source_replica_last_known_transaction_id) - :rtype: twisted.internet.defer.Deferred - """ - raw = yield self._http_request(self._url) - res = json.loads(raw) - defer.returnValue(( - res['target_replica_uid'], - res['target_replica_generation'], - res['target_replica_transaction_id'], - res['source_replica_generation'], - res['source_transaction_id'] - )) - - def record_sync_info( - self, source_replica_uid, source_replica_generation, - source_replica_transaction_id): - """ - Record tip information for another replica. - - After sync_exchange has been processed, the caller will have - received new content from this replica. This call allows the - source replica instigating the sync to inform us what their - generation became after applying the documents we returned. - - This is used to allow future sync operations to not need to repeat data - that we just talked about. It also means that if this is called at the - wrong time, there can be database records that will never be - synchronized. - - :param source_replica_uid: The identifier for the source replica. - :type source_replica_uid: str - :param source_replica_generation: The database generation for the - source replica. - :type source_replica_generation: int - :param source_replica_transaction_id: The transaction id associated - with the source replica - generation. - :type source_replica_transaction_id: str - - :return: A deferred which fires with the result of the query. - :rtype: twisted.internet.defer.Deferred - """ - data = json.dumps({ - 'generation': source_replica_generation, - 'transaction_id': source_replica_transaction_id - }) - return self._http_request( - self._url, - method='PUT', - body=data, - content_type='application/json') - - @defer.inlineCallbacks - def sync_exchange(self, docs_by_generation, source_replica_uid, - last_known_generation, last_known_trans_id, - insert_doc_cb, ensure_callback=None, - defer_decryption=True, sync_id=None): - """ - Find out which documents the remote database does not know about, - encrypt and send them. After that, receive documents from the remote - database. - - :param docs_by_generations: A list of (doc_id, generation, trans_id) - of local documents that were changed since - the last local generation the remote - replica knows about. - :type docs_by_generations: list of tuples - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - - :param ensure_callback: A callback that ensures we know the target - replica uid if the target replica was just - created. - :type ensure_callback: function - - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - - :return: A deferred which fires with the new generation and - transaction id of the target replica. - :rtype: twisted.internet.defer.Deferred - """ - - self._ensure_callback = ensure_callback - - if sync_id is None: - sync_id = str(uuid4()) - self.source_replica_uid = source_replica_uid - - # save a reference to the callback so we can use it after decrypting - self._insert_doc_cb = insert_doc_cb - - gen_after_send, trans_id_after_send = yield self._send_docs( - docs_by_generation, - last_known_generation, - last_known_trans_id, - sync_id) - - cur_target_gen, cur_target_trans_id = yield self._receive_docs( - last_known_generation, last_known_trans_id, - ensure_callback, sync_id, - defer_decryption=defer_decryption) - - # update gen and trans id info in case we just sent and did not - # receive docs. - if gen_after_send is not None and gen_after_send > cur_target_gen: - cur_target_gen = gen_after_send - cur_target_trans_id = trans_id_after_send - - defer.returnValue([cur_target_gen, cur_target_trans_id]) - - # - # methods to send docs - # - - @defer.inlineCallbacks - def _send_docs(self, docs_by_generation, last_known_generation, - last_known_trans_id, sync_id): - - if not docs_by_generation: - defer.returnValue([None, None]) - - # add remote replica metadata to the request - initial_body = RequestBody( - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - total = len(docs_by_generation) - entries = yield self._entries_from_docs( - initial_body, docs_by_generation) - while len(entries): - result = yield self._http_request( - self._url, - method='POST', - body=entries.remove(1), - content_type='application/x-soledad-sync-put') - idx = total - len(entries) - if self._defer_encryption: - self._delete_sent(idx, docs_by_generation) - _emit_send(idx, total) - response_dict = json.loads(result)[0] - gen_after_send = response_dict['new_generation'] - trans_id_after_send = response_dict['new_transaction_id'] - defer.returnValue([gen_after_send, trans_id_after_send]) - - def _delete_sent(self, idx, docs_by_generation): - doc = docs_by_generation[idx][0] - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) - - @defer.inlineCallbacks - def _entries_from_docs(self, initial_body, docs_by_generation): - number_of_docs = len(docs_by_generation) - for idx, (doc, gen, trans_id) in enumerate(docs_by_generation, 1): - content = yield self._encrypt_doc(doc) - initial_body.insert_info( - id=doc.doc_id, rev=doc.rev, content=content, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=idx) - defer.returnValue(initial_body) - - def _encrypt_doc(self, doc): - d = None - if doc.is_tombstone(): - d = defer.succeed(None) - elif not self._defer_encryption: - # fallback case, for tests - d = defer.succeed(self._crypto.encrypt_doc(doc)) - else: - - def _maybe_encrypt_doc_inline(doc_json): - if doc_json is None: - # the document is not marked as tombstone, but we got - # nothing from the sync db. As it is not encrypted - # yet, we force inline encryption. - return self._crypto.encrypt_doc(doc) - return doc_json - - d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) - d.addCallback(_maybe_encrypt_doc_inline) - return d - - # - # methods to receive doc - # - - @defer.inlineCallbacks - def _receive_docs(self, last_known_generation, last_known_trans_id, - ensure_callback, sync_id, defer_decryption): - - self._queue_for_decrypt = defer_decryption \ - and self._sync_db is not None - - new_generation = last_known_generation - new_transaction_id = last_known_trans_id - - if self._queue_for_decrypt: - logger.debug( - "Soledad sync: will queue received docs for decrypting.") - - if defer_decryption: - self._setup_sync_decr_pool() - - # --------------------------------------------------------------------- - # maybe receive the first document - # --------------------------------------------------------------------- - - # we fetch the first document before fetching the rest because we need - # to know the total number of documents to be received, and this - # information comes as metadata to each request. - - doc = yield self._receive_one_doc( - last_known_generation, last_known_trans_id, - sync_id, 0) - self._received_docs = 0 - number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) - - # update the target gen and trans_id in case a document was received - if ngen: - new_generation = ngen - new_transaction_id = ntrans - - if defer_decryption: - self._sync_decr_pool.start(number_of_changes) - - # --------------------------------------------------------------------- - # maybe receive the rest of the documents - # --------------------------------------------------------------------- - - # launch many asynchronous fetches and inserts of received documents - # in the temporary sync db. Will wait for all results before - # continuing. - - received = 1 - deferreds = [] - while received < number_of_changes: - d = self._receive_one_doc( - last_known_generation, - last_known_trans_id, sync_id, received) - d.addCallback( - self._insert_received_doc, - received + 1, # the index of the current received doc - number_of_changes) - deferreds.append(d) - received += 1 - results = yield defer.gatherResults(deferreds) - - # get generation and transaction id of target after insertions - if deferreds: - _, new_generation, new_transaction_id = results.pop() - - # --------------------------------------------------------------------- - # wait for async decryption to finish - # --------------------------------------------------------------------- - - if defer_decryption: - yield self._sync_decr_pool.deferred - self._sync_decr_pool.stop() - - defer.returnValue([new_generation, new_transaction_id]) - - def _receive_one_doc(self, last_known_generation, - last_known_trans_id, sync_id, received): - # add remote replica metadata to the request - body = RequestBody( - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - # inform server of how many documents have already been received - body.insert_info(received=received) - # send headers - return self._http_request( - self._url, - method='POST', - body=str(body), - content_type='application/x-soledad-sync-get') - - def _insert_received_doc(self, response, idx, total): - """ - Insert a received document into the local replica. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - """ - new_generation, new_transaction_id, number_of_changes, doc_id, \ - rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) - if doc_id is not None: - # decrypt incoming document and insert into local database - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # If arriving content was symmetrically encrypted, we decrypt it. - # We do it inline if defer_decryption flag is False or no sync_db - # was defined, otherwise we defer it writing it to the received - # docs table. - doc = SoledadDocument(doc_id, rev, content) - if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(self._crypto.decrypt_doc(doc)) - self._insert_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. - if self._queue_for_decrypt: - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - self._insert_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- - self._received_docs += 1 - _emit_received(self._received_docs, total) - return number_of_changes, new_generation, new_transaction_id - - def _parse_received_doc_response(self, response): - """ - Parse the response from the server containing the received document. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - - :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, - content, gen, trans_id) - :rtype: tuple - """ - # decode incoming stream - parts = response.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise errors.BrokenSyncStream - data = parts[1:-1] - # decode metadata - try: - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - except (IndexError): - raise errors.BrokenSyncStream - try: - metadata = json.loads(line) - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - number_of_changes = metadata['number_of_changes'] - except (ValueError, KeyError): - raise errors.BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if self._ensure_callback and 'replica_uid' in metadata: - self._ensure_callback(metadata['replica_uid']) - # parse incoming document info - doc_id = None - rev = None - content = None - gen = None - trans_id = None - if number_of_changes > 0: - try: - entry = json.loads(data[1]) - doc_id = entry['id'] - rev = entry['rev'] - content = entry['content'] - gen = entry['gen'] - trans_id = entry['trans_id'] - except (IndexError, KeyError): - raise errors.BrokenSyncStream - return new_generation, new_transaction_id, number_of_changes, \ - doc_id, rev, content, gen, trans_id - - def _setup_sync_decr_pool(self): - """ - Set up the SyncDecrypterPool for deferred decryption. - """ - if self._sync_decr_pool is None and self._sync_db is not None: - # initialize syncing queue decryption pool - self._sync_decr_pool = SyncDecrypterPool( - self._crypto, - self._sync_db, - insert_doc_cb=self._insert_doc_cb, - source_replica_uid=self.source_replica_uid) - - def _http_request(self, url, method='GET', body=None, headers=None, - content_type=None): - headers = headers or self._base_header - if content_type: - headers.update({'content-type': [content_type]}) - d = self._http.request(url, method, body, headers, readBody) - d.addErrback(_unauth_to_invalid_token_error) - return d - - -def _unauth_to_invalid_token_error(failure): - """ - An errback to translate unauthorized errors to our own invalid token - class. - - :param failure: The original failure. - :type failure: twisted.python.failure.Failure - - :return: Either the original failure or an invalid auth token error. - :rtype: twisted.python.failure.Failure - """ - failure.trap(Error) - if failure.getErrorMessage() == "401 Unauthorized": - raise InvalidAuthTokenError - return failure - - -def _emit_send(idx, total): - msg = "%d/%d" % (idx, total) - emit( - SOLEDAD_SYNC_SEND_STATUS, - "Soledad sync send status: %s" % msg) - logger.debug("Sync send status: %s" % msg) - - -def _emit_received(received_docs, total): - msg = "%d/%d" % (received_docs, total) - emit(SOLEDAD_SYNC_RECEIVE_STATUS, msg) - logger.debug("Sync receive status: %s" % msg) - - -class RequestBody(object): - - def __init__(self, **header_dict): - self.headers = header_dict - self.entries = [] - - def insert_info(self, **entry_dict): - entry = json.dumps(entry_dict) - self.entries.append(entry) - return len(entry) - - def remove(self, number=1): - entries = [self.entries.pop(0) for i in xrange(number)] - return self.entries_to_str(entries) - - def __str__(self): - return self.entries_to_str(self.entries) - - def __len__(self): - return len(self.entries) - - def entries_to_str(self, entries=None): - data = '[\r\n' + json.dumps(self.headers) - data += ''.join(',\r\n' + entry for entry in entries) - return data + '\r\n]' |