authordrebs <drebs@leap.se>2016-12-22 07:32:29 -0200
committerKali Kaneko <kali@leap.se>2017-02-09 17:41:38 +0100
commitc3c8afb68330bdfb8fe70efc7d055b55ca9c1c3a (patch)
tree984182c0e1598dce38c9f578fa4aa792b38bac6d
parent6043f7966b64d6922987bca9137a524fb06a3379 (diff)
[refactor] remove leftover code from previous wsgi auth
-rw-r--r--common/src/leap/soledad/common/l2db/remote/basic_auth_middleware.py66
-rw-r--r--testing/tests/client/test_http_client.py107
2 files changed, 0 insertions, 173 deletions
diff --git a/common/src/leap/soledad/common/l2db/remote/basic_auth_middleware.py b/common/src/leap/soledad/common/l2db/remote/basic_auth_middleware.py
deleted file mode 100644
index 96d0d872..00000000
--- a/common/src/leap/soledad/common/l2db/remote/basic_auth_middleware.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Copyright 2012 Canonical Ltd.
-#
-# This file is part of u1db.
-#
-# u1db is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation.
-#
-# u1db is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with u1db. If not, see <http://www.gnu.org/licenses/>.
-"""U1DB Basic Auth authorisation WSGI middleware."""
-import httplib
-import json
-
-from wsgiref.util import shift_path_info
-
-
-class Unauthorized(Exception):
- """User authorization failed."""
-
-
-class BasicAuthMiddleware(object):
- """U1DB Basic Auth Authorisation WSGI middleware."""
-
- def __init__(self, app, prefix):
- self.app = app
- self.prefix = prefix
-
- def _error(self, start_response, status, description, message=None):
- start_response("%d %s" % (status, httplib.responses[status]),
- [('content-type', 'application/json')])
- err = {"error": description}
- if message:
- err['message'] = message
- return [json.dumps(err)]
-
- def __call__(self, environ, start_response):
- if self.prefix and not environ['PATH_INFO'].startswith(self.prefix):
- return self._error(start_response, 400, "bad request")
- auth = environ.get('HTTP_AUTHORIZATION')
- if not auth:
- return self._error(start_response, 401, "unauthorized",
- "Missing Basic Authentication.")
- scheme, encoded = auth.split(None, 1)
- if scheme.lower() != 'basic':
- return self._error(
- start_response, 401, "unauthorized",
- "Missing Basic Authentication")
- user, password = encoded.decode('base64').split(':', 1)
- try:
- self.verify_user(environ, user, password)
- except Unauthorized:
- return self._error(
- start_response, 401, "unauthorized",
- "Incorrect password or login.")
- del environ['HTTP_AUTHORIZATION']
- shift_path_info(environ)
- return self.app(environ, start_response)
-
- def verify_user(self, environ, username, password):
- raise NotImplementedError(self.verify_user)
diff --git a/testing/tests/client/test_http_client.py b/testing/tests/client/test_http_client.py
deleted file mode 100644
index 691c7576..00000000
--- a/testing/tests/client/test_http_client.py
+++ /dev/null
@@ -1,107 +0,0 @@
-# -*- coding: utf-8 -*-
-# test_http_client.py
-# Copyright (C) 2013-2016 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Test Leap backend bits: sync target
-"""
-import json
-
-from testscenarios import TestWithScenarios
-
-from leap.soledad.client import auth
-from leap.soledad.common.l2db.remote import http_client
-from test_soledad.u1db_tests import test_http_client
-
-
-# -----------------------------------------------------------------------------
-# The following tests come from `u1db.tests.test_http_client`.
-# -----------------------------------------------------------------------------
-
-class TestSoledadClientBase(
- TestWithScenarios,
- test_http_client.TestHTTPClientBase):
-
- """
- This class should be used to test Token auth.
- """
-
- def getClient(self, **kwds):
- cli = self.getClientWithToken(**kwds)
- if 'creds' not in kwds:
- cli.set_token_credentials('user-uuid', 'auth-token')
- return cli
-
- def getClientWithToken(self, **kwds):
- self.startServer()
-
- class _HTTPClientWithToken(
- http_client.HTTPClientBase, auth.TokenBasedAuth):
-
- def set_token_credentials(self, uuid, token):
- auth.TokenBasedAuth.set_token_credentials(self, uuid, token)
-
- def _sign_request(self, method, url_query, params):
- return auth.TokenBasedAuth._sign_request(
- self, method, url_query, params)
-
- return _HTTPClientWithToken(self.getURL('dbase'), **kwds)
-
- def app(self, environ, start_response):
- res = test_http_client.TestHTTPClientBase.app(
- self, environ, start_response)
- if res is not None:
- return res
- # mime solead application here.
- if '/token' in environ['PATH_INFO']:
- auth = environ.get('HTTP_AUTHORIZATION')
- if not auth:
- start_response("401 Unauthorized",
- [('Content-Type', 'application/json')])
- return [
- json.dumps(
- {"error": "unauthorized",
- "message": "no token found in environment"})
- ]
- scheme, encoded = auth.split(None, 1)
- if scheme.lower() != 'token':
- start_response("401 Unauthorized",
- [('Content-Type', 'application/json')])
- return [json.dumps({"error": "unauthorized",
- "message": "unknown scheme: %s" % scheme})]
- uuid, token = encoded.decode('base64').split(':', 1)
- if uuid != 'user-uuid' and token != 'auth-token':
- return Exception("Incorrect address or token.")
- start_response("200 OK", [('Content-Type', 'application/json')])
- return [json.dumps([environ['PATH_INFO'], uuid, token])]
-
- def test_token(self):
- """
- Test if token is sent correctly.
- """
- cli = self.getClientWithToken()
- cli.set_token_credentials('user-uuid', 'auth-token')
- res, headers = cli._request('GET', ['doc', 'token'])
- self.assertEqual(
- ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))
-
- def test_token_ctr_creds(self):
- cli = self.getClientWithToken(creds={'token': {
- 'uuid': 'user-uuid',
- 'token': 'auth-token',
- }})
- res, headers = cli._request('GET', ['doc', 'token'])
- self.assertEqual(
- ['/dbase/doc/token', 'user-uuid', 'auth-token'], json.loads(res))