summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Shyba <victor.shyba@gmail.com>2016-10-01 01:16:40 -0300
committerVictor Shyba <victor1984@riseup.net>2016-11-18 15:55:52 -0300
commit91a5c5a29b58d0782c8c4773fee69e0172bb9388 (patch)
tree25652ed5a13d8437b145cf738313f39522ab35d2
parent0c5c094c763ff5a503dc147d5efc696e14ba6776 (diff)
[refactor] more comments, less code
Some code were duplicated, got removed. Additional comments added for documenting such a critical and complex part as a protocol.
-rw-r--r--client/src/leap/soledad/client/http_target/fetch_protocol.py132
-rw-r--r--client/src/leap/soledad/client/http_target/send_protocol.py2
-rw-r--r--client/src/leap/soledad/client/http_target/support.py4
3 files changed, 59 insertions, 79 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
index 902607ea..dac82d8e 100644
--- a/client/src/leap/soledad/client/http_target/fetch_protocol.py
+++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -1,31 +1,44 @@
-#!/usr/bin/env python
-
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Example using stdio, Deferreds, LineReceiver and twisted.web.client.
-
-Note that the WebCheckerCommandProtocol protocol could easily be used in e.g.
-a telnet server instead; see the comments for details.
-
-Based on an example by Abe Fettig.
-"""
-import sys
+# -*- coding: utf-8 -*-
+# support.py
+# Copyright (C) 2016 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
-import warnings
+from functools import partial
from cStringIO import StringIO
-from twisted.internet import reactor
-from twisted.internet import defer
-from twisted.web.client import HTTPConnectionPool
from twisted.web._newclient import ResponseDone
from leap.soledad.common.l2db import errors
from leap.soledad.common.l2db.remote import utils
-from leap.common.http import HTTPClient
from .support import ReadBodyProtocol
+from .support import readBody
class DocStreamReceiver(ReadBodyProtocol):
+ """
+ A protocol implementation that can parse incoming data from server based
+ on a line format specified on u1db implementation. Except that we split doc
+ attributes from content to ease parsing and increment throughput for larger
+ documents.
+ [\r\n
+ {metadata},\r\n
+ {doc_info},\r\n
+ {content},\r\n
+ ...
+ {doc_info},\r\n
+ {content},\r\n
+ ]
+ """
def __init__(self, response, deferred, doc_reader):
self.deferred = deferred
@@ -56,6 +69,10 @@ class DocStreamReceiver(ReadBodyProtocol):
return ReadBodyProtocol.connectionLost(self, reason)
def consumeBufferLines(self):
+ """
+ Consumes lines from buffer and rewind it, writing remaining data
+ that didn't formed a line back into buffer.
+ """
content = self._buffer.getvalue()[0:self._buffer.tell()]
self._buffer.seek(0)
lines = content.split(self.delimiter)
@@ -63,6 +80,10 @@ class DocStreamReceiver(ReadBodyProtocol):
return lines
def dataReceived(self, data):
+ """
+ Buffer incoming data until a line breaks comes in. We check only
+ the incoming data for efficiency.
+ """
self._buffer.write(data)
if '\n' not in data:
return
@@ -76,6 +97,14 @@ class DocStreamReceiver(ReadBodyProtocol):
raise errors.BrokenSyncStream(e)
def lineReceived(self, line):
+ """
+ Protocol implementation.
+ 0: [\r\n
+ 1: {metadata},\r\n
+ (even): {doc_info},\r\n
+ (odd): {data},\r\n
+ (last): ]
+ """
assert not self._properly_finished
if ']' == line:
self._properly_finished = True
@@ -91,6 +120,9 @@ class DocStreamReceiver(ReadBodyProtocol):
self._doc_reader(self.current_doc, line.strip() or None)
def finish(self):
+ """
+ Checks that ']' came and stream was properly closed.
+ """
if not self._properly_finished:
raise errors.BrokenSyncStream()
content = self._buffer.getvalue()[0:self._buffer.tell()]
@@ -106,62 +138,10 @@ def build_body_reader(doc_reader):
@param doc_reader: Function to be called for processing an incoming doc.
Will be called with doc metadata (dict parsed from 1st line) and doc
content (string)
- @type response: function
+ @type doc_reader: function
- @return: A L{Deferred} which will fire with the sync metadata.
- Cancelling it will close the connection to the server immediately.
+ @return: A function that can be called by the http Agent to create and
+ configure the proper protocol.
"""
- def read(response):
- def cancel(deferred):
- """
- Cancel a L{readBody} call, close the connection to the HTTP server
- immediately, if it is still open.
-
- @param deferred: The cancelled L{defer.Deferred}.
- """
- abort = getAbort()
- if abort is not None:
- abort()
-
- def getAbort():
- return getattr(protocol.transport, 'abortConnection', None)
-
- d = defer.Deferred(cancel)
- protocol = DocStreamReceiver(response, d, doc_reader)
- response.deliverBody(protocol)
- if protocol.transport is not None and getAbort() is None:
- warnings.warn(
- 'Using readBody with a transport that does not have an '
- 'abortConnection method',
- category=DeprecationWarning,
- stacklevel=2)
- return d
- return read
-
-
-def read_doc(doc_info, content):
- print doc_info, len(content)
-
-
-def finish(args):
- print args
- reactor.stop()
-
-
-def fetch(url, token, sync_id):
- headers = {'Authorization': ['Token %s' % token]}
- headers.update({'content-type': ['application/x-soledad-sync-get']})
- body = """[
-{"ensure": false, "last_known_trans_id": "", "sync_id": "%s",
-"last_known_generation": 0},
-{"received": 0}
-]""" % sync_id
- http = HTTPClient(pool=HTTPConnectionPool(reactor))
- d = http.request(url, 'POST', body, headers, build_body_reader(read_doc))
- d.addBoth(finish)
-
-
-if __name__ == "__main__":
- assert len(sys.argv) == 4
- reactor.callWhenRunning(fetch, sys.argv[1], sys.argv[2], sys.argv[3])
- reactor.run()
+ protocolClass = partial(DocStreamReceiver, doc_reader=doc_reader)
+ return partial(readBody, protocolClass=protocolClass)
diff --git a/client/src/leap/soledad/client/http_target/send_protocol.py b/client/src/leap/soledad/client/http_target/send_protocol.py
index b93a4284..61e95e56 100644
--- a/client/src/leap/soledad/client/http_target/send_protocol.py
+++ b/client/src/leap/soledad/client/http_target/send_protocol.py
@@ -32,7 +32,7 @@ class DocStreamProducer(object):
:param consumer: Any IConsumer provider.
:type consumer: twisted.internet.interfaces.IConsumer
- :return: A successful deferred.
+ :return: A Deferred that fires when production ends.
:rtype: twisted.internet.defer.Deferred
"""
call = self.producer.pop(0)
diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py
index c066331c..feb306e8 100644
--- a/client/src/leap/soledad/client/http_target/support.py
+++ b/client/src/leap/soledad/client/http_target/support.py
@@ -94,7 +94,7 @@ class ReadBodyProtocol(_ReadBodyProtocol):
self.deferred.errback(reason)
-def readBody(response):
+def readBody(response, protocolClass=ReadBodyProtocol):
"""
Get the body of an L{IResponse} and return it as a byte string.
@@ -119,7 +119,7 @@ def readBody(response):
abort()
d = defer.Deferred(cancel)
- protocol = ReadBodyProtocol(response, d)
+ protocol = protocolClass(response, d)
def getAbort():
return getattr(protocol.transport, 'abortConnection', None)