# -*- coding: utf-8 -*- # test_session.py # Copyright (C) 2017 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Tests for server session entrypoint. """ from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse from twisted.cred import portal from twisted.web.test.test_httpauth import b64encode from twisted.web.test.test_httpauth import Realm from twisted.web.test.requesthelper import DummyRequest from twisted.web.resource import getChildForRequest from twisted.web.resource import Resource from twisted.trial import unittest from leap.soledad.server.session import SoledadSession from twisted.web.static import Data from twisted.web._auth.wrapper import UnauthorizedResource from twisted.cred.credentials import IUsernamePassword from twisted.cred.checkers import ANONYMOUS, AllowAnonymousAccess class SoledadSessionTestCase(unittest.TestCase): """ Tests adapted from for L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}. """ def makeRequest(self, *args, **kwargs): request = DummyRequest(*args, **kwargs) request.path = '/' return request def setUp(self): self.username = b'foo bar' self.password = b'bar baz' self.avatarContent = b"contents of the avatar resource itself" self.childName = b"foo-child" self.childContent = b"contents of the foo child of the avatar" self.checker = InMemoryUsernamePasswordDatabaseDontUse() self.checker.addUser(self.username, self.password) self.avatar = Data(self.avatarContent, 'text/plain') self.avatar.putChild( self.childName, Data(self.childContent, 'text/plain')) self.avatars = {self.username: self.avatar} self.realm = Realm(self.avatars.get) self.portal = portal.Portal(self.realm, [self.checker]) self.wrapper = SoledadSession(self.portal) def _authorizedTokenLogin(self, request): authorization = b64encode( self.username + b':' + self.password) request.requestHeaders.addRawHeader(b'authorization', b'Token ' + authorization) return getChildForRequest(self.wrapper, request) def test_getChildWithDefault(self): request = self.makeRequest([self.childName]) child = getChildForRequest(self.wrapper, request) d = request.notifyFinish() def cbFinished(result): self.assertEqual(request.responseCode, 401) d.addCallback(cbFinished) request.render(child) return d def _invalidAuthorizationTest(self, response): request = self.makeRequest([self.childName]) request.requestHeaders.addRawHeader(b'authorization', response) child = getChildForRequest(self.wrapper, request) d = request.notifyFinish() def cbFinished(result): self.assertEqual(request.responseCode, 401) d.addCallback(cbFinished) request.render(child) return d def test_getChildWithDefaultUnauthorizedUser(self): return self._invalidAuthorizationTest( b'Basic ' + b64encode(b'foo:bar')) def test_getChildWithDefaultUnauthorizedPassword(self): return self._invalidAuthorizationTest( b'Basic ' + b64encode(self.username + b':bar')) def test_getChildWithDefaultUnrecognizedScheme(self): return self._invalidAuthorizationTest(b'Quux foo bar baz') def test_getChildWithDefaultAuthorized(self): request = self.makeRequest([self.childName]) child = self._authorizedTokenLogin(request) d = request.notifyFinish() def cbFinished(ignored): self.assertEqual(request.written, [self.childContent]) d.addCallback(cbFinished) request.render(child) return d def test_renderAuthorized(self): # Request it exactly, not any of its children. request = self.makeRequest([]) child = self._authorizedTokenLogin(request) d = request.notifyFinish() def cbFinished(ignored): self.assertEqual(request.written, [self.avatarContent]) d.addCallback(cbFinished) request.render(child) return d def test_decodeRaises(self): request = self.makeRequest([self.childName]) request.requestHeaders.addRawHeader(b'authorization', b'Token decode should fail') child = getChildForRequest(self.wrapper, request) self.assertIsInstance(child, UnauthorizedResource) def test_parseResponse(self): basicAuthorization = b'Basic abcdef123456' self.assertEqual( self.wrapper._parseHeader(basicAuthorization), None) tokenAuthorization = b'Token abcdef123456' self.assertEqual( self.wrapper._parseHeader(tokenAuthorization), b'abcdef123456') def test_unexpectedDecodeError(self): class UnexpectedException(Exception): pass class BadFactory(object): scheme = b'bad' def getChallenge(self, client): return {} def decode(self, response, request): raise UnexpectedException() self.wrapper._credentialFactory = BadFactory() request = self.makeRequest([self.childName]) request.requestHeaders.addRawHeader(b'authorization', b'Bad abc') child = getChildForRequest(self.wrapper, request) request.render(child) self.assertEqual(request.responseCode, 500) def test_unexpectedLoginError(self): class UnexpectedException(Exception): pass class BrokenChecker(object): credentialInterfaces = (IUsernamePassword,) def requestAvatarId(self, credentials): raise UnexpectedException() self.portal.registerChecker(BrokenChecker()) request = self.makeRequest([self.childName]) child = self._authorizedTokenLogin(request) request.render(child) self.assertEqual(request.responseCode, 500) def test_anonymousAccess(self): """ Anonymous requests are allowed if a L{Portal} has an anonymous checker registered. """ unprotectedContents = b"contents of the unprotected child resource" self.avatars[ANONYMOUS] = Resource() self.avatars[ANONYMOUS].putChild( self.childName, Data(unprotectedContents, 'text/plain')) self.portal.registerChecker(AllowAnonymousAccess()) request = self.makeRequest([self.childName]) child = getChildForRequest(self.wrapper, request) d = request.notifyFinish() def cbFinished(ignored): self.assertEqual(request.written, [unprotectedContents]) d.addCallback(cbFinished) request.render(child) return d