From 93a8be4a374a4863a36c99e5cca5eed8e6568d15 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 19 Sep 2016 03:56:44 -0300 Subject: [feature] streaming download protocol This commit finishes reversion into u1db original streaming protocol for downloads. --- client/src/leap/soledad/client/http_target/api.py | 4 +- .../src/leap/soledad/client/http_target/fetch.py | 91 ++++----- .../soledad/client/http_target/fetch_protocol.py | 206 +++++++++++++++++++++ server/src/leap/soledad/server/sync.py | 6 +- testing/tests/sync/test_sync_target.py | 65 +++---- 5 files changed, 273 insertions(+), 99 deletions(-) create mode 100644 client/src/leap/soledad/client/http_target/fetch_protocol.py diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index c9da939c..4e068523 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -72,11 +72,11 @@ class SyncTargetAPI(SyncTarget): return self._sync_enc_pool is not None def _http_request(self, url, method='GET', body=None, headers=None, - content_type=None): + content_type=None, body_reader=readBody): headers = headers or self._base_header if content_type: headers.update({'content-type': [content_type]}) - d = self._http.request(url, method, body, headers, readBody) + d = self._http.request(url, method, body, headers, body_reader) d.addErrback(_unauth_to_invalid_token_error) return d diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 1f1bc480..063082e5 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import json - from twisted.internet import defer from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS @@ -25,7 +23,9 @@ from leap.soledad.client.http_target.support import RequestBody from leap.soledad.common.log import getLogger from leap.soledad.common.document import SoledadDocument from leap.soledad.common.l2db import errors -from leap.soledad.common.l2db.remote import utils +from datetime import datetime + +from . import fetch_protocol logger = getLogger(__name__) @@ -58,12 +58,12 @@ class HTTPDocFetcher(object): # to know the total number of documents to be received, and this # information comes as metadata to each request. - docs = yield self._fetch_all( - last_known_generation, last_known_trans_id, - sync_id, 0) self._received_docs = 0 + metadata = yield self._fetch_all( + last_known_generation, last_known_trans_id, + sync_id, self._received_docs) number_of_changes, ngen, ntrans =\ - self._insert_received_docs(docs, 1, 1) + self._parse_metadata(metadata) if ngen: new_generation = ngen @@ -81,14 +81,17 @@ class HTTPDocFetcher(object): ensure=self._ensure_callback is not None) # inform server of how many documents have already been received body.insert_info(received=received) - # send headers + # build a stream reader with doc parser callback + body_reader = fetch_protocol.build_body_reader(self._doc_parser) + # start download stream return self._http_request( self._url, method='POST', body=str(body), - content_type='application/x-soledad-sync-get') + content_type='application/x-soledad-sync-get', + body_reader=body_reader) - def _insert_received_docs(self, response, idx, total): + def _doc_parser(self, doc_info, content): """ Insert a received document into the local replica. @@ -99,26 +102,20 @@ class HTTPDocFetcher(object): :param total: The total number of operations. :type total: int """ - new_generation, new_transaction_id, number_of_changes, entries =\ - self._parse_received_doc_response(response) - - for doc_id, rev, content, gen, trans_id in entries: - if doc_id is not None: - # decrypt incoming document and insert into local database - # --------------------------------------------------------- - # symmetric decryption of document's contents - # --------------------------------------------------------- - # If arriving content was symmetrically encrypted, we decrypt - doc = SoledadDocument(doc_id, rev, content) - if is_symmetrically_encrypted(doc): - doc.set_json(self._crypto.decrypt_doc(doc)) - self._insert_doc_cb(doc, gen, trans_id) - self._received_docs += 1 - user_data = {'uuid': self.uuid, 'userid': self.userid} - _emit_receive_status(user_data, self._received_docs, total) - return number_of_changes, new_generation, new_transaction_id - - def _parse_received_doc_response(self, response): + # decrypt incoming document and insert into local database + # --------------------------------------------------------- + # symmetric decryption of document's contents + # --------------------------------------------------------- + # If arriving content was symmetrically encrypted, we decrypt + doc = SoledadDocument(doc_info['id'], doc_info['rev'], content) + if is_symmetrically_encrypted(doc): + doc.set_json(self._crypto.decrypt_doc(doc)) + self._insert_doc_cb(doc, doc_info['gen'], doc_info['trans_id']) + self._received_docs += 1 + user_data = {'uuid': self.uuid, 'userid': self.userid} + _emit_receive_status(user_data, self._received_docs, total=1000000) + + def _parse_metadata(self, metadata): """ Parse the response from the server containing the received document. @@ -130,18 +127,18 @@ class HTTPDocFetcher(object): :rtype: tuple """ # decode incoming stream - parts = response.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise errors.BrokenSyncStream - data = parts[1:-1] + # parts = response.splitlines() + # if not parts or parts[0] != '[' or parts[-1] != ']': + # raise errors.BrokenSyncStream + # data = parts[1:-1] # decode metadata + # try: + # line, comma = utils.check_and_strip_comma(data[0]) + # metadata = None + # except (IndexError): + # raise errors.BrokenSyncStream try: - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - except (IndexError): - raise errors.BrokenSyncStream - try: - metadata = json.loads(line) + # metadata = json.loads(line) new_generation = metadata['new_generation'] new_transaction_id = metadata['new_transaction_id'] number_of_changes = metadata['number_of_changes'] @@ -150,19 +147,7 @@ class HTTPDocFetcher(object): # make sure we have replica_uid from fresh new dbs if self._ensure_callback and 'replica_uid' in metadata: self._ensure_callback(metadata['replica_uid']) - # parse incoming document info - entries = [] - for index in xrange(1, len(data[1:]), 2): - try: - line, comma = utils.check_and_strip_comma(data[index]) - content, _ = utils.check_and_strip_comma(data[index + 1]) - entry = json.loads(line) - entries.append((entry['id'], entry['rev'], content or None, - entry['gen'], entry['trans_id'])) - except (IndexError, KeyError): - raise errors.BrokenSyncStream - return new_generation, new_transaction_id, number_of_changes, \ - entries + return number_of_changes, new_generation, new_transaction_id def _emit_receive_status(user_data, received_docs, total): diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py new file mode 100644 index 00000000..6ecba2b0 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python + +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Example using stdio, Deferreds, LineReceiver and twisted.web.client. + +Note that the WebCheckerCommandProtocol protocol could easily be used in e.g. +a telnet server instead; see the comments for details. + +Based on an example by Abe Fettig. +""" +import sys +import json +import warnings +from cStringIO import StringIO +from twisted.internet import reactor +from twisted.internet import defer +from twisted.internet import protocol +from twisted.web.client import HTTPConnectionPool +from twisted.web._newclient import ResponseDone +from twisted.web._newclient import PotentialDataLoss +from twisted.web.client import PartialDownloadError +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.l2db.remote import http_errors +from leap.common.http import HTTPClient + + +class DocStreamReceiver(protocol.Protocol): + + def __init__(self, response, deferred, doc_reader): + self.deferred = deferred + self.status = response.code if response else None + self.message = response.phrase if response else None + self.headers = response.headers if response else {} + self.delimiter = '\r\n' + self._doc_reader = doc_reader + self.reset() + + def reset(self): + self._line = 0 + self._buffer = StringIO() + self._properly_finished = False + + # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks + def _error(self, respdic): + descr = respdic.get("error") + exc_cls = errors.wire_description_to_exc.get(descr) + if exc_cls is not None: + message = respdic.get("message") + self.deferred.errback(exc_cls(message)) + # ---8<--- end of snippet from u1db.remote.http_client + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + if reason.check(ResponseDone): + + try: + body = self.finish() + except errors.BrokenSyncStream, e: + return self.deferred.errback(e) + + # ---8<--- snippet from u1db.remote.http_client + if self.status in (200, 201): + self.deferred.callback(self.metadata) + elif self.status in http_errors.ERROR_STATUSES: + try: + respdic = json.loads(body) + except ValueError: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + else: + self._error(respdic) + # special cases + elif self.status == 503: + self.deferred.errback(errors.Unavailable(body, self.headers)) + else: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + # ---8<--- end of snippet from u1db.remote.http_client + + elif reason.check(PotentialDataLoss): + self.deferred.errback( + PartialDownloadError(self.status, self.message, + b''.join(body))) + else: + self.deferred.errback(reason) + + def consumeBufferLines(self): + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.seek(0) + lines = content.split(self.delimiter) + self._buffer.write(lines.pop(-1)) + return lines + + def dataReceived(self, data): + self._buffer.write(data) + if '\n' not in data: + return + lines = self.consumeBufferLines() + while lines: + line, _ = utils.check_and_strip_comma(lines.pop(0)) + try: + self.lineReceived(line) + except AssertionError, e: + raise errors.BrokenSyncStream(e) + + def lineReceived(self, line): + assert not self._properly_finished + if ']' == line: + self._properly_finished = True + elif self._line == 0: + assert line == '[' + self._line += 1 + elif self._line == 1: + self._line += 1 + self.metadata = json.loads(line) + assert 'error' not in self.metadata + elif (self._line % 2) == 0: + self._line += 1 + self.current_doc = json.loads(line) + assert 'error' not in self.current_doc + else: + self._line += 1 + self._doc_reader(self.current_doc, line.strip() or None) + + def finish(self): + if not self._properly_finished: + raise errors.BrokenSyncStream() + content = self._buffer.getvalue()[0:self._buffer.tell()] + self._buffer.close() + return content + + +def build_body_reader(doc_reader): + """ + Get the documents from a sync stream and call doc_reader on each + doc received. + + @param doc_reader: Function to be called for processing an incoming doc. + Will be called with doc metadata (dict parsed from 1st line) and doc + content (string) + @type response: function + + @return: A L{Deferred} which will fire with the sync metadata. + Cancelling it will close the connection to the server immediately. + """ + def read(response): + def cancel(deferred): + """ + Cancel a L{readBody} call, close the connection to the HTTP server + immediately, if it is still open. + + @param deferred: The cancelled L{defer.Deferred}. + """ + abort = getAbort() + if abort is not None: + abort() + + def getAbort(): + return getattr(protocol.transport, 'abortConnection', None) + + d = defer.Deferred(cancel) + protocol = DocStreamReceiver(response, d, doc_reader) + response.deliverBody(protocol) + if protocol.transport is not None and getAbort() is None: + warnings.warn( + 'Using readBody with a transport that does not have an ' + 'abortConnection method', + category=DeprecationWarning, + stacklevel=2) + return d + return read + + +def read_doc(doc_info, content): + print doc_info, len(content) + + +def finish(args): + print args + reactor.stop() + + +def fetch(url, token, sync_id): + headers = {'Authorization': ['Token %s' % token]} + headers.update({'content-type': ['application/x-soledad-sync-get']}) + body = """[ +{"ensure": false, "last_known_trans_id": "", "sync_id": "%s", +"last_known_generation": 0}, +{"received": 0} +]""" % sync_id + http = HTTPClient(pool=HTTPConnectionPool(reactor)) + d = http.request(url, 'POST', body, headers, build_body_reader(read_doc)) + d.addBoth(finish) + + +if __name__ == "__main__": + assert len(sys.argv) == 4 + reactor.callWhenRunning(fetch, sys.argv[1], sys.argv[2], sys.argv[3]) + reactor.run() diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 6f2ffe9f..c958bfaa 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -254,7 +254,11 @@ class SyncResource(http_app.SyncResource): gen=gen, trans_id=trans_id) self.responder.stream_entry(entry) content = doc.get_json() - self.responder.stream_entry(content.read() if content else '') + if content: + self.responder.stream_entry(content.read()) + content.close() + else: + self.responder.stream_entry('') new_gen, number_of_changes = \ self.sync_exch.find_changes_to_return(received) diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py index a2935539..997dcdcd 100644 --- a/testing/tests/sync/test_sync_target.py +++ b/testing/tests/sync/test_sync_target.py @@ -30,6 +30,7 @@ from testscenarios import TestWithScenarios from twisted.internet import defer from leap.soledad.client import http_target as target +from leap.soledad.client.http_target.fetch_protocol import DocStreamReceiver from leap.soledad.client import crypto from leap.soledad.client.sqlcipher import SQLCipherU1DBSync from leap.soledad.client.sqlcipher import SQLCipherOptions @@ -44,6 +45,7 @@ from test_soledad.util import make_soledad_app from test_soledad.util import make_token_soledad_app from test_soledad.util import make_soledad_document_for_test from test_soledad.util import soledad_sync_target +from twisted.trial import unittest from test_soledad.util import SoledadWithCouchServerMixin from test_soledad.util import ADDRESS from test_soledad.util import SQLCIPHER_SCENARIOS @@ -53,92 +55,69 @@ from test_soledad.util import SQLCIPHER_SCENARIOS # The following tests come from `u1db.tests.test_remote_sync_target`. # ----------------------------------------------------------------------------- -class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin): +class TestSoledadParseReceivedDocResponse(unittest.TestCase): """ Some tests had to be copied to this class so we can instantiate our own target. """ - def setUp(self): - SoledadWithCouchServerMixin.setUp(self) - creds = {'token': { - 'uuid': 'user-uuid', - 'token': 'auth-token', - }} - self.target = target.SoledadHTTPSyncTarget( - self.couch_url, - uuid4().hex, - creds, - self._soledad._crypto, - None) - - def tearDown(self): - self.target.close() - SoledadWithCouchServerMixin.tearDown(self) + def parse(self, stream): + parser = DocStreamReceiver(None, None, lambda x, y: 42) + parser.dataReceived(stream) + parser.finish() def test_extra_comma(self): - """ - Test adapted to use encrypted content. - """ doc = SoledadDocument('i', rev='r') - doc.content = {} - _crypto = self._soledad._crypto - key = _crypto.doc_passphrase(doc.doc_id) - secret = _crypto.secret + doc.content = {'a': 'b'} - enc_json = crypto.encrypt_docstr( - doc.get_json(), doc.doc_id, doc.rev, - key, secret) + encrypted_docstr = crypto.SoledadCrypto('').encrypt_doc(doc) with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("[\r\n{},\r\n]") + self.parse("[\r\n{},\r\n]") with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response( + self.parse( ('[\r\n{},\r\n{"id": "i", "rev": "r", ' + - '"content": %s, "gen": 3, "trans_id": "T-sid"}' + - ',\r\n]') % json.dumps(enc_json)) + '"gen": 3, "trans_id": "T-sid"},\r\n' + + '%s,\r\n]') % encrypted_docstr) def test_wrong_start(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("{}\r\n]") - - with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("\r\n{}\r\n]") + self.parse("{}\r\n]") with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("") + self.parse("\r\n{}\r\n]") def test_wrong_end(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("[\r\n{}") + self.parse("[\r\n{}") with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("[\r\n") + self.parse("[\r\n") def test_missing_comma(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response( + self.parse( '[\r\n{}\r\n{"id": "i", "rev": "r", ' '"content": "c", "gen": 3}\r\n]') def test_no_entries(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response("[\r\n]") + self.parse("[\r\n]") def test_error_in_stream(self): with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response( + self.parse( '[\r\n{"new_generation": 0},' '\r\n{"error": "unavailable"}\r\n') with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response( + self.parse( '[\r\n{"error": "unavailable"}\r\n') with self.assertRaises(l2db.errors.BrokenSyncStream): - self.target._parse_received_doc_response('[\r\n{"error": "?"}\r\n') + self.parse('[\r\n{"error": "?"}\r\n') # # functions for TestRemoteSyncTargets -- cgit v1.2.3