diff options
Diffstat (limited to 'src/leap/soledad/client/http_target')
-rw-r--r-- | src/leap/soledad/client/http_target/__init__.py | 94 | ||||
-rw-r--r-- | src/leap/soledad/client/http_target/api.py | 248 | ||||
-rw-r--r-- | src/leap/soledad/client/http_target/fetch.py | 161 | ||||
-rw-r--r-- | src/leap/soledad/client/http_target/fetch_protocol.py | 157 | ||||
-rw-r--r-- | src/leap/soledad/client/http_target/send.py | 107 | ||||
-rw-r--r-- | src/leap/soledad/client/http_target/send_protocol.py | 75 | ||||
-rw-r--r-- | src/leap/soledad/client/http_target/support.py | 220 |
7 files changed, 1062 insertions, 0 deletions
diff --git a/src/leap/soledad/client/http_target/__init__.py b/src/leap/soledad/client/http_target/__init__.py new file mode 100644 index 00000000..b67d03f6 --- /dev/null +++ b/src/leap/soledad/client/http_target/__init__.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +A U1DB backend for encrypting data before sending to server and decrypting +after receiving. +""" + + +import os + +from twisted.web.client import Agent +from twisted.internet import reactor + +from leap.common.certs import get_compatible_ssl_context_factory +from leap.soledad.common.log import getLogger +from leap.soledad.client.http_target.send import HTTPDocSender +from leap.soledad.client.http_target.api import SyncTargetAPI +from leap.soledad.client.http_target.fetch import HTTPDocFetcher +from leap.soledad.client import crypto as old_crypto + + +logger = getLogger(__name__) + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): + + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ + def __init__(self, url, source_replica_uid, creds, crypto, cert_file): + """ + Initialize the sync target. + + :param url: The server sync url. + :type url: str + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param creds: A dictionary containing the uuid and token. + :type creds: creds + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad._crypto.SoledadCrypto + :param cert_file: Path to the certificate of the ca used to validate + the SSL certificate used by the remote soledad + server. + :type cert_file: str + """ + if url.endswith("/"): + url = url[:-1] + self._url = str(url) + "/sync-from/" + str(source_replica_uid) + self.source_replica_uid = source_replica_uid + self._auth_header = None + self._uuid = None + self.set_creds(creds) + self._crypto = crypto + # TODO: DEPRECATED CRYPTO + self._deprecated_crypto = old_crypto.SoledadCrypto(crypto.secret) + self._insert_doc_cb = None + + # Twisted default Agent with our own ssl context factory + factory = get_compatible_ssl_context_factory(cert_file) + self._http = Agent(reactor, factory) + + if DO_STATS: + self.sync_exchange_phase = [0] diff --git a/src/leap/soledad/client/http_target/api.py b/src/leap/soledad/client/http_target/api.py new file mode 100644 index 00000000..c68185c6 --- /dev/null +++ b/src/leap/soledad/client/http_target/api.py @@ -0,0 +1,248 @@ +# -*- coding: utf-8 -*- +# api.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import json +import base64 + +from six import StringIO +from uuid import uuid4 + +from twisted.internet import defer +from twisted.web.http_headers import Headers +from twisted.web.client import FileBodyProducer + +from leap.soledad.client.http_target.support import readBody +from leap.soledad.common.errors import InvalidAuthTokenError +from leap.soledad.common.l2db.errors import HTTPError +from leap.soledad.common.l2db import SyncTarget + + +# we may want to collect statistics from the sync process +DO_STATS = False +if os.environ.get('SOLEDAD_STATS'): + DO_STATS = True + + +class SyncTargetAPI(SyncTarget): + """ + Declares public methods and implements u1db.SyncTarget. + """ + + @property + def uuid(self): + return self._uuid + + def set_creds(self, creds): + """ + Update credentials. + + :param creds: A dictionary containing the uuid and token. + :type creds: dict + """ + uuid = creds['token']['uuid'] + token = creds['token']['token'] + self._uuid = uuid + auth = '%s:%s' % (uuid, token) + b64_token = base64.b64encode(auth) + self._auth_header = {'Authorization': ['Token %s' % b64_token]} + + @property + def _base_header(self): + return self._auth_header.copy() if self._auth_header else {} + + def _http_request(self, url, method='GET', body=None, headers=None, + content_type=None, body_reader=readBody, + body_producer=None): + headers = headers or self._base_header + if content_type: + headers.update({'content-type': [content_type]}) + if not body_producer and body: + body = FileBodyProducer(StringIO(body)) + elif body_producer: + # Upload case, check send.py + body = body_producer(body) + d = self._http.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallback(body_reader) + d.addErrback(_unauth_to_invalid_token_error) + return d + + @defer.inlineCallbacks + def get_sync_info(self, source_replica_uid): + """ + Return information about known state of remote database. + + Return the replica_uid and the current database generation of the + remote database, and its last-seen database generation for the client + replica. + + :param source_replica_uid: The client-size replica uid. + :type source_replica_uid: str + + :return: A deferred which fires with (target_replica_uid, + target_replica_generation, target_trans_id, + source_replica_last_known_generation, + source_replica_last_known_transaction_id) + :rtype: twisted.internet.defer.Deferred + """ + raw = yield self._http_request(self._url) + res = json.loads(raw) + defer.returnValue(( + res['target_replica_uid'], + res['target_replica_generation'], + res['target_replica_transaction_id'], + res['source_replica_generation'], + res['source_transaction_id'] + )) + + def record_sync_info( + self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + """ + Record tip information for another replica. + + After sync_exchange has been processed, the caller will have + received new content from this replica. This call allows the + source replica instigating the sync to inform us what their + generation became after applying the documents we returned. + + This is used to allow future sync operations to not need to repeat data + that we just talked about. It also means that if this is called at the + wrong time, there can be database records that will never be + synchronized. + + :param source_replica_uid: The identifier for the source replica. + :type source_replica_uid: str + :param source_replica_generation: The database generation for the + source replica. + :type source_replica_generation: int + :param source_replica_transaction_id: The transaction id associated + with the source replica + generation. + :type source_replica_transaction_id: str + + :return: A deferred which fires with the result of the query. + :rtype: twisted.internet.defer.Deferred + """ + data = json.dumps({ + 'generation': source_replica_generation, + 'transaction_id': source_replica_transaction_id + }) + return self._http_request( + self._url, + method='PUT', + body=data, + content_type='application/json') + + @defer.inlineCallbacks + def sync_exchange(self, docs_by_generation, source_replica_uid, + last_known_generation, last_known_trans_id, + insert_doc_cb, ensure_callback=None, + sync_id=None): + """ + Find out which documents the remote database does not know about, + encrypt and send them. After that, receive documents from the remote + database. + + :param docs_by_generations: A list of (doc_id, generation, trans_id) + of local documents that were changed since + the last local generation the remote + replica knows about. + :type docs_by_generations: list of tuples + + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + + :param ensure_callback: A callback that ensures we know the target + replica uid if the target replica was just + created. + :type ensure_callback: function + + :return: A deferred which fires with the new generation and + transaction id of the target replica. + :rtype: twisted.internet.defer.Deferred + """ + # ---------- phase 1: send docs to server ---------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + + self._ensure_callback = ensure_callback + + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + + # save a reference to the callback so we can use it after decrypting + self._insert_doc_cb = insert_doc_cb + + gen_after_send, trans_id_after_send = yield self._send_docs( + docs_by_generation, + last_known_generation, + last_known_trans_id, + sync_id) + + # ---------- phase 2: receive docs ----------------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + + cur_target_gen, cur_target_trans_id = yield self._receive_docs( + last_known_generation, last_known_trans_id, + ensure_callback, sync_id) + + # update gen and trans id info in case we just sent and did not + # receive docs. + if gen_after_send is not None and gen_after_send > cur_target_gen: + cur_target_gen = gen_after_send + cur_target_trans_id = trans_id_after_send + + # ---------- phase 3: sync exchange is over -------------------------- + if DO_STATS: + self.sync_exchange_phase[0] += 1 + # -------------------------------------------------------------------- + + defer.returnValue([cur_target_gen, cur_target_trans_id]) + + +def _unauth_to_invalid_token_error(failure): + """ + An errback to translate unauthorized errors to our own invalid token + class. + + :param failure: The original failure. + :type failure: twisted.python.failure.Failure + + :return: Either the original failure or an invalid auth token error. + :rtype: twisted.python.failure.Failure + """ + failure.trap(HTTPError) + if failure.value.status == 401: + raise InvalidAuthTokenError + return failure diff --git a/src/leap/soledad/client/http_target/fetch.py b/src/leap/soledad/client/http_target/fetch.py new file mode 100644 index 00000000..9d456830 --- /dev/null +++ b/src/leap/soledad/client/http_target/fetch.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- +# fetch.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +from twisted.internet import defer +from twisted.internet import threads + +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import emit_async +from leap.soledad.client.http_target.support import RequestBody +from leap.soledad.common.log import getLogger +from leap.soledad.client._crypto import is_symmetrically_encrypted +from leap.soledad.common.l2db import errors +from leap.soledad.client import crypto as old_crypto + +from .._document import Document +from . import fetch_protocol + +logger = getLogger(__name__) + + +class HTTPDocFetcher(object): + """ + Handles Document fetching from Soledad server, using HTTP as transport. + Steps: + * Prepares metadata by asking server for one document + * Fetch the total on response and prepare to ask all remaining + * (async) Documents will come encrypted. + So we parse, decrypt and insert locally as they arrive. + """ + + # The uuid of the local replica. + # Any class inheriting from this one should provide a meaningful attribute + # if the sync status event is meant to be used somewhere else. + + uuid = 'undefined' + userid = 'undefined' + + @defer.inlineCallbacks + def _receive_docs(self, last_known_generation, last_known_trans_id, + ensure_callback, sync_id): + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + # Acts as a queue, ensuring line order on async processing + # as `self._insert_doc_cb` cant be run concurrently or out of order. + # DeferredSemaphore solves the concurrency and its implementation uses + # a queue, solving the ordering. + # FIXME: Find a proper solution to avoid surprises on Twisted changes + self.semaphore = defer.DeferredSemaphore(1) + + metadata = yield self._fetch_all( + last_known_generation, last_known_trans_id, + sync_id) + number_of_changes, ngen, ntrans = self._parse_metadata(metadata) + + # wait for pending inserts + yield self.semaphore.acquire() + + if ngen: + new_generation = ngen + new_transaction_id = ntrans + + defer.returnValue([new_generation, new_transaction_id]) + + def _fetch_all(self, last_known_generation, + last_known_trans_id, sync_id): + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + self._received_docs = 0 + # build a stream reader with _doc_parser as a callback + body_reader = fetch_protocol.build_body_reader(self._doc_parser) + # start download stream + return self._http_request( + self._url, + method='POST', + body=str(body), + content_type='application/x-soledad-sync-get', + body_reader=body_reader) + + @defer.inlineCallbacks + def _doc_parser(self, doc_info, content, total): + """ + Insert a received document into the local replica, decrypting + if necessary. The case where it's not decrypted is when a doc gets + inserted from Server side with a GPG encrypted content. + + :param doc_info: Dictionary representing Document information. + :type doc_info: dict + :param content: The Document's content. + :type idx: str + :param total: The total number of operations. + :type total: int + """ + yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content, + total) + + @defer.inlineCallbacks + def __atomic_doc_parse(self, doc_info, content, total): + doc = Document(doc_info['id'], doc_info['rev'], content) + if is_symmetrically_encrypted(content): + content = (yield self._crypto.decrypt_doc(doc)).getvalue() + elif old_crypto.is_symmetrically_encrypted(doc): + content = self._deprecated_crypto.decrypt_doc(doc) + doc.set_json(content) + + # TODO insert blobs here on the blob backend + # FIXME: This is wrong. Using the very same SQLite connection object + # from multiple threads is dangerous. We should bring the dbpool here + # or find an alternative. Deferring to a thread only helps releasing + # the reactor for other tasks as this is an IO intensive call. + yield threads.deferToThread(self._insert_doc_cb, + doc, doc_info['gen'], doc_info['trans_id']) + self._received_docs += 1 + user_data = {'uuid': self.uuid, 'userid': self.userid} + _emit_receive_status(user_data, self._received_docs, total=total) + + def _parse_metadata(self, metadata): + """ + Parse the response from the server containing the sync metadata. + + :param response: Metadata as string + :type response: str + + :return: (number_of_changes, new_gen, new_trans_id) + :rtype: tuple + """ + try: + metadata = json.loads(metadata) + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + return (metadata['number_of_changes'], metadata['new_generation'], + metadata['new_transaction_id']) + except (ValueError, KeyError): + raise errors.BrokenSyncStream('Metadata parsing failed') + + +def _emit_receive_status(user_data, received_docs, total): + content = {'received': received_docs, 'total': total} + emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content) + + if received_docs % 20 == 0: + msg = "%d/%d" % (received_docs, total) + logger.debug("Sync receive status: %s" % msg) diff --git a/src/leap/soledad/client/http_target/fetch_protocol.py b/src/leap/soledad/client/http_target/fetch_protocol.py new file mode 100644 index 00000000..851eb3a1 --- /dev/null +++ b/src/leap/soledad/client/http_target/fetch_protocol.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +# fetch_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +from functools import partial +from six import StringIO +from twisted.web._newclient import ResponseDone +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.log import getLogger +from .support import ReadBodyProtocol +from .support import readBody + +logger = getLogger(__name__) + + +class DocStreamReceiver(ReadBodyProtocol): + """ + A protocol implementation that can parse incoming data from server based + on a line format specified on u1db implementation. Except that we split doc + attributes from content to ease parsing and increment throughput for larger + documents. + [\r\n + {metadata},\r\n + {doc_info},\r\n + {content},\r\n + ... + {doc_info},\r\n + {content},\r\n + ] + """ + + def __init__(self, response, deferred, doc_reader): + self.deferred = deferred + self.status = response.code if response else None + self.message = response.phrase if response else None + self.headers = response.headers if response else {} + self.delimiter = '\r\n' + self.metadata = '' + self._doc_reader = doc_reader + self.reset() + + def reset(self): + self._line = 0 + self._buffer = StringIO() + self._properly_finished = False + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + if self.deferred.called: + return + try: + if reason.check(ResponseDone): + self.dataBuffer = self.metadata + else: + self.dataBuffer = self.finish() + except errors.BrokenSyncStream as e: + return self.deferred.errback(e) + return ReadBodyProtocol.connectionLost(self, reason) + + def consumeBufferLines(self): + """ + Consumes lines from buffer and rewind it, writing remaining data + that didn't formed a line back into buffer. + """ + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.seek(0) + lines = content.split(self.delimiter) + self._buffer.write(lines.pop(-1)) + return lines + + def dataReceived(self, data): + """ + Buffer incoming data until a line breaks comes in. We check only + the incoming data for efficiency. + """ + self._buffer.write(data) + if '\n' not in data: + return + lines = self.consumeBufferLines() + while lines: + line, _ = utils.check_and_strip_comma(lines.pop(0)) + self.lineReceived(line) + self._line += 1 + + def lineReceived(self, line): + """ + Protocol implementation. + 0: [\r\n + 1: {metadata},\r\n + (even): {doc_info},\r\n + (odd): {data},\r\n + (last): ] + """ + if self._properly_finished: + raise errors.BrokenSyncStream("Reading a finished stream") + if ']' == line: + self._properly_finished = True + elif self._line == 0: + if line is not '[': + raise errors.BrokenSyncStream("Invalid start") + elif self._line == 1: + self.metadata = line + if 'error' in self.metadata: + raise errors.BrokenSyncStream("Error from server: %s" % line) + self.total = json.loads(line).get('number_of_changes', -1) + elif (self._line % 2) == 0: + self.current_doc = json.loads(line) + if 'error' in self.current_doc: + raise errors.BrokenSyncStream("Error from server: %s" % line) + else: + d = self._doc_reader( + self.current_doc, line.strip() or None, self.total) + d.addErrback(self.deferred.errback) + + def finish(self): + """ + Checks that ']' came and stream was properly closed. + """ + if not self._properly_finished: + raise errors.BrokenSyncStream('Stream not properly closed') + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.close() + return content + + +def build_body_reader(doc_reader): + """ + Get the documents from a sync stream and call doc_reader on each + doc received. + + @param doc_reader: Function to be called for processing an incoming doc. + Will be called with doc metadata (dict parsed from 1st line) and doc + content (string) + @type doc_reader: function + + @return: A function that can be called by the http Agent to create and + configure the proper protocol. + """ + protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader) + return partial(readBody, protocolClass=protocolClass) diff --git a/src/leap/soledad/client/http_target/send.py b/src/leap/soledad/client/http_target/send.py new file mode 100644 index 00000000..2b286ec5 --- /dev/null +++ b/src/leap/soledad/client/http_target/send.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# send.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json + +from twisted.internet import defer + +from leap.soledad.common.log import getLogger +from leap.soledad.client.events import emit_async +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.http_target.support import RequestBody +from .send_protocol import DocStreamProducer + +logger = getLogger(__name__) + + +class HTTPDocSender(object): + """ + Handles Document uploading from Soledad server, using HTTP as transport. + They need to be encrypted and metadata prepared before sending. + """ + + # The uuid of the local replica. + # Any class inheriting from this one should provide a meaningful attribute + # if the sync status event is meant to be used somewhere else. + + uuid = 'undefined' + userid = 'undefined' + + @defer.inlineCallbacks + def _send_docs(self, docs_by_generation, last_known_generation, + last_known_trans_id, sync_id): + + if not docs_by_generation: + defer.returnValue([None, None]) + + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + result = yield self._send_batch(body, docs_by_generation) + response_dict = json.loads(result)[0] + gen_after_send = response_dict['new_generation'] + trans_id_after_send = response_dict['new_transaction_id'] + defer.returnValue([gen_after_send, trans_id_after_send]) + + @defer.inlineCallbacks + def _send_batch(self, body, docs): + total, calls = len(docs), [] + for i, entry in enumerate(docs): + calls.append((self._prepare_one_doc, + entry, body, i + 1, total)) + result = yield self._send_request(body, calls) + _emit_send_status(self.uuid, body.consumed, total) + + defer.returnValue(result) + + def _send_request(self, body, calls): + return self._http_request( + self._url, + method='POST', + body=(body, calls), + content_type='application/x-soledad-sync-put', + body_producer=DocStreamProducer) + + @defer.inlineCallbacks + def _prepare_one_doc(self, entry, body, idx, total): + get_doc_call, gen, trans_id = entry + doc, content = yield self._encrypt_doc(get_doc_call) + body.insert_info( + id=doc.doc_id, rev=doc.rev, content=content, gen=gen, + trans_id=trans_id, number_of_docs=total, + doc_idx=idx) + _emit_send_status(self.uuid, body.consumed, total) + + @defer.inlineCallbacks + def _encrypt_doc(self, get_doc_call): + f, args, kwargs = get_doc_call + doc = yield f(*args, **kwargs) + if doc.is_tombstone(): + defer.returnValue((doc, None)) + else: + content = yield self._crypto.encrypt_doc(doc) + defer.returnValue((doc, content)) + + +def _emit_send_status(user_data, idx, total): + content = {'sent': idx, 'total': total} + emit_async(SOLEDAD_SYNC_SEND_STATUS, user_data, content) + + msg = "%d/%d" % (idx, total) + logger.debug("Sync send status: %s" % msg) diff --git a/src/leap/soledad/client/http_target/send_protocol.py b/src/leap/soledad/client/http_target/send_protocol.py new file mode 100644 index 00000000..4941aa34 --- /dev/null +++ b/src/leap/soledad/client/http_target/send_protocol.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# send_protocol.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +from zope.interface import implementer +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web.iweb import IBodyProducer +from twisted.web.iweb import UNKNOWN_LENGTH + + +@implementer(IBodyProducer) +class DocStreamProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + def __init__(self, producer): + """ + Initialize the string produer. + + :param producer: A RequestBody instance and a list of producer calls + :type producer: (.support.RequestBody, [(function, *args)]) + """ + self.body, self.producer = producer + self.length = UNKNOWN_LENGTH + self.pause = False + self.stop = False + + @defer.inlineCallbacks + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A Deferred that fires when production ends. + :rtype: twisted.internet.defer.Deferred + """ + while self.producer and not self.stop: + if self.pause: + yield self.sleep(0.001) + continue + call = self.producer.pop(0) + fun, args = call[0], call[1:] + yield fun(*args) + consumer.write(self.body.pop(1, leave_open=True)) + consumer.write(self.body.pop(0)) # close stream + + def sleep(self, secs): + d = defer.Deferred() + reactor.callLater(secs, d.callback, None) + return d + + def pauseProducing(self): + self.pause = True + + def stopProducing(self): + self.stop = True + + def resumeProducing(self): + self.pause = False diff --git a/src/leap/soledad/client/http_target/support.py b/src/leap/soledad/client/http_target/support.py new file mode 100644 index 00000000..d8d8e420 --- /dev/null +++ b/src/leap/soledad/client/http_target/support.py @@ -0,0 +1,220 @@ +# -*- coding: utf-8 -*- +# support.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import warnings +import json + +from twisted.internet import defer +from twisted.web.client import _ReadBodyProtocol +from twisted.web.client import PartialDownloadError +from twisted.web._newclient import ResponseDone +from twisted.web._newclient import PotentialDataLoss + +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import http_errors + +# we want to make sure that HTTP errors will raise appropriate u1db errors, +# that is, fire errbacks with the appropriate failures, in the context of +# twisted. Because of that, we redefine the http body reader used by the HTTP +# client below. + + +class ReadBodyProtocol(_ReadBodyProtocol): + """ + From original Twisted implementation, focused on adding our error + handling and ensuring that the proper u1db error is raised. + """ + + def __init__(self, response, deferred): + """ + Initialize the protocol, additionally storing the response headers. + """ + _ReadBodyProtocol.__init__( + self, response.code, response.phrase, deferred) + self.headers = response.headers + + # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks + def _error(self, respdic): + descr = respdic.get("error") + exc_cls = errors.wire_description_to_exc.get(descr) + if exc_cls is not None: + message = respdic.get("message") + self.deferred.errback(exc_cls(message)) + else: + self.deferred.errback( + errors.HTTPError(self.status, respdic, self.headers)) + # ---8<--- end of snippet from u1db.remote.http_client + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + if reason.check(ResponseDone): + + body = b''.join(self.dataBuffer) + + # ---8<--- snippet from u1db.remote.http_client + if self.status in (200, 201): + self.deferred.callback(body) + elif self.status in http_errors.ERROR_STATUSES: + try: + respdic = json.loads(body) + except ValueError: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + else: + self._error(respdic) + # special cases + elif self.status == 503: + self.deferred.errback(errors.Unavailable(body, self.headers)) + else: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + # ---8<--- end of snippet from u1db.remote.http_client + + elif reason.check(PotentialDataLoss): + self.deferred.errback( + PartialDownloadError(self.status, self.message, + b''.join(self.dataBuffer))) + else: + self.deferred.errback(reason) + + +def readBody(response, protocolClass=ReadBodyProtocol): + """ + Get the body of an L{IResponse} and return it as a byte string. + + This is a helper function for clients that don't want to incrementally + receive the body of an HTTP response. + + @param response: The HTTP response for which the body will be read. + @type response: L{IResponse} provider + + @return: A L{Deferred} which will fire with the body of the response. + Cancelling it will close the connection to the server immediately. + """ + def cancel(deferred): + """ + Cancel a L{readBody} call, close the connection to the HTTP server + immediately, if it is still open. + + @param deferred: The cancelled L{defer.Deferred}. + """ + abort = getAbort() + if abort is not None: + abort() + + d = defer.Deferred(cancel) + protocol = protocolClass(response, d) + + def getAbort(): + return getattr(protocol.transport, 'abortConnection', None) + + response.deliverBody(protocol) + + if protocol.transport is not None and getAbort() is None: + warnings.warn( + 'Using readBody with a transport that does not have an ' + 'abortConnection method', + category=DeprecationWarning, + stacklevel=2) + + return d + + +class RequestBody(object): + """ + This class is a helper to generate send and fetch requests. + The expected format is something like: + [ + {headers}, + {entry1}, + {...}, + {entryN}, + ] + """ + + def __init__(self, **header_dict): + """ + Creates a new RequestBody holding header information. + + :param header_dict: A dictionary with the headers. + :type header_dict: dict + """ + self.headers = header_dict + self.entries = [] + self.consumed = 0 + + def insert_info(self, **entry_dict): + """ + Dumps an entry into JSON format and add it to entries list. + Adds 'content' key on a new line if it's present. + + :param entry_dict: Entry as a dictionary + :type entry_dict: dict + """ + content = '' + if 'content' in entry_dict: + content = ',\r\n' + (entry_dict['content'] or '') + entry = json.dumps(entry_dict) + content + self.entries.append(entry) + + def pop(self, amount=10, leave_open=False): + """ + Removes entries and returns it formatted and ready + to be sent. + + :param amount: number of entries to pop and format + :type amount: int + + :param leave_open: flag to skip stream closing + :type amount: bool + + :return: formatted body ready to be sent + :rtype: str + """ + start = self.consumed == 0 + amount = min([len(self.entries), amount]) + entries = [self.entries.pop(0) for i in xrange(amount)] + self.consumed += amount + end = len(self.entries) == 0 if not leave_open else False + return self.entries_to_str(entries, start, end) + + def __str__(self): + return self.pop(len(self.entries)) + + def __len__(self): + return len(self.entries) + + def entries_to_str(self, entries=None, start=True, end=True): + """ + Format a list of entries into the body format expected + by the server. + + :param entries: entries to format + :type entries: list + + :return: formatted body ready to be sent + :rtype: str + """ + data = '' + if start: + data = '[\r\n' + json.dumps(self.headers) + data += ''.join(',\r\n' + entry for entry in entries) + if end: + data += '\r\n]' + return data |