summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/http_target/fetch_protocol.py
blob: 6ecba2b0a2d17bbb574a9317729211a022915feb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#!/usr/bin/env python

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Example using stdio, Deferreds, LineReceiver and twisted.web.client.

Note that the WebCheckerCommandProtocol protocol could easily be used in e.g.
a telnet server instead; see the comments for details.

Based on an example by Abe Fettig.
"""
import sys
import json
import warnings
from cStringIO import StringIO
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import protocol
from twisted.web.client import HTTPConnectionPool
from twisted.web._newclient import ResponseDone
from twisted.web._newclient import PotentialDataLoss
from twisted.web.client import PartialDownloadError
from leap.soledad.common.l2db import errors
from leap.soledad.common.l2db.remote import utils
from leap.soledad.common.l2db.remote import http_errors
from leap.common.http import HTTPClient


class DocStreamReceiver(protocol.Protocol):

    def __init__(self, response, deferred, doc_reader):
        self.deferred = deferred
        self.status = response.code if response else None
        self.message = response.phrase if response else None
        self.headers = response.headers if response else {}
        self.delimiter = '\r\n'
        self._doc_reader = doc_reader
        self.reset()

    def reset(self):
        self._line = 0
        self._buffer = StringIO()
        self._properly_finished = False

    # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
    def _error(self, respdic):
        descr = respdic.get("error")
        exc_cls = errors.wire_description_to_exc.get(descr)
        if exc_cls is not None:
            message = respdic.get("message")
            self.deferred.errback(exc_cls(message))
    # ---8<--- end of snippet from u1db.remote.http_client

    def connectionLost(self, reason):
        """
        Deliver the accumulated response bytes to the waiting L{Deferred}, if
        the response body has been completely received without error.
        """
        if reason.check(ResponseDone):

            try:
                body = self.finish()
            except errors.BrokenSyncStream, e:
                return self.deferred.errback(e)

            # ---8<--- snippet from u1db.remote.http_client
            if self.status in (200, 201):
                self.deferred.callback(self.metadata)
            elif self.status in http_errors.ERROR_STATUSES:
                try:
                    respdic = json.loads(body)
                except ValueError:
                    self.deferred.errback(
                        errors.HTTPError(self.status, body, self.headers))
                else:
                    self._error(respdic)
            # special cases
            elif self.status == 503:
                self.deferred.errback(errors.Unavailable(body, self.headers))
            else:
                self.deferred.errback(
                    errors.HTTPError(self.status, body, self.headers))
            # ---8<--- end of snippet from u1db.remote.http_client

        elif reason.check(PotentialDataLoss):
            self.deferred.errback(
                PartialDownloadError(self.status, self.message,
                                     b''.join(body)))
        else:
            self.deferred.errback(reason)

    def consumeBufferLines(self):
        content = self._buffer.getvalue()[0:self._buffer.tell()]
        self._buffer.seek(0)
        lines = content.split(self.delimiter)
        self._buffer.write(lines.pop(-1))
        return lines

    def dataReceived(self, data):
        self._buffer.write(data)
        if '\n' not in data:
            return
        lines = self.consumeBufferLines()
        while lines:
            line, _ = utils.check_and_strip_comma(lines.pop(0))
            try:
                self.lineReceived(line)
            except AssertionError, e:
                raise errors.BrokenSyncStream(e)

    def lineReceived(self, line):
        assert not self._properly_finished
        if ']' == line:
            self._properly_finished = True
        elif self._line == 0:
            assert line == '['
            self._line += 1
        elif self._line == 1:
            self._line += 1
            self.metadata = json.loads(line)
            assert 'error' not in self.metadata
        elif (self._line % 2) == 0:
            self._line += 1
            self.current_doc = json.loads(line)
            assert 'error' not in self.current_doc
        else:
            self._line += 1
            self._doc_reader(self.current_doc, line.strip() or None)

    def finish(self):
        if not self._properly_finished:
            raise errors.BrokenSyncStream()
        content = self._buffer.getvalue()[0:self._buffer.tell()]
        self._buffer.close()
        return content


def build_body_reader(doc_reader):
    """
    Get the documents from a sync stream and call doc_reader on each
    doc received.

    @param doc_reader: Function to be called for processing an incoming doc.
        Will be called with doc metadata (dict parsed from 1st line) and doc
        content (string)
    @type response: function

    @return: A L{Deferred} which will fire with the sync metadata.
        Cancelling it will close the connection to the server immediately.
    """
    def read(response):
        def cancel(deferred):
            """
            Cancel a L{readBody} call, close the connection to the HTTP server
            immediately, if it is still open.

            @param deferred: The cancelled L{defer.Deferred}.
            """
            abort = getAbort()
            if abort is not None:
                abort()

        def getAbort():
            return getattr(protocol.transport, 'abortConnection', None)

        d = defer.Deferred(cancel)
        protocol = DocStreamReceiver(response, d, doc_reader)
        response.deliverBody(protocol)
        if protocol.transport is not None and getAbort() is None:
            warnings.warn(
                'Using readBody with a transport that does not have an '
                'abortConnection method',
                category=DeprecationWarning,
                stacklevel=2)
        return d
    return read


def read_doc(doc_info, content):
    print doc_info, len(content)


def finish(args):
    print args
    reactor.stop()


def fetch(url, token, sync_id):
    headers = {'Authorization': ['Token %s' % token]}
    headers.update({'content-type': ['application/x-soledad-sync-get']})
    body = """[
{"ensure": false, "last_known_trans_id": "", "sync_id": "%s",
"last_known_generation": 0},
{"received": 0}
]""" % sync_id
    http = HTTPClient(pool=HTTPConnectionPool(reactor))
    d = http.request(url, 'POST', body, headers, build_body_reader(read_doc))
    d.addBoth(finish)


if __name__ == "__main__":
    assert len(sys.argv) == 4
    reactor.callWhenRunning(fetch, sys.argv[1], sys.argv[2], sys.argv[3])
    reactor.run()