diff options
| -rw-r--r-- | client/src/leap/soledad/client/http_target/api.py | 4 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 91 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch_protocol.py | 206 | ||||
| -rw-r--r-- | server/src/leap/soledad/server/sync.py | 6 | ||||
| -rw-r--r-- | testing/tests/sync/test_sync_target.py | 65 | 
5 files changed, 273 insertions, 99 deletions
diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index c9da939c..4e068523 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -72,11 +72,11 @@ class SyncTargetAPI(SyncTarget):          return self._sync_enc_pool is not None      def _http_request(self, url, method='GET', body=None, headers=None, -                      content_type=None): +                      content_type=None, body_reader=readBody):          headers = headers or self._base_header          if content_type:              headers.update({'content-type': [content_type]}) -        d = self._http.request(url, method, body, headers, readBody) +        d = self._http.request(url, method, body, headers, body_reader)          d.addErrback(_unauth_to_invalid_token_error)          return d diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 1f1bc480..063082e5 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -14,8 +14,6 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. -import json -  from twisted.internet import defer  from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS @@ -25,7 +23,9 @@ from leap.soledad.client.http_target.support import RequestBody  from leap.soledad.common.log import getLogger  from leap.soledad.common.document import SoledadDocument  from leap.soledad.common.l2db import errors -from leap.soledad.common.l2db.remote import utils +from datetime import datetime + +from . import fetch_protocol  logger = getLogger(__name__) @@ -58,12 +58,12 @@ class HTTPDocFetcher(object):          # to know the total number of documents to be received, and this          # information comes as metadata to each request. -        docs = yield self._fetch_all( -            last_known_generation, last_known_trans_id, -            sync_id, 0)          self._received_docs = 0 +        metadata = yield self._fetch_all( +            last_known_generation, last_known_trans_id, +            sync_id, self._received_docs)          number_of_changes, ngen, ntrans =\ -            self._insert_received_docs(docs, 1, 1) +            self._parse_metadata(metadata)          if ngen:              new_generation = ngen @@ -81,14 +81,17 @@ class HTTPDocFetcher(object):              ensure=self._ensure_callback is not None)          # inform server of how many documents have already been received          body.insert_info(received=received) -        # send headers +        # build a stream reader with doc parser callback +        body_reader = fetch_protocol.build_body_reader(self._doc_parser) +        # start download stream          return self._http_request(              self._url,              method='POST',              body=str(body), -            content_type='application/x-soledad-sync-get') +            content_type='application/x-soledad-sync-get', +            body_reader=body_reader) -    def _insert_received_docs(self, response, idx, total): +    def _doc_parser(self, doc_info, content):          """          Insert a received document into the local replica. @@ -99,26 +102,20 @@ class HTTPDocFetcher(object):          :param total: The total number of operations.          :type total: int          """ -        new_generation, new_transaction_id, number_of_changes, entries =\ -            self._parse_received_doc_response(response) - -        for doc_id, rev, content, gen, trans_id in entries: -            if doc_id is not None: -                # decrypt incoming document and insert into local database -                # --------------------------------------------------------- -                # symmetric decryption of document's contents -                # --------------------------------------------------------- -                # If arriving content was symmetrically encrypted, we decrypt -                doc = SoledadDocument(doc_id, rev, content) -                if is_symmetrically_encrypted(doc): -                    doc.set_json(self._crypto.decrypt_doc(doc)) -                self._insert_doc_cb(doc, gen, trans_id) -            self._received_docs += 1 -            user_data = {'uuid': self.uuid, 'userid': self.userid} -            _emit_receive_status(user_data, self._received_docs, total) -        return number_of_changes, new_generation, new_transaction_id - -    def _parse_received_doc_response(self, response): +        # decrypt incoming document and insert into local database +        # --------------------------------------------------------- +        # symmetric decryption of document's contents +        # --------------------------------------------------------- +        # If arriving content was symmetrically encrypted, we decrypt +        doc = SoledadDocument(doc_info['id'], doc_info['rev'], content) +        if is_symmetrically_encrypted(doc): +            doc.set_json(self._crypto.decrypt_doc(doc)) +        self._insert_doc_cb(doc, doc_info['gen'], doc_info['trans_id']) +        self._received_docs += 1 +        user_data = {'uuid': self.uuid, 'userid': self.userid} +        _emit_receive_status(user_data, self._received_docs, total=1000000) + +    def _parse_metadata(self, metadata):          """          Parse the response from the server containing the received document. @@ -130,18 +127,18 @@ class HTTPDocFetcher(object):          :rtype: tuple          """          # decode incoming stream -        parts = response.splitlines() -        if not parts or parts[0] != '[' or parts[-1] != ']': -            raise errors.BrokenSyncStream -        data = parts[1:-1] +        # parts = response.splitlines() +        # if not parts or parts[0] != '[' or parts[-1] != ']': +        #    raise errors.BrokenSyncStream +        # data = parts[1:-1]          # decode metadata +        # try: +        #    line, comma = utils.check_and_strip_comma(data[0]) +        #    metadata = None +        # except (IndexError): +        #    raise errors.BrokenSyncStream          try: -            line, comma = utils.check_and_strip_comma(data[0]) -            metadata = None -        except (IndexError): -            raise errors.BrokenSyncStream -        try: -            metadata = json.loads(line) +            # metadata = json.loads(line)              new_generation = metadata['new_generation']              new_transaction_id = metadata['new_transaction_id']              number_of_changes = metadata['number_of_changes'] @@ -150,19 +147,7 @@ class HTTPDocFetcher(object):          # make sure we have replica_uid from fresh new dbs          if self._ensure_callback and 'replica_uid' in metadata:              self._ensure_callback(metadata['replica_uid']) -        # parse incoming document info -        entries = [] -        for index in xrange(1, len(data[1:]), 2): -            try: -                line, comma = utils.check_and_strip_comma(data[index]) -                content, _ = utils.check_and_strip_comma(data[index + 1]) -                entry = json.loads(line) -                entries.append((entry['id'], entry['rev'], content or None, -                                entry['gen'], entry['trans_id'])) -            except (IndexError, KeyError): -                raise errors.BrokenSyncStream -        return new_generation, new_transaction_id, number_of_changes, \ -            entries +        return number_of_changes, new_generation, new_transaction_id  def _emit_receive_status(user_data, received_docs, total): diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py new file mode 100644 index 00000000..6ecba2b0 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python + +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Example using stdio, Deferreds, LineReceiver and twisted.web.client. + +Note that the WebCheckerCommandProtocol protocol could easily be used in e.g. +a telnet server instead; see the comments for details. + +Based on an example by Abe Fettig. +""" +import sys +import json +import warnings +from cStringIO import StringIO +from twisted.internet import reactor +from twisted.internet import defer +from twisted.internet import protocol +from twisted.web.client import HTTPConnectionPool +from twisted.web._newclient import ResponseDone +from twisted.web._newclient import PotentialDataLoss +from twisted.web.client import PartialDownloadError +from leap.soledad.common.l2db import errors +from leap.soledad.common.l2db.remote import utils +from leap.soledad.common.l2db.remote import http_errors +from leap.common.http import HTTPClient + + +class DocStreamReceiver(protocol.Protocol): + +    def __init__(self, response, deferred, doc_reader): +        self.deferred = deferred +        self.status = response.code if response else None +        self.message = response.phrase if response else None +        self.headers = response.headers if response else {} +        self.delimiter = '\r\n' +        self._doc_reader = doc_reader +        self.reset() + +    def reset(self): +        self._line = 0 +        self._buffer = StringIO() +        self._properly_finished = False + +    # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks +    def _error(self, respdic): +        descr = respdic.get("error") +        exc_cls = errors.wire_description_to_exc.get(descr) +        if exc_cls is not None: +            message = respdic.get("message") +            self.deferred.errback(exc_cls(message)) +    # ---8<--- end of snippet from u1db.remote.http_client + +    def connectionLost(self, reason): +        """ +        Deliver the accumulated response bytes to the waiting L{Deferred}, if +        the response body has been completely received without error. +        """ +        if reason.check(ResponseDone): + +            try: +                body = self.finish() +            except errors.BrokenSyncStream, e: +                return self.deferred.errback(e) + +            # ---8<--- snippet from u1db.remote.http_client +            if self.status in (200, 201): +                self.deferred.callback(self.metadata) +            elif self.status in http_errors.ERROR_STATUSES: +                try: +                    respdic = json.loads(body) +                except ValueError: +                    self.deferred.errback( +                        errors.HTTPError(self.status, body, self.headers)) +                else: +                    self._error(respdic) +            # special cases +            elif self.status == 503: +                self.deferred.errback(errors.Unavailable(body, self.headers)) +            else: +                self.deferred.errback( +                    errors.HTTPError(self.status, body, self.headers)) +            # ---8<--- end of snippet from u1db.remote.http_client + +        elif reason.check(PotentialDataLoss): +            self.deferred.errback( +                PartialDownloadError(self.status, self.message, +                                     b''.join(body))) +        else: +            self.deferred.errback(reason) + +    def consumeBufferLines(self): +        content = self._buffer.getvalue()[0:self._buffer.tell()] +        self._buffer.seek(0) +        lines = content.split(self.delimiter) +        self._buffer.write(lines.pop(-1)) +        return lines + +    def dataReceived(self, data): +        self._buffer.write(data) +        if '\n' not in data: +            return +        lines = self.consumeBufferLines() +        while lines: +            line, _ = utils.check_and_strip_comma(lines.pop(0)) +            try: +                self.lineReceived(line) +            except AssertionError, e: +                raise errors.BrokenSyncStream(e) + +    def lineReceived(self, line): +        assert not self._properly_finished +        if ']' == line: +            self._properly_finished = True +        elif self._line == 0: +            assert line == '[' +            self._line += 1 +        elif self._line == 1: +            self._line += 1 +            self.metadata = json.loads(line) +            assert 'error' not in self.metadata +        elif (self._line % 2) == 0: +            self._line += 1 +            self.current_doc = json.loads(line) +            assert 'error' not in self.current_doc +        else: +            self._line += 1 +            self._doc_reader(self.current_doc, line.strip() or None) + +    def finish(self): +        if not self._properly_finished: +            raise errors.BrokenSyncStream() +        content = self._buffer.getvalue()[0:self._buffer.tell()] +        self._buffer.close() +        return content + + +def build_body_reader(doc_reader): +    """ +    Get the documents from a sync stream and call doc_reader on each +    doc received. + +    @param doc_reader: Function to be called for processing an incoming doc. +        Will be called with doc metadata (dict parsed from 1st line) and doc +        content (string) +    @type response: function + +    @return: A L{Deferred} which will fire with the sync metadata. +        Cancelling it will close the connection to the server immediately. +    """ +    def read(response): +        def cancel(deferred): +            """ +            Cancel a L{readBody} call, close the connection to the HTTP server +            immediately, if it is still open. + +            @param deferred: The cancelled L{defer.Deferred}. +            """ +            abort = getAbort() +            if abort is not None: +                abort() + +        def getAbort(): +            return getattr(protocol.transport, 'abortConnection', None) + +        d = defer.Deferred(cancel) +        protocol = DocStreamReceiver(response, d, doc_reader) +        response.deliverBody(protocol) +        if protocol.transport is not None and getAbort() is None: +            warnings.warn( +                'Using readBody with a transport that does not have an ' +                'abortConnection method', +                category=DeprecationWarning, +                stacklevel=2) +        return d +    return read + + +def read_doc(doc_info, content): +    print doc_info, len(content) + + +def finish(args): +    print args +    reactor.stop() + + +def fetch(url, token, sync_id): +    headers = {'Authorization': ['Token %s' % token]} +    headers.update({'content-type': ['application/x-soledad-sync-get']}) +    body = """[ +{"ensure": false, "last_known_trans_id": "", "sync_id": "%s", +"last_known_generation": 0}, +{"received": 0} +]""" % sync_id +    http = HTTPClient(pool=HTTPConnectionPool(reactor)) +    d = http.request(url, 'POST', body, headers, build_body_reader(read_doc)) +    d.addBoth(finish) + + +if __name__ == "__main__": +    assert len(sys.argv) == 4 +    reactor.callWhenRunning(fetch, sys.argv[1], sys.argv[2], sys.argv[3]) +    reactor.run() diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 6f2ffe9f..c958bfaa 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -254,7 +254,11 @@ class SyncResource(http_app.SyncResource):                           gen=gen, trans_id=trans_id)              self.responder.stream_entry(entry)              content = doc.get_json() -            self.responder.stream_entry(content.read() if content else '') +            if content: +                self.responder.stream_entry(content.read()) +                content.close() +            else: +                self.responder.stream_entry('')          new_gen, number_of_changes = \              self.sync_exch.find_changes_to_return(received) diff --git a/testing/tests/sync/test_sync_target.py b/testing/tests/sync/test_sync_target.py index a2935539..997dcdcd 100644 --- a/testing/tests/sync/test_sync_target.py +++ b/testing/tests/sync/test_sync_target.py @@ -30,6 +30,7 @@ from testscenarios import TestWithScenarios  from twisted.internet import defer  from leap.soledad.client import http_target as target +from leap.soledad.client.http_target.fetch_protocol import DocStreamReceiver  from leap.soledad.client import crypto  from leap.soledad.client.sqlcipher import SQLCipherU1DBSync  from leap.soledad.client.sqlcipher import SQLCipherOptions @@ -44,6 +45,7 @@ from test_soledad.util import make_soledad_app  from test_soledad.util import make_token_soledad_app  from test_soledad.util import make_soledad_document_for_test  from test_soledad.util import soledad_sync_target +from twisted.trial import unittest  from test_soledad.util import SoledadWithCouchServerMixin  from test_soledad.util import ADDRESS  from test_soledad.util import SQLCIPHER_SCENARIOS @@ -53,92 +55,69 @@ from test_soledad.util import SQLCIPHER_SCENARIOS  # The following tests come from `u1db.tests.test_remote_sync_target`.  # ----------------------------------------------------------------------------- -class TestSoledadParseReceivedDocResponse(SoledadWithCouchServerMixin): +class TestSoledadParseReceivedDocResponse(unittest.TestCase):      """      Some tests had to be copied to this class so we can instantiate our own      target.      """ -    def setUp(self): -        SoledadWithCouchServerMixin.setUp(self) -        creds = {'token': { -            'uuid': 'user-uuid', -            'token': 'auth-token', -        }} -        self.target = target.SoledadHTTPSyncTarget( -            self.couch_url, -            uuid4().hex, -            creds, -            self._soledad._crypto, -            None) - -    def tearDown(self): -        self.target.close() -        SoledadWithCouchServerMixin.tearDown(self) +    def parse(self, stream): +        parser = DocStreamReceiver(None, None, lambda x, y: 42) +        parser.dataReceived(stream) +        parser.finish()      def test_extra_comma(self): -        """ -        Test adapted to use encrypted content. -        """          doc = SoledadDocument('i', rev='r') -        doc.content = {} -        _crypto = self._soledad._crypto -        key = _crypto.doc_passphrase(doc.doc_id) -        secret = _crypto.secret +        doc.content = {'a': 'b'} -        enc_json = crypto.encrypt_docstr( -            doc.get_json(), doc.doc_id, doc.rev, -            key, secret) +        encrypted_docstr = crypto.SoledadCrypto('').encrypt_doc(doc)          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("[\r\n{},\r\n]") +            self.parse("[\r\n{},\r\n]")          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response( +            self.parse(                  ('[\r\n{},\r\n{"id": "i", "rev": "r", ' + -                 '"content": %s, "gen": 3, "trans_id": "T-sid"}' + -                 ',\r\n]') % json.dumps(enc_json)) +                 '"gen": 3, "trans_id": "T-sid"},\r\n' + +                 '%s,\r\n]') % encrypted_docstr)      def test_wrong_start(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("{}\r\n]") - -        with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("\r\n{}\r\n]") +            self.parse("{}\r\n]")          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("") +            self.parse("\r\n{}\r\n]")      def test_wrong_end(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("[\r\n{}") +            self.parse("[\r\n{}")          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("[\r\n") +            self.parse("[\r\n")      def test_missing_comma(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response( +            self.parse(                  '[\r\n{}\r\n{"id": "i", "rev": "r", '                  '"content": "c", "gen": 3}\r\n]')      def test_no_entries(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response("[\r\n]") +            self.parse("[\r\n]")      def test_error_in_stream(self):          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response( +            self.parse(                  '[\r\n{"new_generation": 0},'                  '\r\n{"error": "unavailable"}\r\n')          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response( +            self.parse(                  '[\r\n{"error": "unavailable"}\r\n')          with self.assertRaises(l2db.errors.BrokenSyncStream): -            self.target._parse_received_doc_response('[\r\n{"error": "?"}\r\n') +            self.parse('[\r\n{"error": "?"}\r\n')  #  # functions for TestRemoteSyncTargets  | 
