From 3e94cafa43d464d73815e21810b97a4faf54136d Mon Sep 17 00:00:00 2001 From: drebs Date: Sun, 18 Jun 2017 11:18:10 -0300 Subject: [pkg] unify client and server into a single python package We have been discussing about this merge for a while. Its main goal is to simplify things: code navigation, but also packaging. The rationale is that the code is more cohesive in this way, and there's only one source package to install. Dependencies that are only for the server or the client will not be installed by default, and they are expected to be provided by the environment. There are setuptools extras defined for the client and the server. Debianization is still expected to split the single source package into 3 binaries. Another avantage is that the documentation can now install a single package with a single step, and therefore include the docstrings into the generated docs. - Resolves: #8896 --- src/leap/soledad/client/http_target/__init__.py | 94 ++++++++ src/leap/soledad/client/http_target/api.py | 248 +++++++++++++++++++++ src/leap/soledad/client/http_target/fetch.py | 161 +++++++++++++ .../soledad/client/http_target/fetch_protocol.py | 157 +++++++++++++ src/leap/soledad/client/http_target/send.py | 107 +++++++++ .../soledad/client/http_target/send_protocol.py | 75 +++++++ src/leap/soledad/client/http_target/support.py | 220 ++++++++++++++++++ 7 files changed, 1062 insertions(+) create mode 100644 src/leap/soledad/client/http_target/__init__.py create mode 100644 src/leap/soledad/client/http_target/api.py create mode 100644 src/leap/soledad/client/http_target/fetch.py create mode 100644 src/leap/soledad/client/http_target/fetch_protocol.py create mode 100644 src/leap/soledad/client/http_target/send.py create mode 100644 src/leap/soledad/client/http_target/send_protocol.py create mode 100644 src/leap/soledad/client/http_target/support.py (limited to 'src/leap/soledad/client/http_target') diff --git a/src/leap/soledad/client/http_target/__init__.py b/src/leap/soledad/client/http_target/__init__.py new file mode 100644 index 00000000..b67d03f6 --- /dev/null +++ b/src/leap/soledad/client/http_target/__init__.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +A U1DB backend for encrypting data before sending to server and decrypting +after receiving. +""" + + +import os + +from twisted.web.client import Agent +from twisted.internet import reactor + +from leap.common.certs import get_compatible_ssl_context_factory +from leap.soledad.common.log import getLogger +from leap.soledad.client.http_target.send import HTTPDocSender +from leap.soledad.client.http_target.api import SyncTargetAPI +from leap.soledad.client.http_target.fetch import HTTPDocFetcher +from leap.soledad.client import crypto as old_crypto + + +logger = getLogger(__name__) + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): + + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ + def __init__(self, url, source_replica_uid, creds, crypto, cert_file): + """ + Initialize the sync target. + + :param url: The server sync url. + :type url: str + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param creds: A dictionary containing the uuid and token. + :type creds: creds + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad._crypto.SoledadCrypto + :param cert_file: Path to the certificate of the ca used to validate + the SSL certificate used by the remote soledad + server. + :type cert_file: str + """ + if url.endswith("/"): + url = url[:-1] + self._url = str(url) + "/sync-from/" + str(source_replica_uid) + self.source_replica_uid = source_replica_uid + self._auth_header = None + self._uuid = None + self.set_creds(creds) + self._crypto = crypto + # TODO: DEPRECATED CRYPTO + self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret) + self._insert_doc_cb = None + + # Twisted default Agent with our own ssl context factory + factory = get_compatible_ssl_context_factory(cert_file) + self._http = Agent(reactor, factory) + + if DO_STATS: + self.sync_exchange_phase = [0] diff --git a/src/leap/soledad/client/http_target/api.py b/src/leap/soledad/client/http_target/api.py new file mode 100644 index 00000000..c68185c6 --- /dev/null +++ b/src/leap/soledad/client/http_target/api.py @@ -0,0 +1,248 @@ +# -*- coding: utf-8 -*- +# api.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import os +import json +import base64 + +from six import StringIO +from uuid import uuid4 + +from twisted.internet import defer +from twisted.web.http_headers import Headers +from twisted.web.client import FileBodyProducer + +from leap.soledad.client.http_target.support import readBody +from leap.soledad.common.errors import InvalidAuthTokenError +from leap.soledad.common.l2db.errors import HTTPError +from leap.soledad.common.l2db import SyncTarget + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +class SyncTargetAPI(SyncTarget): + """ + Declares public methods and implements u1db.SyncTarget. + """ + + @property + def uuid(self): + return self._uuid + + def set_creds(self, creds): + """ + Update credentials. + + :param creds: A dictionary containing the uuid and token. + :type creds: dict + """ + uuid = creds['token']['uuid'] + token = creds['token']['token'] + self._uuid = uuid + auth = '%s:%s' % (uuid, token) + b64_token = base64.b64encode(auth) + self._auth_header = {'Authorization': ['Token %s' % b64_token]} + + @property + def _base_header(self): + return self._auth_header.copy() if self._auth_header else {} + + def _http_request(self, url, method='GET', body=None, headers=None, + content_type=None, body_reader=readBody, + body_producer=None): + headers = headers or self._base_header + if content_type: + headers.update({'content-type': [content_type]}) + if not body_producer and body: + body = FileBodyProducer(StringIO(body)) + elif body_producer: + # Upload case, check send.py + body = body_producer(body) + d = self._http.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallback(body_reader) + d.addErrback(_unauth_to_invalid_token_error) + return d + + @defer.inlineCallbacks + def get_sync_info(self, source_replica_uid): + """ + Return information about known state of remote database. + + Return the replica_uid and the current database generation of the + remote database, and its last-seen database generation for the client + replica. + + :param source_replica_uid: The client-size replica uid. + :type source_replica_uid: str + + :return: A deferred which fires with (target_replica_uid, + target_replica_generation, target_trans_id, + source_replica_last_known_generation, + source_replica_last_known_transaction_id) + :rtype: twisted.internet.defer.Deferred + """ + raw = yield self._http_request(self._url) + res = json.loads(raw) + defer.returnValue(( + res['target_replica_uid'], + res['target_replica_generation'], + res['target_replica_transaction_id'], + res['source_replica_generation'], + res['source_transaction_id'] + )) + + def record_sync_info( + self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + """ + Record tip information for another replica. + + After sync_exchange has been processed, the caller will have + received new content from this replica. This call allows the + source replica instigating the sync to inform us what their + generation became after applying the documents we returned. + + This is used to allow future sync operations to not need to repeat data + that we just talked about. It also means that if this is called at the + wrong time, there can be database records that will never be + synchronized. + + :param source_replica_uid: The identifier for the source replica. + :type source_replica_uid: str + :param source_replica_generation: The database generation for the + source replica. + :type source_replica_generation: int + :param source_replica_transaction_id: The transaction id associated + with the source replica + generation. + :type source_replica_transaction_id: str + + :return: A deferred which fires with the result of the query. + :rtype: twisted.internet.defer.Deferred + """ + data = json.dumps({ + 'generation': source_replica_generation, + 'transaction_id': source_replica_transaction_id + }) + return self._http_request( + self._url, + method='PUT', + body=data, + content_type='application/json') + + @defer.inlineCallbacks + def sync_exchange(self, docs_by_generation, source_replica_uid, + last_known_generation, last_known_trans_id, + insert_doc_cb, ensure_callback=None, + sync_id=None): + """ + Find out which documents the remote database does not know about, + encrypt and send them. After that, receive documents from the remote + database. + + :param docs_by_generations: A list of (doc_id, generation, trans_id) + of local documents that were changed since + the last local generation the remote + replica knows about. + :type docs_by_generations: list of tuples + + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + + :param ensure_callback: A callback that ensures we know the target + replica uid if the target replica was just + created. + :type ensure_callback: function + + :return: A deferred which fires with the new generation and + transaction id of the target replica. + :rtype: twisted.internet.defer.Deferred + """ + # ---------- phase 1: send docs to server ---------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + + self._ensure_callback = ensure_callback + + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + + # save a reference to the callback so we can use it after decrypting + self._insert_doc_cb = insert_doc_cb + + gen_after_send, trans_id_after_send = yield self._send_docs( + docs_by_generation, + last_known_generation, + last_known_trans_id, + sync_id) + + # ---------- phase 2: receive docs ----------------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + + cur_target_gen, cur_target_trans_id = yield self._receive_docs( + last_known_generation, last_known_trans_id, + ensure_callback, sync_id) + + # update gen and trans id info in case we just sent and did not + # receive docs. + if gen_after_send is not None and gen_after_send > cur_target_gen: + cur_target_gen = gen_after_send + cur_target_trans_id = trans_id_after_send + + # ---------- phase 3: sync exchange is over -------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + + defer.returnValue([cur_target_gen, cur_target_trans_id]) + + +def _unauth_to_invalid_token_error(failure): + """ + An errback to translate unauthorized errors to our own invalid token + class. + + :param failure: The original failure. + :type failure: twisted.python.failure.Failure + + :return: Either the original failure or an invalid auth token error. + :rtype: twisted.python.failure.Failure + """ + failure.trap(HTTPError) + if failure.value.status == 401: + raise InvalidAuthTokenError + return failure diff --git a/src/leap/soledad/client/http_target/fetch.py b/src/leap/soledad/client/http_target/fetch.py new file mode 100644 index 00000000..9d456830 --- /dev/null +++ b/src/leap/soledad/client/http_target/fetch.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- +# fetch.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import json +from twisted.internet import defer +from twisted.internet import threads + +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import emit_async +from leap.soledad.client.http_target.support import RequestBody +from leap.soledad.common.log import getLogger +from leap.soledad.client._crypto import is_symmetrically_encrypted +from leap.soledad.common.l2db import errors +from leap.soledad.client import crypto as old_crypto + +from .._document import Document +from . import fetch_protocol + +logger = getLogger(__name__) + + +class HTTPDocFetcher(object): + """ + Handles Document fetching from Soledad server, using HTTP as transport. + Steps: + * Prepares metadata by asking server for one document + * Fetch the total on response and prepare to ask all remaining + * (async) Documents will come encrypted. + So we parse, decrypt and insert locally as they arrive. + """ + + # The uuid of the local replica. + # Any class inheriting from this one should provide a meaningful attribute + # if the sync status event is meant to be used somewhere else. + + uuid = 'undefined' + userid = 'undefined' + + @defer.inlineCallbacks + def _receive_docs(self, last_known_generation, last_known_trans_id, + ensure_callback, sync_id): + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + # Acts as a queue, ensuring line order on async processing + # as `self._insert_doc_cb` cant be run concurrently or out of order. + # DeferredSemaphore solves the concurrency and its implementation uses + # a queue, solving the ordering. + # FIXME: Find a proper solution to avoid surprises on Twisted changes + self.semaphore = defer.DeferredSemaphore(1) + + metadata = yield self._fetch_all( + last_known_generation, last_known_trans_id, + sync_id) + number_of_changes, ngen, ntrans = self._parse_metadata(metadata) + + # wait for pending inserts + yield self.semaphore.acquire() + + if ngen: + new_generation = ngen + new_transaction_id = ntrans + + defer.returnValue([new_generation, new_transaction_id]) + + def _fetch_all(self, last_known_generation, + last_known_trans_id, sync_id): + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + self._received_docs = 0 + # build a stream reader with _doc_parser as a callback + body_reader = fetch_protocol.build_body_reader(self._doc_parser) + # start download stream + return self._http_request( + self._url, + method='POST', + body=str(body), + content_type='application/x-soledad-sync-get', + body_reader=body_reader) + + @defer.inlineCallbacks + def _doc_parser(self, doc_info, content, total): + """ + Insert a received document into the local replica, decrypting + if necessary. The case where it's not decrypted is when a doc gets + inserted from Server side with a GPG encrypted content. + + :param doc_info: Dictionary representing Document information. + :type doc_info: dict + :param content: The Document's content. + :type idx: str + :param total: The total number of operations. + :type total: int + """ + yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, + total) + + @defer.inlineCallbacks + def __atomic_doc_parse(self, doc_info, content, total): + doc = Document(doc_info['id'], doc_info['rev'], content) + if is_symmetrically_encrypted(content): + content = (yield self._crypto.decrypt_doc(doc)).getvalue() + elif old_crypto.is_symmetrically_encrypted(doc): + content = self._deprecated_crypto.decrypt_doc(doc) + doc.set_json(content) + + # TODO insert blobs here on the blob backend + # FIXME: This is wrong. Using the very same SQLite connection object + # from multiple threads is dangerous. We should bring the dbpool here + # or find an alternative. Deferring to a thread only helps releasing + # the reactor for other tasks as this is an IO intensive call. + yield threads.deferToThread(self._insert_doc_cb, + doc, doc_info['gen'], doc_info['trans_id']) + self._received_docs += 1 + user_data = {'uuid': self.uuid, 'userid': self.userid} + _emit_receive_status(user_data, self._received_docs, total=total) + + def _parse_metadata(self, metadata): + """ + Parse the response from the server containing the sync metadata. + + :param response: Metadata as string + :type response: str + + :return: (number_of_changes, new_gen, new_trans_id) + :rtype: tuple + """ + try: + metadata = json.loads(metadata) + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + return (metadata['number_of_changes'], metadata['new_generation'], + metadata['new_transaction_id']) + except (ValueError, KeyError): + raise errors.BrokenSyncStream('Metadata parsing failed') + + +def _emit_receive_status(user_data, received_docs, total): + content = {'received': received_docs, 'total': total} + emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content) + + if received_docs % 20 == 0: + msg = "%d/%d" % (received_docs, total) + logger.debug("Sync receive status: %s" % msg) diff --git a/src/leap/soledad/client/http_target/fetch_protocol.py b/src/leap/soledad/client/http_target/fetch_protocol.py new file mode 100644 index 00000000..851eb3a1 --- /dev/null +++ b/src/leap/soledad/client/http_target/fetch_protocol.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +# fetch_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import json +from functools import partial +from six import StringIO +from twisted.web._newclient import ResponseDone +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger +from .support import ReadBodyProtocol +from .support import readBody + +logger = getLogger(__name__) + + +class DocStreamReceiver(ReadBodyProtocol): + """ + A protocol implementation that can parse incoming data from server based + on a line format specified on u1db implementation. Except that we split doc + attributes from content to ease parsing and increment throughput for larger + documents. + [\r\n + {metadata},\r\n + {doc_info},\r\n + {content},\r\n + ... + {doc_info},\r\n + {content},\r\n + ] + """ + + def __init__(self, response, deferred, doc_reader): + self.deferred = deferred + self.status = response.code if response else None + self.message = response.phrase if response else None + self.headers = response.headers if response else {} + self.delimiter = '\r\n' + self.metadata = '' + self._doc_reader = doc_reader + self.reset() + + def reset(self): + self._line = 0 + self._buffer = StringIO() + self._properly_finished = False + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + if self.deferred.called: + return + try: + if reason.check(ResponseDone): + self.dataBuffer = self.metadata + else: + self.dataBuffer = self.finish() + except errors.BrokenSyncStream as e: + return self.deferred.errback(e) + return ReadBodyProtocol.connectionLost(self, reason) + + def consumeBufferLines(self): + """ + Consumes lines from buffer and rewind it, writing remaining data + that didn't formed a line back into buffer. + """ + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.seek(0) + lines = content.split(self.delimiter) + self._buffer.write(lines.pop(-1)) + return lines + + def dataReceived(self, data): + """ + Buffer incoming data until a line breaks comes in. We check only + the incoming data for efficiency. + """ + self._buffer.write(data) + if '\n' not in data: + return + lines = self.consumeBufferLines() + while lines: + line, _ = utils.check_and_strip_comma(lines.pop(0)) + self.lineReceived(line) + self._line += 1 + + def lineReceived(self, line): + """ + Protocol implementation. + 0: [\r\n + 1: {metadata},\r\n + (even): {doc_info},\r\n + (odd): {data},\r\n + (last): ] + """ + if self._properly_finished: + raise errors.BrokenSyncStream("Reading a finished stream") + if ']' == line: + self._properly_finished = True + elif self._line == 0: + if line is not '[': + raise errors.BrokenSyncStream("Invalid start") + elif self._line == 1: + self.metadata = line + if 'error' in self.metadata: + raise errors.BrokenSyncStream("Error from server: %s" % line) + self.total = json.loads(line).get('number_of_changes', -1) + elif (self._line % 2) == 0: + self.current_doc = json.loads(line) + if 'error' in self.current_doc: + raise errors.BrokenSyncStream("Error from server: %s" % line) + else: + d = self._doc_reader( + self.current_doc, line.strip() or None, self.total) + d.addErrback(self.deferred.errback) + + def finish(self): + """ + Checks that ']' came and stream was properly closed. + """ + if not self._properly_finished: + raise errors.BrokenSyncStream('Stream not properly closed') + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.close() + return content + + +def build_body_reader(doc_reader): + """ + Get the documents from a sync stream and call doc_reader on each + doc received. + + @param doc_reader: Function to be called for processing an incoming doc. + Will be called with doc metadata (dict parsed from 1st line) and doc + content (string) + @type doc_reader: function + + @return: A function that can be called by the http Agent to create and + configure the proper protocol. + """ + protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader) + return partial(readBody, protocolClass=protocolClass) diff --git a/src/leap/soledad/client/http_target/send.py b/src/leap/soledad/client/http_target/send.py new file mode 100644 index 00000000..2b286ec5 --- /dev/null +++ b/src/leap/soledad/client/http_target/send.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# send.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import json + +from twisted.internet import defer + +from leap.soledad.common.log import getLogger +from leap.soledad.client.events import emit_async +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.http_target.support import RequestBody +from .send_protocol import DocStreamProducer + +logger = getLogger(__name__) + + +class HTTPDocSender(object): + """ + Handles Document uploading from Soledad server, using HTTP as transport. + They need to be encrypted and metadata prepared before sending. + """ + + # The uuid of the local replica. + # Any class inheriting from this one should provide a meaningful attribute + # if the sync status event is meant to be used somewhere else. + + uuid = 'undefined' + userid = 'undefined' + + @defer.inlineCallbacks + def _send_docs(self, docs_by_generation, last_known_generation, + last_known_trans_id, sync_id): + + if not docs_by_generation: + defer.returnValue([None, None]) + + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + result = yield self._send_batch(body, docs_by_generation) + response_dict = json.loads(result)[0] + gen_after_send = response_dict['new_generation'] + trans_id_after_send = response_dict['new_transaction_id'] + defer.returnValue([gen_after_send, trans_id_after_send]) + + @defer.inlineCallbacks + def _send_batch(self, body, docs): + total, calls = len(docs), [] + for i, entry in enumerate(docs): + calls.append((self._prepare_one_doc, + entry, body, i + 1, total)) + result = yield self._send_request(body, calls) + _emit_send_status(self.uuid, body.consumed, total) + + defer.returnValue(result) + + def _send_request(self, body, calls): + return self._http_request( + self._url, + method='POST', + body=(body, calls), + content_type='application/x-soledad-sync-put', + body_producer=DocStreamProducer) + + @defer.inlineCallbacks + def _prepare_one_doc(self, entry, body, idx, total): + get_doc_call, gen, trans_id = entry + doc, content = yield self._encrypt_doc(get_doc_call) + body.insert_info( + id=doc.doc_id, rev=doc.rev, content=content, gen=gen, + trans_id=trans_id, number_of_docs=total, + doc_idx=idx) + _emit_send_status(self.uuid, body.consumed, total) + + @defer.inlineCallbacks + def _encrypt_doc(self, get_doc_call): + f, args, kwargs = get_doc_call + doc = yield f(*args, **kwargs) + if doc.is_tombstone(): + defer.returnValue((doc, None)) + else: + content = yield self._crypto.encrypt_doc(doc) + defer.returnValue((doc, content)) + + +def _emit_send_status(user_data, idx, total): + content = {'sent': idx, 'total': total} + emit_async(SOLEDAD_SYNC_SEND_STATUS, user_data, content) + + msg = "%d/%d" % (idx, total) + logger.debug("Sync send status: %s" % msg) diff --git a/src/leap/soledad/client/http_target/send_protocol.py b/src/leap/soledad/client/http_target/send_protocol.py new file mode 100644 index 00000000..4941aa34 --- /dev/null +++ b/src/leap/soledad/client/http_target/send_protocol.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# send_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from zope.interface import implementer +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web.iweb import IBodyProducer +from twisted.web.iweb import UNKNOWN_LENGTH + + +@implementer(IBodyProducer) +class DocStreamProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + def __init__(self, producer): + """ + Initialize the string produer. + + :param producer: A RequestBody instance and a list of producer calls + :type producer: (.support.RequestBody, [(function, *args)]) + """ + self.body, self.producer = producer + self.length = UNKNOWN_LENGTH + self.pause = False + self.stop = False + + @defer.inlineCallbacks + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A Deferred that fires when production ends. + :rtype: twisted.internet.defer.Deferred + """ + while self.producer and not self.stop: + if self.pause: + yield self.sleep(0.001) + continue + call = self.producer.pop(0) + fun, args = call[0], call[1:] + yield fun(*args) + consumer.write(self.body.pop(1, leave_open=True)) + consumer.write(self.body.pop(0)) # close stream + + def sleep(self, secs): + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + def pauseProducing(self): + self.pause = True + + def stopProducing(self): + self.stop = True + + def resumeProducing(self): + self.pause = False diff --git a/src/leap/soledad/client/http_target/support.py b/src/leap/soledad/client/http_target/support.py new file mode 100644 index 00000000..d8d8e420 --- /dev/null +++ b/src/leap/soledad/client/http_target/support.py @@ -0,0 +1,220 @@ +# -*- coding: utf-8 -*- +# support.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import warnings +import json + +from twisted.internet import defer +from twisted.web.client import _ReadBodyProtocol +from twisted.web.client import PartialDownloadError +from twisted.web._newclient import ResponseDone +from twisted.web._newclient import PotentialDataLoss + +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import http_errors + +# we want to make sure that HTTP errors will raise appropriate u1db errors, +# that is, fire errbacks with the appropriate failures, in the context of +# twisted. Because of that, we redefine the http body reader used by the HTTP +# client below. + + +class ReadBodyProtocol(_ReadBodyProtocol): + """ + From original Twisted implementation, focused on adding our error + handling and ensuring that the proper u1db error is raised. + """ + + def __init__(self, response, deferred): + """ + Initialize the protocol, additionally storing the response headers. + """ + _ReadBodyProtocol.__init__( + self, response.code, response.phrase, deferred) + self.headers = response.headers + + # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks + def _error(self, respdic): + descr = respdic.get("error") + exc_cls = errors.wire_description_to_exc.get(descr) + if exc_cls is not None: + message = respdic.get("message") + self.deferred.errback(exc_cls(message)) + else: + self.deferred.errback( + errors.HTTPError(self.status, respdic, self.headers)) + # ---8<--- end of snippet from u1db.remote.http_client + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + if reason.check(ResponseDone): + + body = b''.join(self.dataBuffer) + + # ---8<--- snippet from u1db.remote.http_client + if self.status in (200, 201): + self.deferred.callback(body) + elif self.status in http_errors.ERROR_STATUSES: + try: + respdic = json.loads(body) + except ValueError: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + else: + self._error(respdic) + # special cases + elif self.status == 503: + self.deferred.errback(errors.Unavailable(body, self.headers)) + else: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + # ---8<--- end of snippet from u1db.remote.http_client + + elif reason.check(PotentialDataLoss): + self.deferred.errback( + PartialDownloadError(self.status, self.message, + b''.join(self.dataBuffer))) + else: + self.deferred.errback(reason) + + +def readBody(response, protocolClass=ReadBodyProtocol): + """ + Get the body of an L{IResponse} and return it as a byte string. + + This is a helper function for clients that don't want to incrementally + receive the body of an HTTP response. + + @param response: The HTTP response for which the body will be read. + @type response: L{IResponse} provider + + @return: A L{Deferred} which will fire with the body of the response. + Cancelling it will close the connection to the server immediately. + """ + def cancel(deferred): + """ + Cancel a L{readBody} call, close the connection to the HTTP server + immediately, if it is still open. + + @param deferred: The cancelled L{defer.Deferred}. + """ + abort = getAbort() + if abort is not None: + abort() + + d = defer.Deferred(cancel) + protocol = protocolClass(response, d) + + def getAbort(): + return getattr(protocol.transport, 'abortConnection', None) + + response.deliverBody(protocol) + + if protocol.transport is not None and getAbort() is None: + warnings.warn( + 'Using readBody with a transport that does not have an ' + 'abortConnection method', + category=DeprecationWarning, + stacklevel=2) + + return d + + +class RequestBody(object): + """ + This class is a helper to generate send and fetch requests. + The expected format is something like: + [ + {headers}, + {entry1}, + {...}, + {entryN}, + ] + """ + + def __init__(self, **header_dict): + """ + Creates a new RequestBody holding header information. + + :param header_dict: A dictionary with the headers. + :type header_dict: dict + """ + self.headers = header_dict + self.entries = [] + self.consumed = 0 + + def insert_info(self, **entry_dict): + """ + Dumps an entry into JSON format and add it to entries list. + Adds 'content' key on a new line if it's present. + + :param entry_dict: Entry as a dictionary + :type entry_dict: dict + """ + content = '' + if 'content' in entry_dict: + content = ',\r\n' + (entry_dict['content'] or '') + entry = json.dumps(entry_dict) + content + self.entries.append(entry) + + def pop(self, amount=10, leave_open=False): + """ + Removes entries and returns it formatted and ready + to be sent. + + :param amount: number of entries to pop and format + :type amount: int + + :param leave_open: flag to skip stream closing + :type amount: bool + + :return: formatted body ready to be sent + :rtype: str + """ + start = self.consumed == 0 + amount = min([len(self.entries), amount]) + entries = [self.entries.pop(0) for i in xrange(amount)] + self.consumed += amount + end = len(self.entries) == 0 if not leave_open else False + return self.entries_to_str(entries, start, end) + + def __str__(self): + return self.pop(len(self.entries)) + + def __len__(self): + return len(self.entries) + + def entries_to_str(self, entries=None, start=True, end=True): + """ + Format a list of entries into the body format expected + by the server. + + :param entries: entries to format + :type entries: list + + :return: formatted body ready to be sent + :rtype: str + """ + data = '' + if start: + data = '[\r\n' + json.dumps(self.headers) + data += ''.join(',\r\n' + entry for entry in entries) + if end: + data += '\r\n]' + return data -- cgit v1.2.3