diff options
| -rw-r--r-- | server/src/leap/soledad/server/session.py | 22 | ||||
| -rw-r--r-- | testing/tests/server/test_auth.py | 6 | ||||
| -rw-r--r-- | testing/tests/server/test_session.py | 209 | 
3 files changed, 229 insertions, 8 deletions
| diff --git a/server/src/leap/soledad/server/session.py b/server/src/leap/soledad/server/session.py index f1626115..a56e4e97 100644 --- a/server/src/leap/soledad/server/session.py +++ b/server/src/leap/soledad/server/session.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # session.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2017 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -22,6 +22,7 @@ from zope.interface import implementer  from twisted.cred import error  from twisted.python import log  from twisted.web import util +from twisted.web._auth import wrapper  from twisted.web.guard import HTTPAuthSessionWrapper  from twisted.web.resource import ErrorPage  from twisted.web.resource import IResource @@ -32,9 +33,12 @@ from leap.soledad.server.url_mapper import URLMapper  @implementer(IResource) -class UnauthorizedResource(object): +class UnauthorizedResource(wrapper.UnauthorizedResource):      isLeaf = True +    def __init__(self): +        pass +      def render(self, request):          request.setResponseCode(401)          if request.method == b'HEAD': @@ -48,9 +52,12 @@ class UnauthorizedResource(object):  @implementer(IResource)  class SoledadSession(HTTPAuthSessionWrapper): -    def __init__(self): +    def __init__(self, portal=None): +        if portal is None: +            portal = get_portal() +          self._mapper = URLMapper() -        self._portal = get_portal() +        self._portal = portal          self._credentialFactory = credentialFactory      def _matchPath(self, request): @@ -65,18 +72,22 @@ class SoledadSession(HTTPAuthSessionWrapper):          return None      def _authorizedResource(self, request): +        # check whether the path of the request exists in the app          match = self._matchPath(request)          if not match:              return UnauthorizedResource() +        # get authorization header or fail          header = request.getHeader(b'authorization')          if not header:              return UnauthorizedResource() +        # parse the authorization header          auth_data = self._parseHeader(header)          if not auth_data:              return UnauthorizedResource() +        # decode the credentials from the parsed header          try:              credentials = self._credentialFactory.decode(auth_data, request)          except error.LoginFailed: @@ -85,10 +96,13 @@ class SoledadSession(HTTPAuthSessionWrapper):              log.err(None, "Unexpected failure from credentials factory")              return ErrorPage(500, None, None) +        # make sure the uuid given in path corresponds to the one given in +        # the credentials          request_uuid = match.get('uuid')          if request_uuid and request_uuid != credentials.username:              return ErrorPage(500, None, None) +        # if all checks pass, try to login with credentials          return util.DeferredResource(self._login(credentials)) diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py index 3ff738f2..0e6baba3 100644 --- a/testing/tests/server/test_auth.py +++ b/testing/tests/server/test_auth.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # test_auth.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2017 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -24,6 +24,7 @@ from twisted.cred.error import UnauthorizedLogin  from twisted.internet.defer import inlineCallbacks  from twisted.trial import unittest  from twisted.web.resource import IResource +from twisted.web.test import test_httpauth  from leap.soledad.server.auth import SoledadRealm  from leap.soledad.server.auth import TokenChecker @@ -80,9 +81,6 @@ class TokenCheckerTestCase(unittest.TestCase):              yield checker.requestAvatarId(creds) -from twisted.web.test import test_httpauth - -  class TokenCredentialFactoryTestcase(          test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin,          unittest.TestCase): diff --git a/testing/tests/server/test_session.py b/testing/tests/server/test_session.py new file mode 100644 index 00000000..7883ef4a --- /dev/null +++ b/testing/tests/server/test_session.py @@ -0,0 +1,209 @@ +# -*- coding: utf-8 -*- +# test_session.py +# Copyright (C) 2017 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for server session entrypoint. +""" +from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse +from twisted.cred import portal +from twisted.web.test.test_httpauth import b64encode +from twisted.web.test.test_httpauth import Realm +from twisted.web.test.requesthelper import DummyRequest +from twisted.web.resource import getChildForRequest + +from twisted.web.resource import Resource + +from twisted.trial import unittest + +from leap.soledad.server.session import SoledadSession + +from twisted.web.static import Data +from twisted.web._auth.wrapper import UnauthorizedResource +from twisted.cred.credentials import IUsernamePassword +from twisted.cred.checkers import ANONYMOUS, AllowAnonymousAccess + + +class SoledadSessionTestCase(unittest.TestCase): +    """ +    Tests adapted from for +    L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}. +    """ + +    def makeRequest(self, *args, **kwargs): +        request = DummyRequest(*args, **kwargs) +        request.path = '/' +        return request + +    def setUp(self): +        self.username = b'foo bar' +        self.password = b'bar baz' +        self.avatarContent = b"contents of the avatar resource itself" +        self.childName = b"foo-child" +        self.childContent = b"contents of the foo child of the avatar" +        self.checker = InMemoryUsernamePasswordDatabaseDontUse() +        self.checker.addUser(self.username, self.password) +        self.avatar = Data(self.avatarContent, 'text/plain') +        self.avatar.putChild( +            self.childName, Data(self.childContent, 'text/plain')) +        self.avatars = {self.username: self.avatar} +        self.realm = Realm(self.avatars.get) +        self.portal = portal.Portal(self.realm, [self.checker]) +        self.wrapper = SoledadSession(self.portal) + +    def _authorizedTokenLogin(self, request): +        authorization = b64encode( +            self.username + b':' + self.password) +        request.requestHeaders.addRawHeader(b'authorization', +                                            b'Token ' + authorization) +        return getChildForRequest(self.wrapper, request) + +    def test_getChildWithDefault(self): +        request = self.makeRequest([self.childName]) +        child = getChildForRequest(self.wrapper, request) +        d = request.notifyFinish() + +        def cbFinished(result): +            self.assertEqual(request.responseCode, 401) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def _invalidAuthorizationTest(self, response): +        request = self.makeRequest([self.childName]) +        request.requestHeaders.addRawHeader(b'authorization', response) +        child = getChildForRequest(self.wrapper, request) +        d = request.notifyFinish() + +        def cbFinished(result): +            self.assertEqual(request.responseCode, 401) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def test_getChildWithDefaultUnauthorizedUser(self): +        return self._invalidAuthorizationTest( +            b'Basic ' + b64encode(b'foo:bar')) + +    def test_getChildWithDefaultUnauthorizedPassword(self): +        return self._invalidAuthorizationTest( +            b'Basic ' + b64encode(self.username + b':bar')) + +    def test_getChildWithDefaultUnrecognizedScheme(self): +        return self._invalidAuthorizationTest(b'Quux foo bar baz') + +    def test_getChildWithDefaultAuthorized(self): +        request = self.makeRequest([self.childName]) +        child = self._authorizedTokenLogin(request) +        d = request.notifyFinish() + +        def cbFinished(ignored): +            self.assertEqual(request.written, [self.childContent]) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def test_renderAuthorized(self): +        # Request it exactly, not any of its children. +        request = self.makeRequest([]) +        child = self._authorizedTokenLogin(request) +        d = request.notifyFinish() + +        def cbFinished(ignored): +            self.assertEqual(request.written, [self.avatarContent]) + +        d.addCallback(cbFinished) +        request.render(child) +        return d + +    def test_decodeRaises(self): +        request = self.makeRequest([self.childName]) +        request.requestHeaders.addRawHeader(b'authorization', +                                            b'Token decode should fail') +        child = getChildForRequest(self.wrapper, request) +        self.assertIsInstance(child, UnauthorizedResource) + +    def test_parseResponse(self): +        basicAuthorization = b'Basic abcdef123456' +        self.assertEqual( +            self.wrapper._parseHeader(basicAuthorization), +            None) +        tokenAuthorization = b'Token abcdef123456' +        self.assertEqual( +            self.wrapper._parseHeader(tokenAuthorization), +            b'abcdef123456') + +    def test_unexpectedDecodeError(self): + +        class UnexpectedException(Exception): +            pass + +        class BadFactory(object): +            scheme = b'bad' + +            def getChallenge(self, client): +                return {} + +            def decode(self, response, request): +                raise UnexpectedException() + +        self.wrapper._credentialFactory = BadFactory() +        request = self.makeRequest([self.childName]) +        request.requestHeaders.addRawHeader(b'authorization', b'Bad abc') +        child = getChildForRequest(self.wrapper, request) +        request.render(child) +        self.assertEqual(request.responseCode, 500) + +    def test_unexpectedLoginError(self): +        class UnexpectedException(Exception): +            pass + +        class BrokenChecker(object): +            credentialInterfaces = (IUsernamePassword,) + +            def requestAvatarId(self, credentials): +                raise UnexpectedException() + +        self.portal.registerChecker(BrokenChecker()) +        request = self.makeRequest([self.childName]) +        child = self._authorizedTokenLogin(request) +        request.render(child) +        self.assertEqual(request.responseCode, 500) + +    def test_anonymousAccess(self): +        """ +        Anonymous requests are allowed if a L{Portal} has an anonymous checker +        registered. +        """ +        unprotectedContents = b"contents of the unprotected child resource" + +        self.avatars[ANONYMOUS] = Resource() +        self.avatars[ANONYMOUS].putChild( +            self.childName, Data(unprotectedContents, 'text/plain')) +        self.portal.registerChecker(AllowAnonymousAccess()) + +        request = self.makeRequest([self.childName]) +        child = getChildForRequest(self.wrapper, request) +        d = request.notifyFinish() + +        def cbFinished(ignored): +            self.assertEqual(request.written, [unprotectedContents]) + +        d.addCallback(cbFinished) +        request.render(child) +        return d | 
