diff options
-rw-r--r-- | server/src/leap/soledad/server/auth.py | 50 | ||||
-rw-r--r-- | server/src/leap/soledad/server/session.py | 4 | ||||
-rw-r--r-- | testing/tests/server/test_auth.py | 92 |
3 files changed, 127 insertions, 19 deletions
diff --git a/server/src/leap/soledad/server/auth.py b/server/src/leap/soledad/server/auth.py index 13245cfe..bcef2e7c 100644 --- a/server/src/leap/soledad/server/auth.py +++ b/server/src/leap/soledad/server/auth.py @@ -29,6 +29,7 @@ from twisted.cred.credentials import IUsernamePassword from twisted.cred.credentials import UsernamePassword from twisted.cred.portal import IRealm from twisted.cred.portal import Portal +from twisted.internet import defer from twisted.web.iweb import ICredentialFactory from twisted.web.resource import IResource @@ -57,31 +58,45 @@ class TokenChecker(object): TOKENS_TYPE_DEF = "Token" TOKENS_USER_ID_KEY = "user_id" - def __init__(self): - config = get_config() - self.couch_url = config['couch_url'] + def __init__(self, server=None): + if server is None: + config = get_config() + couch_url = config['couch_url'] + server = couch_server(couch_url) + self._server = server + self._dbs = {} def _tokens_dbname(self): dbname = self.TOKENS_DB_PREFIX + \ str(int(time.time() / self.TOKENS_DB_EXPIRE)) return dbname + def _get_db(self, dbname): + if dbname not in self._dbs: + self._dbs[dbname] = self._server[dbname] + return self._dbs[dbname] + + def _tokens_db(self): + # the tokens db rotates every 30 days, and the current db name is + # "tokens_NNN", where NNN is the number of seconds since epoch + # divide dby the rotate period in seconds. When rotating, old and + # new tokens db coexist during a certain window of time and valid + # tokens are replicated from the old db to the new one. See: + # https://leap.se/code/issues/6785 + dbname = self._tokens_dbname() + db = self._get_db(dbname) + return db + def requestAvatarId(self, credentials): uuid = credentials.username token = credentials.password - with couch_server(self.couch_url) as server: - # the tokens db rotates every 30 days, and the current db name is - # "tokens_NNN", where NNN is the number of seconds since epoch - # divide dby the rotate period in seconds. When rotating, old and - # new tokens db coexist during a certain window of time and valid - # tokens are replicated from the old db to the new one. See: - # https://leap.se/code/issues/6785 - dbname = self._tokens_dbname() - db = server[dbname] + # lookup key is a hash of the token to prevent timing attacks. + db = self._tokens_db() token = db.get(sha512(token).hexdigest()) if token is None: - return False + return defer.fail(error.UnauthorizedLogin()) + # we compare uuid hashes to avoid possible timing attacks that # might exploit python's builtin comparison operator behaviour, # which fails immediatelly when non-matching bytes are found. @@ -89,8 +104,9 @@ class TokenChecker(object): req_uuid_hash = sha512(uuid).digest() if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \ or couch_uuid_hash != req_uuid_hash: - return False - return True + return defer.fail(error.UnauthorizedLogin()) + + return defer.succeed(uuid) @implementer(ICredentialFactory) @@ -103,7 +119,7 @@ class TokenCredentialFactory(object): def decode(self, response, request): try: - creds = response.decode('base64') + creds = binascii.a2b_base64(response + b'===') except binascii.Error: raise error.LoginFailed('Invalid credentials') @@ -114,5 +130,5 @@ class TokenCredentialFactory(object): raise error.LoginFailed('Invalid credentials') -portal = Portal(SoledadRealm(), [TokenChecker()]) +get_portal = lambda: Portal(SoledadRealm(), [TokenChecker()]) credentialFactory = TokenCredentialFactory() diff --git a/server/src/leap/soledad/server/session.py b/server/src/leap/soledad/server/session.py index 4ed2721c..f1626115 100644 --- a/server/src/leap/soledad/server/session.py +++ b/server/src/leap/soledad/server/session.py @@ -26,7 +26,7 @@ from twisted.web.guard import HTTPAuthSessionWrapper from twisted.web.resource import ErrorPage from twisted.web.resource import IResource -from leap.soledad.server.auth import portal +from leap.soledad.server.auth import get_portal from leap.soledad.server.auth import credentialFactory from leap.soledad.server.url_mapper import URLMapper @@ -50,7 +50,7 @@ class SoledadSession(HTTPAuthSessionWrapper): def __init__(self): self._mapper = URLMapper() - self._portal = portal + self._portal = get_portal() self._credentialFactory = credentialFactory def _matchPath(self, request): diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py new file mode 100644 index 00000000..3ff738f2 --- /dev/null +++ b/testing/tests/server/test_auth.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# test_auth.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for auth pieces. +""" +import collections + +from twisted.cred.credentials import UsernamePassword +from twisted.cred.error import UnauthorizedLogin +from twisted.internet.defer import inlineCallbacks +from twisted.trial import unittest +from twisted.web.resource import IResource + +from leap.soledad.server.auth import SoledadRealm +from leap.soledad.server.auth import TokenChecker +from leap.soledad.server.auth import TokenCredentialFactory +from leap.soledad.server.resource import SoledadResource + + +class SoledadRealmTestCase(unittest.TestCase): + + def test_returned_resource(self): + realm = SoledadRealm() + iface, avatar, logout = realm.requestAvatar('any', None, IResource) + self.assertIsInstance(avatar, SoledadResource) + self.assertIsNone(logout()) + + +class DummyServer(object): + """ + I fake the `couchdb.client.Server` GET api and always return the token + given on my creation. + """ + + def __init__(self, token): + self._token = token + + def get(self, _): + return self._token + + +class TokenCheckerTestCase(unittest.TestCase): + + @inlineCallbacks + def test_good_creds(self): + # set up a dummy server which always return a *valid* token document + token = {'user_id': 'user', 'type': 'Token'} + server = collections.defaultdict(lambda: DummyServer(token)) + # setup the checker with the custom server + checker = TokenChecker(server=server) + # assert the checker *can* verify the creds + creds = UsernamePassword('user', 'pass') + avatarId = yield checker.requestAvatarId(creds) + self.assertEqual('user', avatarId) + + @inlineCallbacks + def test_bad_creds(self): + # set up a dummy server which always return an *invalid* token document + token = None + server = collections.defaultdict(lambda: DummyServer(token)) + # setup the checker with the custom server + checker = TokenChecker(server=server) + # assert the checker *cannot* verify the creds + creds = UsernamePassword('user', '') + with self.assertRaises(UnauthorizedLogin): + yield checker.requestAvatarId(creds) + + +from twisted.web.test import test_httpauth + + +class TokenCredentialFactoryTestcase( + test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin, + unittest.TestCase): + + def setUp(self): + test_httpauth.BasicAuthTestsMixin.setUp(self) + self.credentialFactory = TokenCredentialFactory() |