summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Shyba <victor.shyba@gmail.com>2016-09-19 03:56:44 -0300
committerdrebs <drebs@leap.se>2016-12-12 09:11:58 -0200
commit07dcb2ae5240f20a26903f53a432fcd49c7f1ec9 (patch)
treef33354df809a5fbdd1771a1f07787e9e3b58af93
parent35563cb74fcfd7f6ae969ed3af3a74d3c18cbf5b (diff)
[feature] streaming download protocol
This commit finishes reversion into u1db original streaming protocol for downloads.
-rw-r--r--client/src/leap/soledad/client/http_target/api.py4
-rw-r--r--client/src/leap/soledad/client/http_target/fetch.py91
-rw-r--r--client/src/leap/soledad/client/http_target/fetch_protocol.py206
-rw-r--r--server/src/leap/soledad/server/sync.py6
-rw-r--r--testing/tests/sync/test_sync_target.py65
5 files changed, 273 insertions, 99 deletions
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py
index c9da939c..4e068523 100644
--- a/client/src/leap/soledad/client/http_target/api.py
+++ b/client/src/leap/soledad/client/http_target/api.py
@@ -72,11 +72,11 @@ class SyncTargetAPI(SyncTarget):
return self._sync_enc_pool is not None
def _http_request(self, url, method='GET', body=None, headers=None,
- content_type=None):
+ content_type=None, body_reader=readBody):
headers = headers or self._base_header
if content_type:
headers.update({'content-type': [content_type]})
- d = self._http.request(url, method, body, headers, readBody)
+ d = self._http.request(url, method, body, headers, body_reader)
d.addErrback(_unauth_to_invalid_token_error)
return d
diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py
index 1f1bc480..063082e5 100644
--- a/client/src/leap/soledad/client/http_target/fetch.py
+++ b/client/src/leap/soledad/client/http_target/fetch.py
@@ -14,8 +14,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import json
-
from twisted.internet import defer
from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
@@ -25,7 +23,9 @@ from leap.soledad.client.http_target.support import RequestBody
from leap.soledad.common.log import getLogger
from leap.soledad.common.document import SoledadDocument
from leap.soledad.common.l2db import errors
-from leap.soledad.common.l2db.remote import utils
+from datetime import datetime
+
+from . import fetch_protocol
logger = getLogger(__name__)
@@ -58,12 +58,12 @@ class HTTPDocFetcher(object):
# to know the total number of documents to be received, and this
# information comes as metadata to each request.
- docs = yield self._fetch_all(
- last_known_generation, last_known_trans_id,
- sync_id, 0)
self._received_docs = 0
+ metadata = yield self._fetch_all(
+ last_known_generation, last_known_trans_id,
+ sync_id, self._received_docs)
number_of_changes, ngen, ntrans =\
- self._insert_received_docs(docs, 1, 1)
+ self._parse_metadata(metadata)
if ngen:
new_generation = ngen
@@ -81,14 +81,17 @@ class HTTPDocFetcher(object):
ensure=self._ensure_callback is not None)
# inform server of how many documents have already been received
body.insert_info(received=received)
- # send headers
+ # build a stream reader with doc parser callback
+ body_reader = fetch_protocol.build_body_reader(self._doc_parser)
+ # start download stream
return self._http_request(
self._url,
method='POST',
body=str(body),
- content_type='application/x-soledad-sync-get')
+ content_type='application/x-soledad-sync-get',
+ body_reader=body_reader)
- def _insert_received_docs(self, response, idx, total):
+ def _doc_parser(self, doc_info, content):
"""
Insert a received document into the local replica.
@@ -99,26 +102,20 @@ class HTTPDocFetcher(object):
:param total: The total number of operations.
:type total: int
"""
- new_generation, new_transaction_id, number_of_changes, entries =\
- self._parse_received_doc_response(response)
-
- for doc_id, rev, content, gen, trans_id in entries:
- if doc_id is not None:
- # decrypt incoming document and insert into local database
- # ---------------------------------------------------------
- # symmetric decryption of document's contents
- # ---------------------------------------------------------
- # If arriving content was symmetrically encrypted, we decrypt
- doc = SoledadDocument(doc_id, rev, content)
- if is_symmetrically_encrypted(doc):
- doc.set_json(self._crypto.decrypt_doc(doc))
- self._insert_doc_cb(doc, gen, trans_id)
- self._received_docs += 1
- user_data = {'uuid': self.uuid, 'userid': self.userid}
- _emit_receive_status(user_data, self._received_docs, total)
- return number_of_changes, new_generation, new_transaction_id
-
- def _parse_received_doc_response(self, response):
+ # decrypt incoming document and insert into local database
+ # ---------------------------------------------------------
+ # symmetric decryption of document's contents
+ # ---------------------------------------------------------
+ # If arriving content was symmetrically encrypted, we decrypt
+ doc = SoledadDocument(doc_info['id'], doc_info['rev'], content)
+ if is_symmetrically_encrypted(doc):
+ doc.set_json(self._crypto.decrypt_doc(doc))
+ self._insert_doc_cb(doc, doc_info['gen'], doc_info['trans_id'])
+ self._received_docs += 1
+ user_data = {'uuid': self.uuid, 'userid': self.userid}
+ _emit_receive_status(user_data, self._received_docs, total=1000000)
+
+ def _parse_metadata(self, metadata):
"""
Parse the response from the server containing the received document.
@@ -130,18 +127,18 @@ class HTTPDocFetcher(object):
:rtype: tuple
"""
# decode incoming stream
- parts = response.splitlines()
- if not parts or parts[0] != '[' or parts[-1] != ']':
- raise errors.BrokenSyncStream
- data = parts[1:-1]
+ # parts = response.splitlines()
+ # if not parts or parts[0] != '[' or parts[-1] != ']':
+ # raise errors.BrokenSyncStream
+ # data = parts[1:-1]
# decode metadata
+ # try:
+ # line, comma = utils.check_and_strip_comma(data[0])
+ # metadata = None
+ # except (IndexError):
+ # raise errors.BrokenSyncStream
try:
- line, comma = utils.check_and_strip_comma(data[0])
- metadata = None
- except (IndexError):
- raise errors.BrokenSyncStream
- try:
- metadata = json.loads(line)
+ # metadata = json.loads(line)
new_generation = metadata['new_generation']
new_transaction_id = metadata['new_transaction_id']
number_of_changes = metadata['number_of_changes']
@@ -150,19 +147,7 @@ class HTTPDocFetcher(object):
# make sure we have replica_uid from fresh new dbs
if self._ensure_callback and 'replica_uid' in metadata:
self._ensure_callback(metadata['replica_uid'])
- # parse incoming document info
- entries = []
- for index in xrange(1, len(data[1:]), 2):
- try:
- line, comma = utils.check_and_strip_comma(data[index])
- content, _ = utils.check_and_strip_comma(data[index + 1])
- entry = json.loads(line)
- entries.append((entry['id'], entry['rev'], content or None,
- entry['gen'], entry['trans_id']))
- except (IndexError, KeyError):
- raise errors.BrokenSyncStream
- return new_generation, new_transaction_id, number_of_changes, \
- entries
+ return number_of_changes, new_generation, new_transaction_id
def _emit_receive_status(user_data, received_docs, total):
diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py
new file mode 100644
index 00000000..6ecba2b0
--- /dev/null
+++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python
+
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Example using stdio, Deferreds, LineReceiver and twisted.web.client.
+
+Note that the WebCheckerCommandProtocol protocol could easily be used in e.g.
+a telnet server instead; see the comments for details.
+
+Based on an example by Abe Fettig.
+"""
+import sys
+import json
+import warnings
+from cStringIO import StringIO
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.internet import protocol
+from twisted.web.client import HTTPConnectionPool
+from twisted.web._newclient import ResponseDone
+from twisted.web._newclient import PotentialDataLoss
+from twisted.web.client import PartialDownloadError
+from leap.soledad.common.l2db import errors
+from leap.soledad.common.l2db.remote import utils
+from leap.soledad.common.l2db.remote import http_errors
+from leap.common.http import HTTPClient
+
+
+class DocStreamReceiver(protocol.Protocol):
+
+ def __init__(self, response, deferred, doc_reader):
+ self.deferred = deferred
+ self.status = response.code if response else None
+ self.message = response.phrase if response else None
+ self.headers = response.headers if response else {}
+ self.delimiter = '\r\n'
+ self._doc_reader = doc_reader
+ self.reset()
+
+ def reset(self):
+ self._line = 0
+ self._buffer = StringIO()
+ self._properly_finished = False
+
+ # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks
+ def _error(self, respdic):
+ descr = respdic.get("error")
+ exc_cls = errors.wire_description_to_exc.get(descr)
+ if exc_cls is not None:
+ message = respdic.get("message")
+ self.deferred.errback(exc_cls(message))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ def connectionLost(self, reason):
+ """
+ Deliver the accumulated response bytes to the waiting L{Deferred}, if
+ the response body has been completely received without error.
+ """
+ if reason.check(ResponseDone):
+
+ try:
+ body = self.finish()
+ except errors.BrokenSyncStream, e:
+ return self.deferred.errback(e)
+
+ # ---8<--- snippet from u1db.remote.http_client
+ if self.status in (200, 201):
+ self.deferred.callback(self.metadata)
+ elif self.status in http_errors.ERROR_STATUSES:
+ try:
+ respdic = json.loads(body)
+ except ValueError:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ else:
+ self._error(respdic)
+ # special cases
+ elif self.status == 503:
+ self.deferred.errback(errors.Unavailable(body, self.headers))
+ else:
+ self.deferred.errback(
+ errors.HTTPError(self.status, body, self.headers))
+ # ---8<--- end of snippet from u1db.remote.http_client
+
+ elif reason.check(PotentialDataLoss):
+ self.deferred.errback(
+ PartialDownloadError(self.status, self.message,
+ b''.join(body)))
+ else:
+ self.deferred.errback(reason)
+
+ def consumeBufferLines(self):
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.seek(0)
+ lines = content.split(self.delimiter)
+ self._buffer.write(lines.pop(-1))
+ return lines
+
+ def dataReceived(self, data):
+ self._buffer.write(data)
+ if '\n' not in data:
+ return
+ lines = self.consumeBufferLines()
+ while lines:
+ line, _ = utils.check_and_strip_comma(lines.pop(0))
+ try:
+ self.lineReceived(line)
+ except AssertionError, e:
+ raise errors.BrokenSyncStream(e)
+
+ def lineReceived(self, line):
+ assert not self._properly_finished
+ if ']' == line:
+ self._properly_finished = True
+ elif self._line == 0:
+ assert line == '['
+ self._line += 1
+ elif self._line == 1:
+ self._line += 1
+ self.metadata = json.loads(line)
+ assert 'error' not in self.metadata
+ elif (self._line % 2) == 0:
+ self._line += 1
+ self.current_doc = json.loads(line)
+ assert 'error' not in self.current_doc
+ else:
+ self._line += 1
+ self._doc_reader(self.current_doc, line.strip() or None)
+
+ def finish(self):
+ if not self._properly_finished:
+ raise errors.BrokenSyncStream()
+ content = self._buffer.getvalue()[0:self._buffer.tell()]
+ self._buffer.close()
+ return content
+
+
+def build_body_reader(doc_reader):
+ """
+ Get the documents from a sync stream and call doc_reader on each
+ doc received.
+
+ @param doc_reader: Function to be called for processing an incoming doc.
+ Will be called with doc metadata (dict parsed from 1st line) and doc
+ content (string)
+ @type response: function
+
+ @return: A L{Deferred} which will fire with the sync metadata.
+ Cancelling it will close the connection to the server immediately.
+ """
+ def read(response):
+ def cancel(deferred):
+ """
+ Cancel a L{readBody} call, close the connection to the HTTP server
+ immediately, if it is still open.
+
+ @param deferred: The cancelled L{defer.Deferred}.
+ """
+ abort = getAbort()
+ if abort is not None:
+ abort()
+
+ def getAbort():
+ return getattr(protocol.transport, 'abortConnection', None)
+
+ d = defer.Deferred(cancel)
+ protocol = DocStreamReceiver(response, d, doc_reader)
+ response.deliverBody(protocol)
+ if protocol.transport is not None and getAbort() is None:
+ warnings.warn(
+ 'Using readBody with a transport that does not have an '
+ 'abortConnection method',
+ category=DeprecationWarning,
+ stacklevel=2)
+ return d
+ return read
+
+
+def read_doc(doc_info, content):
+ print doc_info, len(content)
+
+
+def finish(args):
+ print args
+ reactor.stop()
+
+
+def fetch(url, token, sync_id):
+ headers = {'Authorization': ['Token %s' % token]}
+ headers.update({'content-type': ['application/x-soledad-sync-get']})
+ body = """[
+{"ensure": false, "last_known_trans_id": "", "sync_id": "%s",
+"last_known_generation": 0},
+{"received": 0}
+]""" % sync_id
+ http = HTTPClient(pool=HTTPConnectionPool(reactor))
+ d = http.request(url, 'POST', body, headers, build_body_reader(read_doc))
+ d.addBoth(finish)
+
+
+if __name__ == "__main__":
+ assert len(sys.argv) == 4
+ reactor.callWhenRunning(fetch, sys.argv[1], sys.argv[2], sys.argv[3])
+ reactor.run()
diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py
index 6f2ffe9f..c958bfaa 100644
--- a/server/src/leap/soledad/server/sync.py
+++ b/server/src/leap/soledad/server/sync.py
@@ -254,7 +254,11 @@ class SyncResource(http_app.SyncResource):
gen=gen, trans_id=trans_id)
self.responder.stream_entry(entry)
content = doc.get_json()
- self.responder.stream_entry(content.read() if content else '')
+ if content:
+ self.responder.stream_entry(content.read())
+ content.close()
+ else:
+ self.responder.stream_entry('')
new_gen, number_of_changes = \
self.sync_exch.find_changes_to_return(received)
diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py
index a2935539..997dcdcd 100644
--- a/testing/tests/sync/test_sync_target.py
+++ b/testing/tests/sync/test_sync_target.py
@@ -30,6 +30,7 @@ from testscenarios import TestWithScenarios
from twisted.internet import defer
from leap.soledad.client import http_target as target
+from leap.soledad.client.http_target.fetch_protocol import DocStreamReceiver
from leap.soledad.client import crypto
from leap.soledad.client.sqlcipher import SQLCipherU1DBSync
from leap.soledad.client.sqlcipher import SQLCipherOptions
@@ -44,6 +45,7 @@ from test_soledad.util import make_soledad_app
from test_soledad.util import make_token_soledad_app
from test_soledad.util import make_soledad_document_for_test
from test_soledad.util import soledad_sync_target
+from twisted.trial import unittest
from test_soledad.util import SoledadWithCouchServerMixin
from test_soledad.util import ADDRESS
from test_soledad.util import SQLCIPHER_SCENARIOS
@@ -53,92 +55,69 @@ from test_soledad.util import SQLCIPHER_SCENARIOS
# The following tests come from `u1db.tests.test_remote_sync_target`.
# -----------------------------------------------------------------------------
-class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin):
+class TestSoledadParseReceivedDocResponse(unittest.TestCase):
"""
Some tests had to be copied to this class so we can instantiate our own
target.
"""
- def setUp(self):
- SoledadWithCouchServerMixin.setUp(self)
- creds = {'token': {
- 'uuid': 'user-uuid',
- 'token': 'auth-token',
- }}
- self.target = target.SoledadHTTPSyncTarget(
- self.couch_url,
- uuid4().hex,
- creds,
- self._soledad._crypto,
- None)
-
- def tearDown(self):
- self.target.close()
- SoledadWithCouchServerMixin.tearDown(self)
+ def parse(self, stream):
+ parser = DocStreamReceiver(None, None, lambda x, y: 42)
+ parser.dataReceived(stream)
+ parser.finish()
def test_extra_comma(self):
- """
- Test adapted to use encrypted content.
- """
doc = SoledadDocument('i', rev='r')
- doc.content = {}
- _crypto = self._soledad._crypto
- key = _crypto.doc_passphrase(doc.doc_id)
- secret = _crypto.secret
+ doc.content = {'a': 'b'}
- enc_json = crypto.encrypt_docstr(
- doc.get_json(), doc.doc_id, doc.rev,
- key, secret)
+ encrypted_docstr = crypto.SoledadCrypto('').encrypt_doc(doc)
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("[\r\n{},\r\n]")
+ self.parse("[\r\n{},\r\n]")
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response(
+ self.parse(
('[\r\n{},\r\n{"id": "i", "rev": "r", ' +
- '"content": %s, "gen": 3, "trans_id": "T-sid"}' +
- ',\r\n]') % json.dumps(enc_json))
+ '"gen": 3, "trans_id": "T-sid"},\r\n' +
+ '%s,\r\n]') % encrypted_docstr)
def test_wrong_start(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("{}\r\n]")
-
- with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("\r\n{}\r\n]")
+ self.parse("{}\r\n]")
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("")
+ self.parse("\r\n{}\r\n]")
def test_wrong_end(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("[\r\n{}")
+ self.parse("[\r\n{}")
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("[\r\n")
+ self.parse("[\r\n")
def test_missing_comma(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response(
+ self.parse(
'[\r\n{}\r\n{"id": "i", "rev": "r", '
'"content": "c", "gen": 3}\r\n]')
def test_no_entries(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response("[\r\n]")
+ self.parse("[\r\n]")
def test_error_in_stream(self):
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response(
+ self.parse(
'[\r\n{"new_generation": 0},'
'\r\n{"error": "unavailable"}\r\n')
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response(
+ self.parse(
'[\r\n{"error": "unavailable"}\r\n')
with self.assertRaises(l2db.errors.BrokenSyncStream):
- self.target._parse_received_doc_response('[\r\n{"error": "?"}\r\n')
+ self.parse('[\r\n{"error": "?"}\r\n')
#
# functions for TestRemoteSyncTargets