summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--server/src/leap/soledad/server/session.py22
-rw-r--r--testing/tests/server/test_auth.py6
-rw-r--r--testing/tests/server/test_session.py209
3 files changed, 229 insertions, 8 deletions
diff --git a/server/src/leap/soledad/server/session.py b/server/src/leap/soledad/server/session.py
index f1626115..a56e4e97 100644
--- a/server/src/leap/soledad/server/session.py
+++ b/server/src/leap/soledad/server/session.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# session.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2017 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -22,6 +22,7 @@ from zope.interface import implementer
from twisted.cred import error
from twisted.python import log
from twisted.web import util
+from twisted.web._auth import wrapper
from twisted.web.guard import HTTPAuthSessionWrapper
from twisted.web.resource import ErrorPage
from twisted.web.resource import IResource
@@ -32,9 +33,12 @@ from leap.soledad.server.url_mapper import URLMapper
@implementer(IResource)
-class UnauthorizedResource(object):
+class UnauthorizedResource(wrapper.UnauthorizedResource):
isLeaf = True
+ def __init__(self):
+ pass
+
def render(self, request):
request.setResponseCode(401)
if request.method == b'HEAD':
@@ -48,9 +52,12 @@ class UnauthorizedResource(object):
@implementer(IResource)
class SoledadSession(HTTPAuthSessionWrapper):
- def __init__(self):
+ def __init__(self, portal=None):
+ if portal is None:
+ portal = get_portal()
+
self._mapper = URLMapper()
- self._portal = get_portal()
+ self._portal = portal
self._credentialFactory = credentialFactory
def _matchPath(self, request):
@@ -65,18 +72,22 @@ class SoledadSession(HTTPAuthSessionWrapper):
return None
def _authorizedResource(self, request):
+ # check whether the path of the request exists in the app
match = self._matchPath(request)
if not match:
return UnauthorizedResource()
+ # get authorization header or fail
header = request.getHeader(b'authorization')
if not header:
return UnauthorizedResource()
+ # parse the authorization header
auth_data = self._parseHeader(header)
if not auth_data:
return UnauthorizedResource()
+ # decode the credentials from the parsed header
try:
credentials = self._credentialFactory.decode(auth_data, request)
except error.LoginFailed:
@@ -85,10 +96,13 @@ class SoledadSession(HTTPAuthSessionWrapper):
log.err(None, "Unexpected failure from credentials factory")
return ErrorPage(500, None, None)
+ # make sure the uuid given in path corresponds to the one given in
+ # the credentials
request_uuid = match.get('uuid')
if request_uuid and request_uuid != credentials.username:
return ErrorPage(500, None, None)
+ # if all checks pass, try to login with credentials
return util.DeferredResource(self._login(credentials))
diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py
index 3ff738f2..0e6baba3 100644
--- a/testing/tests/server/test_auth.py
+++ b/testing/tests/server/test_auth.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# test_auth.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2017 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -24,6 +24,7 @@ from twisted.cred.error import UnauthorizedLogin
from twisted.internet.defer import inlineCallbacks
from twisted.trial import unittest
from twisted.web.resource import IResource
+from twisted.web.test import test_httpauth
from leap.soledad.server.auth import SoledadRealm
from leap.soledad.server.auth import TokenChecker
@@ -80,9 +81,6 @@ class TokenCheckerTestCase(unittest.TestCase):
yield checker.requestAvatarId(creds)
-from twisted.web.test import test_httpauth
-
-
class TokenCredentialFactoryTestcase(
test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin,
unittest.TestCase):
diff --git a/testing/tests/server/test_session.py b/testing/tests/server/test_session.py
new file mode 100644
index 00000000..7883ef4a
--- /dev/null
+++ b/testing/tests/server/test_session.py
@@ -0,0 +1,209 @@
+# -*- coding: utf-8 -*-
+# test_session.py
+# Copyright (C) 2017 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for server session entrypoint.
+"""
+from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
+from twisted.cred import portal
+from twisted.web.test.test_httpauth import b64encode
+from twisted.web.test.test_httpauth import Realm
+from twisted.web.test.requesthelper import DummyRequest
+from twisted.web.resource import getChildForRequest
+
+from twisted.web.resource import Resource
+
+from twisted.trial import unittest
+
+from leap.soledad.server.session import SoledadSession
+
+from twisted.web.static import Data
+from twisted.web._auth.wrapper import UnauthorizedResource
+from twisted.cred.credentials import IUsernamePassword
+from twisted.cred.checkers import ANONYMOUS, AllowAnonymousAccess
+
+
+class SoledadSessionTestCase(unittest.TestCase):
+ """
+ Tests adapted from for
+ L{twisted.web.test.test_httpauth.HTTPAuthSessionWrapper}.
+ """
+
+ def makeRequest(self, *args, **kwargs):
+ request = DummyRequest(*args, **kwargs)
+ request.path = '/'
+ return request
+
+ def setUp(self):
+ self.username = b'foo bar'
+ self.password = b'bar baz'
+ self.avatarContent = b"contents of the avatar resource itself"
+ self.childName = b"foo-child"
+ self.childContent = b"contents of the foo child of the avatar"
+ self.checker = InMemoryUsernamePasswordDatabaseDontUse()
+ self.checker.addUser(self.username, self.password)
+ self.avatar = Data(self.avatarContent, 'text/plain')
+ self.avatar.putChild(
+ self.childName, Data(self.childContent, 'text/plain'))
+ self.avatars = {self.username: self.avatar}
+ self.realm = Realm(self.avatars.get)
+ self.portal = portal.Portal(self.realm, [self.checker])
+ self.wrapper = SoledadSession(self.portal)
+
+ def _authorizedTokenLogin(self, request):
+ authorization = b64encode(
+ self.username + b':' + self.password)
+ request.requestHeaders.addRawHeader(b'authorization',
+ b'Token ' + authorization)
+ return getChildForRequest(self.wrapper, request)
+
+ def test_getChildWithDefault(self):
+ request = self.makeRequest([self.childName])
+ child = getChildForRequest(self.wrapper, request)
+ d = request.notifyFinish()
+
+ def cbFinished(result):
+ self.assertEqual(request.responseCode, 401)
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def _invalidAuthorizationTest(self, response):
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization', response)
+ child = getChildForRequest(self.wrapper, request)
+ d = request.notifyFinish()
+
+ def cbFinished(result):
+ self.assertEqual(request.responseCode, 401)
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_getChildWithDefaultUnauthorizedUser(self):
+ return self._invalidAuthorizationTest(
+ b'Basic ' + b64encode(b'foo:bar'))
+
+ def test_getChildWithDefaultUnauthorizedPassword(self):
+ return self._invalidAuthorizationTest(
+ b'Basic ' + b64encode(self.username + b':bar'))
+
+ def test_getChildWithDefaultUnrecognizedScheme(self):
+ return self._invalidAuthorizationTest(b'Quux foo bar baz')
+
+ def test_getChildWithDefaultAuthorized(self):
+ request = self.makeRequest([self.childName])
+ child = self._authorizedTokenLogin(request)
+ d = request.notifyFinish()
+
+ def cbFinished(ignored):
+ self.assertEqual(request.written, [self.childContent])
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_renderAuthorized(self):
+ # Request it exactly, not any of its children.
+ request = self.makeRequest([])
+ child = self._authorizedTokenLogin(request)
+ d = request.notifyFinish()
+
+ def cbFinished(ignored):
+ self.assertEqual(request.written, [self.avatarContent])
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d
+
+ def test_decodeRaises(self):
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization',
+ b'Token decode should fail')
+ child = getChildForRequest(self.wrapper, request)
+ self.assertIsInstance(child, UnauthorizedResource)
+
+ def test_parseResponse(self):
+ basicAuthorization = b'Basic abcdef123456'
+ self.assertEqual(
+ self.wrapper._parseHeader(basicAuthorization),
+ None)
+ tokenAuthorization = b'Token abcdef123456'
+ self.assertEqual(
+ self.wrapper._parseHeader(tokenAuthorization),
+ b'abcdef123456')
+
+ def test_unexpectedDecodeError(self):
+
+ class UnexpectedException(Exception):
+ pass
+
+ class BadFactory(object):
+ scheme = b'bad'
+
+ def getChallenge(self, client):
+ return {}
+
+ def decode(self, response, request):
+ raise UnexpectedException()
+
+ self.wrapper._credentialFactory = BadFactory()
+ request = self.makeRequest([self.childName])
+ request.requestHeaders.addRawHeader(b'authorization', b'Bad abc')
+ child = getChildForRequest(self.wrapper, request)
+ request.render(child)
+ self.assertEqual(request.responseCode, 500)
+
+ def test_unexpectedLoginError(self):
+ class UnexpectedException(Exception):
+ pass
+
+ class BrokenChecker(object):
+ credentialInterfaces = (IUsernamePassword,)
+
+ def requestAvatarId(self, credentials):
+ raise UnexpectedException()
+
+ self.portal.registerChecker(BrokenChecker())
+ request = self.makeRequest([self.childName])
+ child = self._authorizedTokenLogin(request)
+ request.render(child)
+ self.assertEqual(request.responseCode, 500)
+
+ def test_anonymousAccess(self):
+ """
+ Anonymous requests are allowed if a L{Portal} has an anonymous checker
+ registered.
+ """
+ unprotectedContents = b"contents of the unprotected child resource"
+
+ self.avatars[ANONYMOUS] = Resource()
+ self.avatars[ANONYMOUS].putChild(
+ self.childName, Data(unprotectedContents, 'text/plain'))
+ self.portal.registerChecker(AllowAnonymousAccess())
+
+ request = self.makeRequest([self.childName])
+ child = getChildForRequest(self.wrapper, request)
+ d = request.notifyFinish()
+
+ def cbFinished(ignored):
+ self.assertEqual(request.written, [unprotectedContents])
+
+ d.addCallback(cbFinished)
+ request.render(child)
+ return d