summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2017-01-31 23:51:42 -0200
committerKali Kaneko <kali@leap.se>2017-02-09 17:41:42 +0100
commitb3e0af399b81b65d6665865d1e1b18aa589f5824 (patch)
treeeea7d3b825d80e3833671e7777b2273234679be7
parent4fce575de20effc9c4d934028f8ccdfbd97932e1 (diff)
[test] add tests for server auth
-rw-r--r--server/src/leap/soledad/server/auth.py50
-rw-r--r--server/src/leap/soledad/server/session.py4
-rw-r--r--testing/tests/server/test_auth.py92
3 files changed, 127 insertions, 19 deletions
diff --git a/server/src/leap/soledad/server/auth.py b/server/src/leap/soledad/server/auth.py
index 13245cfe..bcef2e7c 100644
--- a/server/src/leap/soledad/server/auth.py
+++ b/server/src/leap/soledad/server/auth.py
@@ -29,6 +29,7 @@ from twisted.cred.credentials import IUsernamePassword
from twisted.cred.credentials import UsernamePassword
from twisted.cred.portal import IRealm
from twisted.cred.portal import Portal
+from twisted.internet import defer
from twisted.web.iweb import ICredentialFactory
from twisted.web.resource import IResource
@@ -57,31 +58,45 @@ class TokenChecker(object):
TOKENS_TYPE_DEF = "Token"
TOKENS_USER_ID_KEY = "user_id"
- def __init__(self):
- config = get_config()
- self.couch_url = config['couch_url']
+ def __init__(self, server=None):
+ if server is None:
+ config = get_config()
+ couch_url = config['couch_url']
+ server = couch_server(couch_url)
+ self._server = server
+ self._dbs = {}
def _tokens_dbname(self):
dbname = self.TOKENS_DB_PREFIX + \
str(int(time.time() / self.TOKENS_DB_EXPIRE))
return dbname
+ def _get_db(self, dbname):
+ if dbname not in self._dbs:
+ self._dbs[dbname] = self._server[dbname]
+ return self._dbs[dbname]
+
+ def _tokens_db(self):
+ # the tokens db rotates every 30 days, and the current db name is
+ # "tokens_NNN", where NNN is the number of seconds since epoch
+ # divide dby the rotate period in seconds. When rotating, old and
+ # new tokens db coexist during a certain window of time and valid
+ # tokens are replicated from the old db to the new one. See:
+ # https://leap.se/code/issues/6785
+ dbname = self._tokens_dbname()
+ db = self._get_db(dbname)
+ return db
+
def requestAvatarId(self, credentials):
uuid = credentials.username
token = credentials.password
- with couch_server(self.couch_url) as server:
- # the tokens db rotates every 30 days, and the current db name is
- # "tokens_NNN", where NNN is the number of seconds since epoch
- # divide dby the rotate period in seconds. When rotating, old and
- # new tokens db coexist during a certain window of time and valid
- # tokens are replicated from the old db to the new one. See:
- # https://leap.se/code/issues/6785
- dbname = self._tokens_dbname()
- db = server[dbname]
+
# lookup key is a hash of the token to prevent timing attacks.
+ db = self._tokens_db()
token = db.get(sha512(token).hexdigest())
if token is None:
- return False
+ return defer.fail(error.UnauthorizedLogin())
+
# we compare uuid hashes to avoid possible timing attacks that
# might exploit python's builtin comparison operator behaviour,
# which fails immediatelly when non-matching bytes are found.
@@ -89,8 +104,9 @@ class TokenChecker(object):
req_uuid_hash = sha512(uuid).digest()
if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \
or couch_uuid_hash != req_uuid_hash:
- return False
- return True
+ return defer.fail(error.UnauthorizedLogin())
+
+ return defer.succeed(uuid)
@implementer(ICredentialFactory)
@@ -103,7 +119,7 @@ class TokenCredentialFactory(object):
def decode(self, response, request):
try:
- creds = response.decode('base64')
+ creds = binascii.a2b_base64(response + b'===')
except binascii.Error:
raise error.LoginFailed('Invalid credentials')
@@ -114,5 +130,5 @@ class TokenCredentialFactory(object):
raise error.LoginFailed('Invalid credentials')
-portal = Portal(SoledadRealm(), [TokenChecker()])
+get_portal = lambda: Portal(SoledadRealm(), [TokenChecker()])
credentialFactory = TokenCredentialFactory()
diff --git a/server/src/leap/soledad/server/session.py b/server/src/leap/soledad/server/session.py
index 4ed2721c..f1626115 100644
--- a/server/src/leap/soledad/server/session.py
+++ b/server/src/leap/soledad/server/session.py
@@ -26,7 +26,7 @@ from twisted.web.guard import HTTPAuthSessionWrapper
from twisted.web.resource import ErrorPage
from twisted.web.resource import IResource
-from leap.soledad.server.auth import portal
+from leap.soledad.server.auth import get_portal
from leap.soledad.server.auth import credentialFactory
from leap.soledad.server.url_mapper import URLMapper
@@ -50,7 +50,7 @@ class SoledadSession(HTTPAuthSessionWrapper):
def __init__(self):
self._mapper = URLMapper()
- self._portal = portal
+ self._portal = get_portal()
self._credentialFactory = credentialFactory
def _matchPath(self, request):
diff --git a/testing/tests/server/test_auth.py b/testing/tests/server/test_auth.py
new file mode 100644
index 00000000..3ff738f2
--- /dev/null
+++ b/testing/tests/server/test_auth.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+# test_auth.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for auth pieces.
+"""
+import collections
+
+from twisted.cred.credentials import UsernamePassword
+from twisted.cred.error import UnauthorizedLogin
+from twisted.internet.defer import inlineCallbacks
+from twisted.trial import unittest
+from twisted.web.resource import IResource
+
+from leap.soledad.server.auth import SoledadRealm
+from leap.soledad.server.auth import TokenChecker
+from leap.soledad.server.auth import TokenCredentialFactory
+from leap.soledad.server.resource import SoledadResource
+
+
+class SoledadRealmTestCase(unittest.TestCase):
+
+ def test_returned_resource(self):
+ realm = SoledadRealm()
+ iface, avatar, logout = realm.requestAvatar('any', None, IResource)
+ self.assertIsInstance(avatar, SoledadResource)
+ self.assertIsNone(logout())
+
+
+class DummyServer(object):
+ """
+ I fake the `couchdb.client.Server` GET api and always return the token
+ given on my creation.
+ """
+
+ def __init__(self, token):
+ self._token = token
+
+ def get(self, _):
+ return self._token
+
+
+class TokenCheckerTestCase(unittest.TestCase):
+
+ @inlineCallbacks
+ def test_good_creds(self):
+ # set up a dummy server which always return a *valid* token document
+ token = {'user_id': 'user', 'type': 'Token'}
+ server = collections.defaultdict(lambda: DummyServer(token))
+ # setup the checker with the custom server
+ checker = TokenChecker(server=server)
+ # assert the checker *can* verify the creds
+ creds = UsernamePassword('user', 'pass')
+ avatarId = yield checker.requestAvatarId(creds)
+ self.assertEqual('user', avatarId)
+
+ @inlineCallbacks
+ def test_bad_creds(self):
+ # set up a dummy server which always return an *invalid* token document
+ token = None
+ server = collections.defaultdict(lambda: DummyServer(token))
+ # setup the checker with the custom server
+ checker = TokenChecker(server=server)
+ # assert the checker *cannot* verify the creds
+ creds = UsernamePassword('user', '')
+ with self.assertRaises(UnauthorizedLogin):
+ yield checker.requestAvatarId(creds)
+
+
+from twisted.web.test import test_httpauth
+
+
+class TokenCredentialFactoryTestcase(
+ test_httpauth.RequestMixin, test_httpauth.BasicAuthTestsMixin,
+ unittest.TestCase):
+
+ def setUp(self):
+ test_httpauth.BasicAuthTestsMixin.setUp(self)
+ self.credentialFactory = TokenCredentialFactory()