From af0189dda80f9a5cb720841378e635fc06881edf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 1 Oct 2016 01:16:40 -0300 Subject: [refactor] more comments, less code Some code were duplicated, got removed. Additional comments added for documenting such a critical and complex part as a protocol. --- .../soledad/client/http_target/fetch_protocol.py | 132 +++++++++------------ .../soledad/client/http_target/send_protocol.py | 2 +- .../src/leap/soledad/client/http_target/support.py | 4 +- 3 files changed, 59 insertions(+), 79 deletions(-) diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index 902607ea..dac82d8e 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -1,31 +1,44 @@ -#!/usr/bin/env python - -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Example using stdio, Deferreds, LineReceiver and twisted.web.client. - -Note that the WebCheckerCommandProtocol protocol could easily be used in e.g. -a telnet server instead; see the comments for details. - -Based on an example by Abe Fettig. -""" -import sys +# -*- coding: utf-8 -*- +# support.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . import json -import warnings +from functools import partial from cStringIO import StringIO -from twisted.internet import reactor -from twisted.internet import defer -from twisted.web.client import HTTPConnectionPool from twisted.web._newclient import ResponseDone from leap.soledad.common.l2db import errors from leap.soledad.common.l2db.remote import utils -from leap.common.http import HTTPClient from .support import ReadBodyProtocol +from .support import readBody class DocStreamReceiver(ReadBodyProtocol): + """ + A protocol implementation that can parse incoming data from server based + on a line format specified on u1db implementation. Except that we split doc + attributes from content to ease parsing and increment throughput for larger + documents. + [\r\n + {metadata},\r\n + {doc_info},\r\n + {content},\r\n + ... + {doc_info},\r\n + {content},\r\n + ] + """ def __init__(self, response, deferred, doc_reader): self.deferred = deferred @@ -56,6 +69,10 @@ class DocStreamReceiver(ReadBodyProtocol): return ReadBodyProtocol.connectionLost(self, reason) def consumeBufferLines(self): + """ + Consumes lines from buffer and rewind it, writing remaining data + that didn't formed a line back into buffer. + """ content = self._buffer.getvalue()[0:self._buffer.tell()] self._buffer.seek(0) lines = content.split(self.delimiter) @@ -63,6 +80,10 @@ class DocStreamReceiver(ReadBodyProtocol): return lines def dataReceived(self, data): + """ + Buffer incoming data until a line breaks comes in. We check only + the incoming data for efficiency. + """ self._buffer.write(data) if '\n' not in data: return @@ -76,6 +97,14 @@ class DocStreamReceiver(ReadBodyProtocol): raise errors.BrokenSyncStream(e) def lineReceived(self, line): + """ + Protocol implementation. + 0: [\r\n + 1: {metadata},\r\n + (even): {doc_info},\r\n + (odd): {data},\r\n + (last): ] + """ assert not self._properly_finished if ']' == line: self._properly_finished = True @@ -91,6 +120,9 @@ class DocStreamReceiver(ReadBodyProtocol): self._doc_reader(self.current_doc, line.strip() or None) def finish(self): + """ + Checks that ']' came and stream was properly closed. + """ if not self._properly_finished: raise errors.BrokenSyncStream() content = self._buffer.getvalue()[0:self._buffer.tell()] @@ -106,62 +138,10 @@ def build_body_reader(doc_reader): @param doc_reader: Function to be called for processing an incoming doc. Will be called with doc metadata (dict parsed from 1st line) and doc content (string) - @type response: function + @type doc_reader: function - @return: A L{Deferred} which will fire with the sync metadata. - Cancelling it will close the connection to the server immediately. + @return: A function that can be called by the http Agent to create and + configure the proper protocol. """ - def read(response): - def cancel(deferred): - """ - Cancel a L{readBody} call, close the connection to the HTTP server - immediately, if it is still open. - - @param deferred: The cancelled L{defer.Deferred}. - """ - abort = getAbort() - if abort is not None: - abort() - - def getAbort(): - return getattr(protocol.transport, 'abortConnection', None) - - d = defer.Deferred(cancel) - protocol = DocStreamReceiver(response, d, doc_reader) - response.deliverBody(protocol) - if protocol.transport is not None and getAbort() is None: - warnings.warn( - 'Using readBody with a transport that does not have an ' - 'abortConnection method', - category=DeprecationWarning, - stacklevel=2) - return d - return read - - -def read_doc(doc_info, content): - print doc_info, len(content) - - -def finish(args): - print args - reactor.stop() - - -def fetch(url, token, sync_id): - headers = {'Authorization': ['Token %s' % token]} - headers.update({'content-type': ['application/x-soledad-sync-get']}) - body = """[ -{"ensure": false, "last_known_trans_id": "", "sync_id": "%s", -"last_known_generation": 0}, -{"received": 0} -]""" % sync_id - http = HTTPClient(pool=HTTPConnectionPool(reactor)) - d = http.request(url, 'POST', body, headers, build_body_reader(read_doc)) - d.addBoth(finish) - - -if __name__ == "__main__": - assert len(sys.argv) == 4 - reactor.callWhenRunning(fetch, sys.argv[1], sys.argv[2], sys.argv[3]) - reactor.run() + protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader) + return partial(readBody, protocolClass=protocolClass) diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py index b93a4284..61e95e56 100644 --- a/client/src/leap/soledad/client/http_target/send_protocol.py +++ b/client/src/leap/soledad/client/http_target/send_protocol.py @@ -32,7 +32,7 @@ class DocStreamProducer(object): :param consumer: Any IConsumer provider. :type consumer: twisted.internet.interfaces.IConsumer - :return: A successful deferred. + :return: A Deferred that fires when production ends. :rtype: twisted.internet.defer.Deferred """ call = self.producer.pop(0) diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index c066331c..feb306e8 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -94,7 +94,7 @@ class ReadBodyProtocol(_ReadBodyProtocol): self.deferred.errback(reason) -def readBody(response): +def readBody(response, protocolClass=ReadBodyProtocol): """ Get the body of an L{IResponse} and return it as a byte string. @@ -119,7 +119,7 @@ def readBody(response): abort() d = defer.Deferred(cancel) - protocol = ReadBodyProtocol(response, d) + protocol = protocolClass(response, d) def getAbort(): return getattr(protocol.transport, 'abortConnection', None) -- cgit v1.2.3