summaryrefslogtreecommitdiff
path: root/u1db/tests/test_auth_middleware.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2012-12-06 11:07:53 -0200
committerdrebs <drebs@leap.se>2012-12-06 11:07:53 -0200
commit30c9ec2a115b157e608e97c9b8449fc9c753b60f (patch)
treee36dfed8ef7cd0beae446909eef22b662cb92554 /u1db/tests/test_auth_middleware.py
parent08fd87714f503c551b8e2dc29e55ae4497b42759 (diff)
Remove u1db and swiftclient dirs and refactor.
Diffstat (limited to 'u1db/tests/test_auth_middleware.py')
-rw-r--r--u1db/tests/test_auth_middleware.py309
1 files changed, 0 insertions, 309 deletions
diff --git a/u1db/tests/test_auth_middleware.py b/u1db/tests/test_auth_middleware.py
deleted file mode 100644
index e765f8a7..00000000
--- a/u1db/tests/test_auth_middleware.py
+++ /dev/null
@@ -1,309 +0,0 @@
-# Copyright 2012 Canonical Ltd.
-#
-# This file is part of u1db.
-#
-# u1db is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation.
-#
-# u1db is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with u1db. If not, see <http://www.gnu.org/licenses/>.
-
-"""Test OAuth wsgi middleware"""
-import paste.fixture
-from oauth import oauth
-try:
- import simplejson as json
-except ImportError:
- import json # noqa
-import time
-
-from u1db import tests
-
-from u1db.remote.oauth_middleware import OAuthMiddleware
-from u1db.remote.basic_auth_middleware import BasicAuthMiddleware, Unauthorized
-
-
-BASE_URL = 'https://example.net'
-
-
-class TestBasicAuthMiddleware(tests.TestCase):
-
- def setUp(self):
- super(TestBasicAuthMiddleware, self).setUp()
- self.got = []
-
- def witness_app(environ, start_response):
- start_response("200 OK", [("content-type", "text/plain")])
- self.got.append((
- environ['user_id'], environ['PATH_INFO'],
- environ['QUERY_STRING']))
- return ["ok"]
-
- class MyAuthMiddleware(BasicAuthMiddleware):
-
- def verify_user(self, environ, user, password):
- if user != "correct_user":
- raise Unauthorized
- if password != "correct_password":
- raise Unauthorized
- environ['user_id'] = user
-
- self.auth_midw = MyAuthMiddleware(witness_app, prefix="/pfx/")
- self.app = paste.fixture.TestApp(self.auth_midw)
-
- def test_expect_prefix(self):
- url = BASE_URL + '/foo/doc/doc-id'
- resp = self.app.delete(url, expect_errors=True)
- self.assertEqual(400, resp.status)
- self.assertEqual('application/json', resp.header('content-type'))
- self.assertEqual('{"error": "bad request"}', resp.body)
-
- def test_missing_auth(self):
- url = BASE_URL + '/pfx/foo/doc/doc-id'
- resp = self.app.delete(url, expect_errors=True)
- self.assertEqual(401, resp.status)
- self.assertEqual('application/json', resp.header('content-type'))
- self.assertEqual(
- {"error": "unauthorized",
- "message": "Missing Basic Authentication."},
- json.loads(resp.body))
-
- def test_correct_auth(self):
- user = "correct_user"
- password = "correct_password"
- params = {'old_rev': 'old-rev'}
- url = BASE_URL + '/pfx/foo/doc/doc-id?%s' % (
- '&'.join("%s=%s" % (k, v) for k, v in params.items()))
- auth = '%s:%s' % (user, password)
- headers = {
- 'Authorization': 'Basic %s' % (auth.encode('base64'),)}
- resp = self.app.delete(url, headers=headers)
- self.assertEqual(200, resp.status)
- self.assertEqual(
- [('correct_user', '/foo/doc/doc-id', 'old_rev=old-rev')], self.got)
-
- def test_incorrect_auth(self):
- user = "correct_user"
- password = "incorrect_password"
- params = {'old_rev': 'old-rev'}
- url = BASE_URL + '/pfx/foo/doc/doc-id?%s' % (
- '&'.join("%s=%s" % (k, v) for k, v in params.items()))
- auth = '%s:%s' % (user, password)
- headers = {
- 'Authorization': 'Basic %s' % (auth.encode('base64'),)}
- resp = self.app.delete(url, headers=headers, expect_errors=True)
- self.assertEqual(401, resp.status)
- self.assertEqual('application/json', resp.header('content-type'))
- self.assertEqual(
- {"error": "unauthorized",
- "message": "Incorrect password or login."},
- json.loads(resp.body))
-
-
-class TestOAuthMiddlewareDefaultPrefix(tests.TestCase):
- def setUp(self):
-
- super(TestOAuthMiddlewareDefaultPrefix, self).setUp()
- self.got = []
-
- def witness_app(environ, start_response):
- start_response("200 OK", [("content-type", "text/plain")])
- self.got.append((environ['token_key'], environ['PATH_INFO'],
- environ['QUERY_STRING']))
- return ["ok"]
-
- class MyOAuthMiddleware(OAuthMiddleware):
- get_oauth_data_store = lambda self: tests.testingOAuthStore
-
- def verify(self, environ, oauth_req):
- consumer, token = super(MyOAuthMiddleware, self).verify(
- environ, oauth_req)
- environ['token_key'] = token.key
-
- self.oauth_midw = MyOAuthMiddleware(witness_app, BASE_URL)
- self.app = paste.fixture.TestApp(self.oauth_midw)
-
- def test_expect_tilde(self):
- url = BASE_URL + '/foo/doc/doc-id'
- resp = self.app.delete(url, expect_errors=True)
- self.assertEqual(400, resp.status)
- self.assertEqual('application/json', resp.header('content-type'))
- self.assertEqual('{"error": "bad request"}', resp.body)
-
- def test_oauth_in_header(self):
- url = BASE_URL + '/~/foo/doc/doc-id'
- params = {'old_rev': 'old-rev'}
- oauth_req = oauth.OAuthRequest.from_consumer_and_token(
- tests.consumer2,
- tests.token2,
- parameters=params,
- http_url=url,
- http_method='DELETE'
- )
- url = oauth_req.get_normalized_http_url() + '?' + (
- '&'.join("%s=%s" % (k, v) for k, v in params.items()))
- oauth_req.sign_request(tests.sign_meth_HMAC_SHA1,
- tests.consumer2, tests.token2)
- resp = self.app.delete(url, headers=oauth_req.to_header())
- self.assertEqual(200, resp.status)
- self.assertEqual([(tests.token2.key,
- '/foo/doc/doc-id', 'old_rev=old-rev')], self.got)
-
- def test_oauth_in_query_string(self):
- url = BASE_URL + '/~/foo/doc/doc-id'
- params = {'old_rev': 'old-rev'}
- oauth_req = oauth.OAuthRequest.from_consumer_and_token(
- tests.consumer1,
- tests.token1,
- parameters=params,
- http_url=url,
- http_method='DELETE'
- )
- oauth_req.sign_request(tests.sign_meth_HMAC_SHA1,
- tests.consumer1, tests.token1)
- resp = self.app.delete(oauth_req.to_url())
- self.assertEqual(200, resp.status)
- self.assertEqual([(tests.token1.key,
- '/foo/doc/doc-id', 'old_rev=old-rev')], self.got)
-
-
-class TestOAuthMiddleware(tests.TestCase):
-
- def setUp(self):
- super(TestOAuthMiddleware, self).setUp()
- self.got = []
-
- def witness_app(environ, start_response):
- start_response("200 OK", [("content-type", "text/plain")])
- self.got.append((environ['token_key'], environ['PATH_INFO'],
- environ['QUERY_STRING']))
- return ["ok"]
-
- class MyOAuthMiddleware(OAuthMiddleware):
- get_oauth_data_store = lambda self: tests.testingOAuthStore
-
- def verify(self, environ, oauth_req):
- consumer, token = super(MyOAuthMiddleware, self).verify(
- environ, oauth_req)
- environ['token_key'] = token.key
-
- self.oauth_midw = MyOAuthMiddleware(
- witness_app, BASE_URL, prefix='/pfx/')
- self.app = paste.fixture.TestApp(self.oauth_midw)
-
- def test_expect_prefix(self):
- url = BASE_URL + '/foo/doc/doc-id'
- resp = self.app.delete(url, expect_errors=True)
- self.assertEqual(400, resp.status)
- self.assertEqual('application/json', resp.header('content-type'))
- self.assertEqual('{"error": "bad request"}', resp.body)
-
- def test_missing_oauth(self):
- url = BASE_URL + '/pfx/foo/doc/doc-id'
- resp = self.app.delete(url, expect_errors=True)
- self.assertEqual(401, resp.status)
- self.assertEqual('application/json', resp.header('content-type'))
- self.assertEqual(
- {"error": "unauthorized", "message": "Missing OAuth."},
- json.loads(resp.body))
-
- def test_oauth_in_query_string(self):
- url = BASE_URL + '/pfx/foo/doc/doc-id'
- params = {'old_rev': 'old-rev'}
- oauth_req = oauth.OAuthRequest.from_consumer_and_token(
- tests.consumer1,
- tests.token1,
- parameters=params,
- http_url=url,
- http_method='DELETE'
- )
- oauth_req.sign_request(tests.sign_meth_HMAC_SHA1,
- tests.consumer1, tests.token1)
- resp = self.app.delete(oauth_req.to_url())
- self.assertEqual(200, resp.status)
- self.assertEqual([(tests.token1.key,
- '/foo/doc/doc-id', 'old_rev=old-rev')], self.got)
-
- def test_oauth_invalid(self):
- url = BASE_URL + '/pfx/foo/doc/doc-id'
- params = {'old_rev': 'old-rev'}
- oauth_req = oauth.OAuthRequest.from_consumer_and_token(
- tests.consumer1,
- tests.token3,
- parameters=params,
- http_url=url,
- http_method='DELETE'
- )
- oauth_req.sign_request(tests.sign_meth_HMAC_SHA1,
- tests.consumer1, tests.token3)
- resp = self.app.delete(oauth_req.to_url(),
- expect_errors=True)
- self.assertEqual(401, resp.status)
- self.assertEqual('application/json', resp.header('content-type'))
- err = json.loads(resp.body)
- self.assertEqual({"error": "unauthorized",
- "message": err['message']},
- err)
-
- def test_oauth_in_header(self):
- url = BASE_URL + '/pfx/foo/doc/doc-id'
- params = {'old_rev': 'old-rev'}
- oauth_req = oauth.OAuthRequest.from_consumer_and_token(
- tests.consumer2,
- tests.token2,
- parameters=params,
- http_url=url,
- http_method='DELETE'
- )
- url = oauth_req.get_normalized_http_url() + '?' + (
- '&'.join("%s=%s" % (k, v) for k, v in params.items()))
- oauth_req.sign_request(tests.sign_meth_HMAC_SHA1,
- tests.consumer2, tests.token2)
- resp = self.app.delete(url, headers=oauth_req.to_header())
- self.assertEqual(200, resp.status)
- self.assertEqual([(tests.token2.key,
- '/foo/doc/doc-id', 'old_rev=old-rev')], self.got)
-
- def test_oauth_plain_text(self):
- url = BASE_URL + '/pfx/foo/doc/doc-id'
- params = {'old_rev': 'old-rev'}
- oauth_req = oauth.OAuthRequest.from_consumer_and_token(
- tests.consumer1,
- tests.token1,
- parameters=params,
- http_url=url,
- http_method='DELETE'
- )
- oauth_req.sign_request(tests.sign_meth_PLAINTEXT,
- tests.consumer1, tests.token1)
- resp = self.app.delete(oauth_req.to_url())
- self.assertEqual(200, resp.status)
- self.assertEqual([(tests.token1.key,
- '/foo/doc/doc-id', 'old_rev=old-rev')], self.got)
-
- def test_oauth_timestamp_threshold(self):
- url = BASE_URL + '/pfx/foo/doc/doc-id'
- params = {'old_rev': 'old-rev'}
- oauth_req = oauth.OAuthRequest.from_consumer_and_token(
- tests.consumer1,
- tests.token1,
- parameters=params,
- http_url=url,
- http_method='DELETE'
- )
- oauth_req.set_parameter('oauth_timestamp', int(time.time()) - 5)
- oauth_req.sign_request(tests.sign_meth_PLAINTEXT,
- tests.consumer1, tests.token1)
- # tweak threshold
- self.oauth_midw.timestamp_threshold = 1
- resp = self.app.delete(oauth_req.to_url(), expect_errors=True)
- self.assertEqual(401, resp.status)
- err = json.loads(resp.body)
- self.assertIn('Expired timestamp', err['message'])
- self.assertIn('threshold 1', err['message'])