summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/http_target/fetch_protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/http_target/fetch_protocol.py')
-rw-r--r--client/src/leap/soledad/client/http_target/fetch_protocol.py206
1 files changed, 206 insertions, 0 deletions
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
new file mode 100644
index 00000000..6ecba2b0
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python
+
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Example using stdio, Deferreds, LineReceiver and twisted.web.client.
+
+Note that the WebCheckerCommandProtocol protocol could easily be used in e.g.
+a telnet server instead; see the comments for details.
+
+Based on an example by Abe Fettig.
+"""
+import sys
+import json
+import warnings
+from cStringIO import StringIO
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.internet import protocol
+from twisted.web.client import HTTPConnectionPool
+from twisted.web._newclient import ResponseDone
+from twisted.web._newclient import PotentialDataLoss
+from twisted.web.client import PartialDownloadError
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import utils
+from leap.soledad.common.l2db.remote import http_errors
+from leap.common.http import HTTPClient
+
+
+class DocStreamReceiver(protocol.Protocol):
+
+ def __init__(self, response, deferred, doc_reader):
+ self.deferred = deferred
+ self.status = response.code if response else None
+ self.message = response.phrase if response else None
+ self.headers = response.headers if response else {}
+ self.delimiter = '\r\n'
+ self._doc_reader = doc_reader
+ self.reset()
+
+ def reset(self):
+ self._line = 0
+ self._buffer = StringIO()
+ self._properly_finished = False
+
+ # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
+ def _error(self, respdic):
+ descr = respdic.get("error")
+ exc_cls = errors.wire_description_to_exc.get(descr)
+ if exc_cls is not None:
+ message = respdic.get("message")
+ self.deferred.errback(exc_cls(message))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ def connectionLost(self, reason):
+ """
+ Deliver the accumulated response bytes to the waiting L{Deferred}, if
+ the response body has been completely received without error.
+ """
+ if reason.check(ResponseDone):
+
+ try:
+ body = self.finish()
+ except errors.BrokenSyncStream, e:
+ return self.deferred.errback(e)
+
+ # ---8<--- snippet from u1db.remote.http_client
+ if self.status in (200, 201):
+ self.deferred.callback(self.metadata)
+ elif self.status in http_errors.ERROR_STATUSES:
+ try:
+ respdic = json.loads(body)
+ except ValueError:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ else:
+ self._error(respdic)
+ # special cases
+ elif self.status == 503:
+ self.deferred.errback(errors.Unavailable(body, self.headers))
+ else:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ elif reason.check(PotentialDataLoss):
+ self.deferred.errback(
+ PartialDownloadError(self.status, self.message,
+ b''.join(body)))
+ else:
+ self.deferred.errback(reason)
+
+ def consumeBufferLines(self):
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.seek(0)
+ lines = content.split(self.delimiter)
+ self._buffer.write(lines.pop(-1))
+ return lines
+
+ def dataReceived(self, data):
+ self._buffer.write(data)
+ if '\n' not in data:
+ return
+ lines = self.consumeBufferLines()
+ while lines:
+ line, _ = utils.check_and_strip_comma(lines.pop(0))
+ try:
+ self.lineReceived(line)
+ except AssertionError, e:
+ raise errors.BrokenSyncStream(e)
+
+ def lineReceived(self, line):
+ assert not self._properly_finished
+ if ']' == line:
+ self._properly_finished = True
+ elif self._line == 0:
+ assert line == '['
+ self._line += 1
+ elif self._line == 1:
+ self._line += 1
+ self.metadata = json.loads(line)
+ assert 'error' not in self.metadata
+ elif (self._line % 2) == 0:
+ self._line += 1
+ self.current_doc = json.loads(line)
+ assert 'error' not in self.current_doc
+ else:
+ self._line += 1
+ self._doc_reader(self.current_doc, line.strip() or None)
+
+ def finish(self):
+ if not self._properly_finished:
+ raise errors.BrokenSyncStream()
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.close()
+ return content
+
+
+def build_body_reader(doc_reader):
+ """
+ Get the documents from a sync stream and call doc_reader on each
+ doc received.
+
+ @param doc_reader: Function to be called for processing an incoming doc.
+ Will be called with doc metadata (dict parsed from 1st line) and doc
+ content (string)
+ @type response: function
+
+ @return: A L{Deferred} which will fire with the sync metadata.
+ Cancelling it will close the connection to the server immediately.
+ """
+ def read(response):
+ def cancel(deferred):
+ """
+ Cancel a L{readBody} call, close the connection to the HTTP server
+ immediately, if it is still open.
+
+ @param deferred: The cancelled L{defer.Deferred}.
+ """
+ abort = getAbort()
+ if abort is not None:
+ abort()
+
+ def getAbort():
+ return getattr(protocol.transport, 'abortConnection', None)
+
+ d = defer.Deferred(cancel)
+ protocol = DocStreamReceiver(response, d, doc_reader)
+ response.deliverBody(protocol)
+ if protocol.transport is not None and getAbort() is None:
+ warnings.warn(
+ 'Using readBody with a transport that does not have an '
+ 'abortConnection method',
+ category=DeprecationWarning,
+ stacklevel=2)
+ return d
+ return read
+
+
+def read_doc(doc_info, content):
+ print doc_info, len(content)
+
+
+def finish(args):
+ print args
+ reactor.stop()
+
+
+def fetch(url, token, sync_id):
+ headers = {'Authorization': ['Token %s' % token]}
+ headers.update({'content-type': ['application/x-soledad-sync-get']})
+ body = """[
+{"ensure": false, "last_known_trans_id": "", "sync_id": "%s",
+"last_known_generation": 0},
+{"received": 0}
+]""" % sync_id
+ http = HTTPClient(pool=HTTPConnectionPool(reactor))
+ d = http.request(url, 'POST', body, headers, build_body_reader(read_doc))
+ d.addBoth(finish)
+
+
+if __name__ == "__main__":
+ assert len(sys.argv) == 4
+ reactor.callWhenRunning(fetch, sys.argv[1], sys.argv[2], sys.argv[3])
+ reactor.run()