diff options
author | drebs <drebs@leap.se> | 2016-12-22 07:32:29 -0200 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2017-02-09 17:41:38 +0100 |
commit | c3c8afb68330bdfb8fe70efc7d055b55ca9c1c3a (patch) | |
tree | 984182c0e1598dce38c9f578fa4aa792b38bac6d | |
parent | 6043f7966b64d6922987bca9137a524fb06a3379 (diff) |
[refactor] remove leftover code from previous wsgi auth
-rw-r--r-- | common/src/leap/soledad/common/l2db/remote/basic_auth_middleware.py | 66 | ||||
-rw-r--r-- | testing/tests/client/test_http_client.py | 107 |
2 files changed, 0 insertions, 173 deletions
diff --git a/common/src/leap/soledad/common/l2db/remote/basic_auth_middleware.py b/common/src/leap/soledad/common/l2db/remote/basic_auth_middleware.py deleted file mode 100644 index 96d0d872..00000000 --- a/common/src/leap/soledad/common/l2db/remote/basic_auth_middleware.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2012 Canonical Ltd. -# -# This file is part of u1db. -# -# u1db is free software: you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License version 3 -# as published by the Free Software Foundation. -# -# u1db is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with u1db. If not, see <http://www.gnu.org/licenses/>. -"""U1DB Basic Auth authorisation WSGI middleware.""" -import httplib -import json - -from wsgiref.util import shift_path_info - - -class Unauthorized(Exception): - """User authorization failed.""" - - -class BasicAuthMiddleware(object): - """U1DB Basic Auth Authorisation WSGI middleware.""" - - def __init__(self, app, prefix): - self.app = app - self.prefix = prefix - - def _error(self, start_response, status, description, message=None): - start_response("%d %s" % (status, httplib.responses[status]), - [('content-type', 'application/json')]) - err = {"error": description} - if message: - err['message'] = message - return [json.dumps(err)] - - def __call__(self, environ, start_response): - if self.prefix and not environ['PATH_INFO'].startswith(self.prefix): - return self._error(start_response, 400, "bad request") - auth = environ.get('HTTP_AUTHORIZATION') - if not auth: - return self._error(start_response, 401, "unauthorized", - "Missing Basic Authentication.") - scheme, encoded = auth.split(None, 1) - if scheme.lower() != 'basic': - return self._error( - start_response, 401, "unauthorized", - "Missing Basic Authentication") - user, password = encoded.decode('base64').split(':', 1) - try: - self.verify_user(environ, user, password) - except Unauthorized: - return self._error( - start_response, 401, "unauthorized", - "Incorrect password or login.") - del environ['HTTP_AUTHORIZATION'] - shift_path_info(environ) - return self.app(environ, start_response) - - def verify_user(self, environ, username, password): - raise NotImplementedError(self.verify_user) diff --git a/testing/tests/client/test_http_client.py b/testing/tests/client/test_http_client.py deleted file mode 100644 index 691c7576..00000000 --- a/testing/tests/client/test_http_client.py +++ /dev/null @@ -1,107 +0,0 @@ -# -*- coding: utf-8 -*- -# test_http_client.py -# Copyright (C) 2013-2016 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Test Leap backend bits: sync target -""" -import json - -from testscenarios import TestWithScenarios - -from leap.soledad.client import auth -from leap.soledad.common.l2db.remote import http_client -from test_soledad.u1db_tests import test_http_client - - -# ----------------------------------------------------------------------------- -# The following tests come from `u1db.tests.test_http_client`. -# ----------------------------------------------------------------------------- - -class TestSoledadClientBase( - TestWithScenarios, - test_http_client.TestHTTPClientBase): - - """ - This class should be used to test Token auth. - """ - - def getClient(self, **kwds): - cli = self.getClientWithToken(**kwds) - if 'creds' not in kwds: - cli.set_token_credentials('user-uuid', 'auth-token') - return cli - - def getClientWithToken(self, **kwds): - self.startServer() - - class _HTTPClientWithToken( - http_client.HTTPClientBase, auth.TokenBasedAuth): - - def set_token_credentials(self, uuid, token): - auth.TokenBasedAuth.set_token_credentials(self, uuid, token) - - def _sign_request(self, method, url_query, params): - return auth.TokenBasedAuth._sign_request( - self, method, url_query, params) - - return _HTTPClientWithToken(self.getURL('dbase'), **kwds) - - def app(self, environ, start_response): - res = test_http_client.TestHTTPClientBase.app( - self, environ, start_response) - if res is not None: - return res - # mime solead application here. - if '/token' in environ['PATH_INFO']: - auth = environ.get('HTTP_AUTHORIZATION') - if not auth: - start_response("401 Unauthorized", - [('Content-Type', 'application/json')]) - return [ - json.dumps( - {"error": "unauthorized", - "message": "no token found in environment"}) - ] - scheme, encoded = auth.split(None, 1) - if scheme.lower() != 'token': - start_response("401 Unauthorized", - [('Content-Type', 'application/json')]) - return [json.dumps({"error": "unauthorized", - "message": "unknown scheme: %s" % scheme})] - uuid, token = encoded.decode('base64').split(':', 1) - if uuid != 'user-uuid' and token != 'auth-token': - return Exception("Incorrect address or token.") - start_response("200 OK", [('Content-Type', 'application/json')]) - return [json.dumps([environ['PATH_INFO'], uuid, token])] - - def test_token(self): - """ - Test if token is sent correctly. - """ - cli = self.getClientWithToken() - cli.set_token_credentials('user-uuid', 'auth-token') - res, headers = cli._request('GET', ['doc', 'token']) - self.assertEqual( - ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) - - def test_token_ctr_creds(self): - cli = self.getClientWithToken(creds={'token': { - 'uuid': 'user-uuid', - 'token': 'auth-token', - }}) - res, headers = cli._request('GET', ['doc', 'token']) - self.assertEqual( - ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res)) |