diff options
author | drebs <drebs@leap.se> | 2016-12-18 16:36:39 -0200 |
---|---|---|
committer | Kali Kaneko <kali@leap.se> | 2017-02-09 17:41:35 +0100 |
commit | 260805b9967184841c4499f94713a9a48c49a813 (patch) | |
tree | 043914710bfbc29f8d7bc7668b38ca3a546e8c86 | |
parent | e73d36621052a69aae327200c063ac1689bcf9e0 (diff) |
[feat] use twisted web http auth and creds
-rw-r--r-- | common/src/leap/soledad/common/couch/state.py | 46 | ||||
-rw-r--r-- | server/pkg/soledad-server | 2 | ||||
-rw-r--r-- | server/src/leap/soledad/server/application.py | 16 | ||||
-rw-r--r-- | server/src/leap/soledad/server/auth.py | 384 | ||||
-rw-r--r-- | server/src/leap/soledad/server/resource.py | 4 | ||||
-rw-r--r-- | server/src/leap/soledad/server/session.py | 78 | ||||
-rw-r--r-- | testing/test_soledad/util.py | 4 | ||||
-rw-r--r-- | testing/tests/client/test_http_client.py | 3 | ||||
-rw-r--r-- | testing/tests/server/test_server.py | 164 |
9 files changed, 239 insertions, 462 deletions
diff --git a/common/src/leap/soledad/common/couch/state.py b/common/src/leap/soledad/common/couch/state.py index a7f5b7b6..a4841d0d 100644 --- a/common/src/leap/soledad/common/couch/state.py +++ b/common/src/leap/soledad/common/couch/state.py @@ -19,13 +19,10 @@ Server state using CouchDatabase as backend. """ import couchdb import re -import time from urlparse import urljoin -from hashlib import sha512 from leap.soledad.common.log import getLogger from leap.soledad.common.couch import CouchDatabase -from leap.soledad.common.couch import couch_server from leap.soledad.common.couch import CONFIG_DOC_ID from leap.soledad.common.couch import SCHEMA_VERSION from leap.soledad.common.couch import SCHEMA_VERSION_KEY @@ -59,12 +56,6 @@ class CouchServerState(ServerState): Inteface of the WSGI server with the CouchDB backend. """ - TOKENS_DB_PREFIX = "tokens_" - TOKENS_DB_EXPIRE = 30 * 24 * 3600 # 30 days in seconds - TOKENS_TYPE_KEY = "type" - TOKENS_TYPE_DEF = "Token" - TOKENS_USER_ID_KEY = "user_id" - def __init__(self, couch_url, create_cmd=None, check_schema_versions=False): """ @@ -164,40 +155,3 @@ class CouchServerState(ServerState): delete databases. """ raise Unauthorized() - - def verify_token(self, uuid, token): - """ - Query couchdb to decide if C{token} is valid for C{uuid}. - - @param uuid: The user uuid. - @type uuid: str - @param token: The token. - @type token: str - """ - with couch_server(self.couch_url) as server: - # the tokens db rotates every 30 days, and the current db name is - # "tokens_NNN", where NNN is the number of seconds since epoch - # divide dby the rotate period in seconds. When rotating, old and - # new tokens db coexist during a certain window of time and valid - # tokens are replicated from the old db to the new one. See: - # https://leap.se/code/issues/6785 - dbname = self._tokens_dbname() - db = server[dbname] - # lookup key is a hash of the token to prevent timing attacks. - token = db.get(sha512(token).hexdigest()) - if token is None: - return False - # we compare uuid hashes to avoid possible timing attacks that - # might exploit python's builtin comparison operator behaviour, - # which fails immediatelly when non-matching bytes are found. - couch_uuid_hash = sha512(token[self.TOKENS_USER_ID_KEY]).digest() - req_uuid_hash = sha512(uuid).digest() - if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \ - or couch_uuid_hash != req_uuid_hash: - return False - return True - - def _tokens_dbname(self): - dbname = self.TOKENS_DB_PREFIX + \ - str(int(time.time() / self.TOKENS_DB_EXPIRE)) - return dbname diff --git a/server/pkg/soledad-server b/server/pkg/soledad-server index 753a260b..92560fa8 100644 --- a/server/pkg/soledad-server +++ b/server/pkg/soledad-server @@ -11,7 +11,7 @@ PATH=/sbin:/bin:/usr/sbin:/usr/bin PIDFILE=/var/run/soledad.pid -RESOURCE_CLASS=leap.soledad.server.resource.SoledadResource +RESOURCE_CLASS=leap.soledad.server.session.SoledadSession HTTPS_PORT=2424 CONFDIR=/etc/soledad CERT_PATH="${CONFDIR}/soledad-server.pem" diff --git a/server/src/leap/soledad/server/application.py b/server/src/leap/soledad/server/application.py index 17296425..8fd343b3 100644 --- a/server/src/leap/soledad/server/application.py +++ b/server/src/leap/soledad/server/application.py @@ -24,7 +24,6 @@ Use it like this: from twisted.internet import reactor from leap.soledad.server import SoledadApp -from leap.soledad.server.auth import SoledadTokenAuthMiddleware from leap.soledad.server.gzip_middleware import GzipMiddleware from leap.soledad.server.config import load_configuration from leap.soledad.common.backend import SoledadBackend @@ -35,20 +34,25 @@ from leap.soledad.common.log import getLogger __all__ = ['wsgi_application'] -def _load_config(): - conf = load_configuration('/etc/soledad/soledad-server.conf') - return conf['soledad-server'] +_config = None + + +def get_config(): + global _config + if not _config: + _config = load_configuration('/etc/soledad/soledad-server.conf') + return _config['soledad-server'] def _get_couch_state(): - conf = _load_config() + conf = get_config() state = CouchServerState(conf['couch_url'], create_cmd=conf['create_cmd'], check_schema_versions=True) SoledadBackend.BATCH_SUPPORT = conf.get('batching', False) return state -_app = SoledadTokenAuthMiddleware(SoledadApp(None)) # delay state init +_app = SoledadApp(None) # delay state init wsgi_application = GzipMiddleware(_app) diff --git a/server/src/leap/soledad/server/auth.py b/server/src/leap/soledad/server/auth.py index c026a282..f55b710e 100644 --- a/server/src/leap/soledad/server/auth.py +++ b/server/src/leap/soledad/server/auth.py @@ -15,20 +15,110 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ -Authentication facilities for Soledad Server. +Twisted http token auth. """ -import httplib -import json +import binascii +import time -from abc import ABCMeta, abstractmethod +from hashlib import sha512 from routes.mapper import Mapper +from zope.interface import implementer + +from twisted.cred import error +from twisted.cred.checkers import ICredentialsChecker +from twisted.cred.credentials import IUsernamePassword +from twisted.cred.credentials import UsernamePassword +from twisted.cred.portal import IRealm +from twisted.cred.portal import Portal +from twisted.web.iweb import ICredentialFactory +from twisted.web.resource import IResource -from leap.soledad.common.log import getLogger -from leap.soledad.common.l2db import DBNAME_CONSTRAINTS, errors as u1db_errors from leap.soledad.common import SHARED_DB_NAME +from leap.soledad.common.couch import couch_server +from leap.soledad.common.l2db import DBNAME_CONSTRAINTS +from leap.soledad.server.resource import SoledadResource +from leap.soledad.server.application import get_config + + +@implementer(IRealm) +class SoledadRealm(object): + + def requestAvatar(self, avatarId, mind, *interfaces): + if IResource in interfaces: + return (IResource, SoledadResource(avatarId), lambda: None) + raise NotImplementedError() + + +@implementer(ICredentialsChecker) +class TokenChecker(object): + + credentialInterfaces = [IUsernamePassword] + + TOKENS_DB_PREFIX = "tokens_" + TOKENS_DB_EXPIRE = 30 * 24 * 3600 # 30 days in seconds + TOKENS_TYPE_KEY = "type" + TOKENS_TYPE_DEF = "Token" + TOKENS_USER_ID_KEY = "user_id" + + def __init__(self): + config = get_config() + self.couch_url = config['couch_url'] + + def _tokens_dbname(self): + dbname = self.TOKENS_DB_PREFIX + \ + str(int(time.time() / self.TOKENS_DB_EXPIRE)) + return dbname + + def requestAvatarId(self, credentials): + uuid = credentials.username + token = credentials.password + with couch_server(self.couch_url) as server: + # the tokens db rotates every 30 days, and the current db name is + # "tokens_NNN", where NNN is the number of seconds since epoch + # divide dby the rotate period in seconds. When rotating, old and + # new tokens db coexist during a certain window of time and valid + # tokens are replicated from the old db to the new one. See: + # https://leap.se/code/issues/6785 + dbname = self._tokens_dbname() + db = server[dbname] + # lookup key is a hash of the token to prevent timing attacks. + token = db.get(sha512(token).hexdigest()) + if token is None: + return False + # we compare uuid hashes to avoid possible timing attacks that + # might exploit python's builtin comparison operator behaviour, + # which fails immediatelly when non-matching bytes are found. + couch_uuid_hash = sha512(token[self.TOKENS_USER_ID_KEY]).digest() + req_uuid_hash = sha512(uuid).digest() + if token[self.TOKENS_TYPE_KEY] != self.TOKENS_TYPE_DEF \ + or couch_uuid_hash != req_uuid_hash: + return False + return True -logger = getLogger(__name__) +@implementer(ICredentialFactory) +class TokenCredentialFactory(object): + + scheme = 'token' + + def getChallenge(self, request): + return {} + + def decode(self, response, request): + try: + creds = response.decode('base64') + except binascii.Error: + raise error.LoginFailed('Invalid credentials') + + creds = creds.split(b':', 1) + if len(creds) == 2: + return UsernamePassword(*creds) + else: + raise error.LoginFailed('Invalid credentials') + + +portal = Portal(SoledadRealm(), [TokenChecker()]) +credentialFactory = TokenCredentialFactory() class URLMapper(object): @@ -41,7 +131,8 @@ class URLMapper(object): self._connect_urls() self._map.create_regs() - def match(self, environ): + def match(self, path, method): + environ = {'PATH_INFO': path, 'REQUEST_METHOD': method} return self._map.match(environ=environ) def _connect(self, pattern, http_methods): @@ -81,272 +172,15 @@ class URLMapper(object): ['GET', 'PUT', 'POST']) -class SoledadAuthMiddleware(object): - """ - Soledad Authentication WSGI middleware. - - This class must be extended to implement specific authentication methods - (see SoledadTokenAuthMiddleware below). - - It expects an HTTP_AUTHORIZATION header containing the concatenation of - the following strings: - - 1. The authentication scheme. It will be verified by the - _verify_authentication_scheme() method. - - 2. A space character. - - 3. The base64 encoded string of the concatenation of the user uuid with - the authentication data, separated by a collon, like this: - - base64("<uuid>:<auth_data>") - - After authentication check, the class performs an authorization check to - verify whether the user is authorized to perform the requested action. - - On client-side, 2 methods must be implemented so the soledad client knows - how to send authentication headers to server: - - * set_<method>_credentials: store authentication credentials in the - class. - - * _sign_request: format and include custom authentication data in - the HTTP_AUTHORIZATION header. - - See leap.soledad.auth and u1db.remote.http_client.HTTPClient to understand - how to do it. - """ - - __metaclass__ = ABCMeta - - HTTP_AUTH_KEY = "HTTP_AUTHORIZATION" - PATH_INFO_KEY = "PATH_INFO" - - CONTENT_TYPE_JSON = ('content-type', 'application/json') - - def __init__(self, app): - """ - Initialize the Soledad Authentication Middleware. - - @param app: The application to run on successfull authentication. - @type app: u1db.remote.http_app.HTTPApp - @param prefix: Auth app path prefix. - @type prefix: str - """ - self._app = app - self._mapper = URLMapper() - - def _error(self, start_response, status, description, message=None): - """ - Send a JSON serialized error to WSGI client. - - @param start_response: Callable of the form start_response(status, - response_headers, exc_info=None). - @type start_response: callable - @param status: Status string of the form "999 Message here" - @type status: str - @param response_headers: A list of (header_name, header_value) tuples - describing the HTTP response header. - @type response_headers: list - @param description: The error description. - @type description: str - @param message: The error message. - @type message: str - - @return: List with JSON serialized error message. - @rtype list - """ - start_response("%d %s" % (status, httplib.responses[status]), - [self.CONTENT_TYPE_JSON]) - err = {"error": description} - if message: - err['message'] = message - return [json.dumps(err)] - - def _unauthorized_error(self, start_response, message): - """ - Send a unauth error. - - @param message: The error message. - @type message: str - @param start_response: Callable of the form start_response(status, - response_headers, exc_info=None). - @type start_response: callable - - @return: List with JSON serialized error message. - @rtype list - """ - return self._error( - start_response, - 401, - "unauthorized", - message) - - def __call__(self, environ, start_response): - """ - Handle a WSGI call to the authentication application. - - @param environ: Dictionary containing CGI variables. - @type environ: dict - @param start_response: Callable of the form start_response(status, - response_headers, exc_info=None). - @type start_response: callable +@implementer(IResource) +class UnauthorizedResource(object): + isLeaf = True - @return: Target application results if authentication succeeds, an - error message otherwise. - @rtype: list - """ - # check for authentication header - auth = environ.get(self.HTTP_AUTH_KEY) - if not auth: - return self._unauthorized_error( - start_response, "Missing authentication header.") - - # get authentication data - scheme, encoded = auth.split(None, 1) - uuid, auth_data = encoded.decode('base64').split(':', 1) - if not self._verify_authentication_scheme(scheme): - return self._unauthorized_error( - start_response, "Wrong authentication scheme") - - # verify if user is athenticated - try: - if not self._verify_authentication_data(uuid, auth_data): - return self._unauthorized_error( - start_response, - self._get_auth_error_string()) - except u1db_errors.Unauthorized as e: - return self._error( - start_response, - 401, - e.wire_description) - - # verify if user is authorized to perform action - if not self._verify_authorization(environ, uuid): - return self._unauthorized_error( - start_response, - "Unauthorized action.") - - # move on to the real Soledad app - del environ[self.HTTP_AUTH_KEY] - return self._app(environ, start_response) - - @abstractmethod - def _verify_authentication_scheme(self, scheme): - """ - Verify if authentication scheme is valid. - - @param scheme: Auth scheme extracted from the HTTP_AUTHORIZATION - header. - @type scheme: str - - @return: Whether the authentitcation scheme is valid. - """ - return None - - @abstractmethod - def _verify_authentication_data(self, uuid, auth_data): - """ - Verify valid authenticatiion for this request. - - @param uuid: The user's uuid. - @type uuid: str - @param auth_data: Authentication data. - @type auth_data: str - - @return: Whether the token is valid for authenticating the request. - @rtype: bool - - @raise Unauthorized: Raised when C{auth_data} is not enough to - authenticate C{uuid}. - """ - return None + def render(self, request): + request.setResponseCode(401) + if request.method == b'HEAD': + return b'' + return b'Unauthorized' - def _verify_authorization(self, environ, uuid): - """ - Verify if the user is authorized to perform the requested action over - the requested database. - - @param environ: Dictionary containing CGI variables. - @type environ: dict - @param uuid: The user's uuid from the Authorization header. - @type uuid: str - - @return: Whether the user is authorized to perform the requested action - over the requested db. - @rtype: bool - """ - match = self._mapper.match(environ) - if not match: - return False - return uuid == match.get('uuid') - - @abstractmethod - def _get_auth_error_string(self): - """ - Return an error string specific for each kind of authentication method. - - @return: The error string. - """ - return None - - -class SoledadTokenAuthMiddleware(SoledadAuthMiddleware): - """ - Token based authentication. - """ - - TOKEN_AUTH_ERROR_STRING = "Incorrect address or token." - - def _get_state(self): - return self._app.state - - def _set_state(self, state): - self._app.state = state - - state = property(_get_state, _set_state) - - def _verify_authentication_scheme(self, scheme): - """ - Verify if authentication scheme is valid. - - @param scheme: Auth scheme extracted from the HTTP_AUTHORIZATION - header. - @type scheme: str - - @return: Whether the authentitcation scheme is valid. - """ - if scheme.lower() != 'token': - return False - return True - - def _verify_authentication_data(self, uuid, auth_data): - """ - Extract token from C{auth_data} and proceed with verification of - C{uuid} authentication. - - @param uuid: The user UID. - @type uuid: str - @param auth_data: Authentication data (i.e. the token). - @type auth_data: str - - @return: Whether the token is valid for authenticating the request. - @rtype: bool - - @raise Unauthorized: Raised when C{auth_data} is not enough to - authenticate C{uuid}. - """ - token = auth_data # we expect a cleartext token at this point - try: - return self.state.verify_token(uuid, token) - except Exception as e: - logger.error(e) - return False - - def _get_auth_error_string(self): - """ - Get the error string for token auth. - - @return: The error string. - """ - return self.TOKEN_AUTH_ERROR_STRING + def getChildWithDefault(self, path, request): + return self diff --git a/server/src/leap/soledad/server/resource.py b/server/src/leap/soledad/server/resource.py index dbb91b0a..9922c997 100644 --- a/server/src/leap/soledad/server/resource.py +++ b/server/src/leap/soledad/server/resource.py @@ -17,7 +17,6 @@ """ A twisted resource that serves the Soledad Server. """ - from twisted.web.resource import Resource from twisted.web.wsgi import WSGIResource from twisted.internet import reactor @@ -42,7 +41,8 @@ class SoledadResource(Resource): for the Soledad Server. """ - def __init__(self): + def __init__(self, uuid): + self._uuid = uuid self.children = {'': wsgi_resource} def getChild(self, path, request): diff --git a/server/src/leap/soledad/server/session.py b/server/src/leap/soledad/server/session.py new file mode 100644 index 00000000..22e1d1fb --- /dev/null +++ b/server/src/leap/soledad/server/session.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# session.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Twisted resource containing an authenticated Soledad session. +""" +from zope.interface import implementer + +from twisted.cred import error +from twisted.python import log +from twisted.web import util +from twisted.web.guard import HTTPAuthSessionWrapper +from twisted.web.resource import ErrorPage +from twisted.web.resource import IResource + +from leap.soledad.server.auth import URLMapper +from leap.soledad.server.auth import portal +from leap.soledad.server.auth import credentialFactory +from leap.soledad.server.auth import UnauthorizedResource + + +@implementer(IResource) +class SoledadSession(HTTPAuthSessionWrapper): + + def __init__(self): + self._mapper = URLMapper() + self._portal = portal + self._credentialFactory = credentialFactory + + def _matchPath(self, request): + match = self._mapper.match(request.path, request.method) + return match + + def _parseHeader(self, header): + elements = header.split(b' ') + scheme = elements[0].lower() + if scheme == self._credentialFactory.scheme: + return (b' '.join(elements[1:])) + return None + + def _authorizedResource(self, request): + match = self._matchPath(request) + if not match: + return UnauthorizedResource() + + header = request.getHeader(b'authorization') + if not header: + return UnauthorizedResource() + + auth_data = self._parseHeader(header) + if not auth_data: + return UnauthorizedResource() + + try: + credentials = self._credentialFactory.decode(auth_data, request) + except error.LoginFailed: + return UnauthorizedResource() + except: + log.err(None, "Unexpected failure from credentials factory") + return ErrorPage(500, None, None) + else: + request_uuid = match.get('uuid') + if request_uuid and request_uuid != credentials.username: + return ErrorPage(500, None, None) + return util.DeferredResource(self._login(credentials)) diff --git a/testing/test_soledad/util.py b/testing/test_soledad/util.py index 57f8199b..e44a165d 100644 --- a/testing/test_soledad/util.py +++ b/testing/test_soledad/util.py @@ -52,7 +52,6 @@ from leap.soledad.client.sqlcipher import SQLCipherOptions from leap.soledad.client._crypto import is_symmetrically_encrypted from leap.soledad.server import SoledadApp -from leap.soledad.server.auth import SoledadTokenAuthMiddleware PASSWORD = '123456' @@ -108,7 +107,7 @@ def make_soledad_app(state): def make_token_soledad_app(state): - app = SoledadApp(state) + application = SoledadApp(state) def _verify_authentication_data(uuid, auth_data): if uuid.startswith('user-') and auth_data == 'auth-token': @@ -119,7 +118,6 @@ def make_token_soledad_app(state): def _verify_authorization(uuid, environ): return True - application = SoledadTokenAuthMiddleware(app) application._verify_authentication_data = _verify_authentication_data application._verify_authorization = _verify_authorization return application diff --git a/testing/tests/client/test_http_client.py b/testing/tests/client/test_http_client.py index a107930a..691c7576 100644 --- a/testing/tests/client/test_http_client.py +++ b/testing/tests/client/test_http_client.py @@ -24,7 +24,6 @@ from testscenarios import TestWithScenarios from leap.soledad.client import auth from leap.soledad.common.l2db.remote import http_client from test_soledad.u1db_tests import test_http_client -from leap.soledad.server.auth import SoledadTokenAuthMiddleware # ----------------------------------------------------------------------------- @@ -67,7 +66,7 @@ class TestSoledadClientBase( return res # mime solead application here. if '/token' in environ['PATH_INFO']: - auth = environ.get(SoledadTokenAuthMiddleware.HTTP_AUTH_KEY) + auth = environ.get('HTTP_AUTHORIZATION') if not auth: start_response("401 Unauthorized", [('Content-Type', 'application/json')]) diff --git a/testing/tests/server/test_server.py b/testing/tests/server/test_server.py index 09242736..12f6fb20 100644 --- a/testing/tests/server/test_server.py +++ b/testing/tests/server/test_server.py @@ -18,11 +18,9 @@ Tests for server-related functionality. """ import binascii -import mock import os import pytest -from hashlib import sha512 from pkg_resources import resource_filename from urlparse import urljoin from uuid import uuid4 @@ -46,36 +44,6 @@ from leap.soledad.client import Soledad from leap.soledad.server.config import load_configuration from leap.soledad.server.config import CONFIG_DEFAULTS from leap.soledad.server.auth import URLMapper -from leap.soledad.server.auth import SoledadTokenAuthMiddleware - - -class ServerAuthenticationMiddlewareTestCase(CouchDBTestCase): - - def setUp(self): - super(ServerAuthenticationMiddlewareTestCase, self).setUp() - app = mock.Mock() - self._state = CouchServerState(self.couch_url) - app.state = self._state - self.auth_middleware = SoledadTokenAuthMiddleware(app) - self._authorize('valid-uuid', 'valid-token') - - def _authorize(self, uuid, token): - token_doc = {} - token_doc['_id'] = sha512(token).hexdigest() - token_doc[self._state.TOKENS_USER_ID_KEY] = uuid - token_doc[self._state.TOKENS_TYPE_KEY] = \ - self._state.TOKENS_TYPE_DEF - dbname = self._state._tokens_dbname() - db = self.couch_server.create(dbname) - db.save(token_doc) - self.addCleanup(self.delete_db, db.name) - - def test_authorized_user(self): - is_authorized = self.auth_middleware._verify_authentication_data - self.assertTrue(is_authorized('valid-uuid', 'valid-token')) - self.assertFalse(is_authorized('valid-uuid', 'invalid-token')) - self.assertFalse(is_authorized('invalid-uuid', 'valid-token')) - self.assertFalse(is_authorized('eve', 'invalid-token')) class ServerAuthorizationTestCase(BaseSoledadTest): @@ -90,12 +58,6 @@ class ServerAuthorizationTestCase(BaseSoledadTest): def tearDown(self): pass - def _make_environ(self, path_info, request_method): - return { - 'PATH_INFO': path_info, - 'REQUEST_METHOD': request_method, - } - def test_verify_action_with_correct_dbnames(self): """ Test encrypting and decrypting documents. @@ -120,146 +82,94 @@ class ServerAuthorizationTestCase(BaseSoledadTest): dbname = 'user-%s' % uuid # test global auth - match = urlmap.match(self._make_environ('/', 'GET')) + match = urlmap.match('/', 'GET') + self.assertIsNotNone(match) # test shared-db database resource auth - match = urlmap.match( - self._make_environ('/shared', 'GET')) + match = urlmap.match('/shared', 'GET') self.assertIsNotNone(match) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared', 'PUT'))) + match = urlmap.match('/shared', 'PUT') + self.assertIsNone(match) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared', 'DELETE'))) + match = urlmap.match('/shared', 'DELETE') + self.assertIsNone(match) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared', 'POST'))) + match = urlmap.match('/shared', 'POST') + self.assertIsNone(match) # test shared-db docs resource auth - self.assertIsNone( - urlmap.match( - self._make_environ('/shared/docs', 'GET'))) + self.assertIsNone(urlmap.match('/shared/docs', 'GET')) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared/docs', 'PUT'))) + self.assertIsNone(urlmap.match('/shared/docs', 'PUT')) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared/docs', 'DELETE'))) + self.assertIsNone(urlmap.match('/shared/docs', 'DELETE')) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared/docs', 'POST'))) + self.assertIsNone(urlmap.match('/shared/docs', 'POST')) # test shared-db doc resource auth - match = urlmap.match( - self._make_environ('/shared/doc/x', 'GET')) + match = urlmap.match('/shared/doc/x', 'GET') self.assertIsNotNone(match) self.assertEqual('x', match.get('id')) - match = urlmap.match( - self._make_environ('/shared/doc/x', 'PUT')) + match = urlmap.match('/shared/doc/x', 'PUT') self.assertIsNotNone(match) self.assertEqual('x', match.get('id')) - match = urlmap.match( - self._make_environ('/shared/doc/x', 'DELETE')) + match = urlmap.match('/shared/doc/x', 'DELETE') self.assertEqual('x', match.get('id')) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared/doc/x', 'POST'))) + self.assertIsNone(urlmap.match('/shared/doc/x', 'POST')) # test shared-db sync resource auth - self.assertIsNone( - urlmap.match( - self._make_environ('/shared/sync-from/x', 'GET'))) + self.assertIsNone(urlmap.match('/shared/sync-from/x', 'GET')) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared/sync-from/x', 'PUT'))) + self.assertIsNone(urlmap.match('/shared/sync-from/x', 'PUT')) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared/sync-from/x', 'DELETE'))) + self.assertIsNone(urlmap.match('/shared/sync-from/x', 'DELETE')) - self.assertIsNone( - urlmap.match( - self._make_environ('/shared/sync-from/x', 'POST'))) + self.assertIsNone(urlmap.match('/shared/sync-from/x', 'POST')) # test user-db database resource auth - self.assertIsNone( - urlmap.match( - self._make_environ('/%s' % dbname, 'GET'))) + self.assertIsNone(urlmap.match('/%s' % dbname, 'GET')) - self.assertIsNone( - urlmap.match( - self._make_environ('/%s' % dbname, 'PUT'))) + self.assertIsNone(urlmap.match('/%s' % dbname, 'PUT')) - self.assertIsNone( - urlmap.match( - self._make_environ('/%s' % dbname, 'DELETE'))) + self.assertIsNone(urlmap.match('/%s' % dbname, 'DELETE')) - self.assertIsNone( - urlmap.match( - self._make_environ('/%s' % dbname, 'POST'))) + self.assertIsNone(urlmap.match('/%s' % dbname, 'POST')) # test user-db docs resource auth - self.assertIsNone( - urlmap.match( - self._make_environ('/%s/docs' % dbname, 'GET'))) + self.assertIsNone(urlmap.match('/%s/docs' % dbname, 'GET')) - self.assertIsNone( - urlmap.match( - self._make_environ('/%s/docs' % dbname, 'PUT'))) + self.assertIsNone(urlmap.match('/%s/docs' % dbname, 'PUT')) - self.assertIsNone( - urlmap.match( - self._make_environ('/%s/docs' % dbname, 'DELETE'))) + self.assertIsNone(urlmap.match('/%s/docs' % dbname, 'DELETE')) - self.assertIsNone( - urlmap.match( - self._make_environ('/%s/docs' % dbname, 'POST'))) + self.assertIsNone(urlmap.match('/%s/docs' % dbname, 'POST')) # test user-db doc resource auth - self.assertIsNone( - urlmap.match( - self._make_environ('/%s/doc/x' % dbname, 'GET'))) + self.assertIsNone(urlmap.match('/%s/doc/x' % dbname, 'GET')) - self.assertIsNone( - urlmap.match( - self._make_environ('/%s/doc/x' % dbname, 'PUT'))) + self.assertIsNone(urlmap.match('/%s/doc/x' % dbname, 'PUT')) - self.assertIsNone( - urlmap.match( - self._make_environ('/%s/doc/x' % dbname, 'DELETE'))) + self.assertIsNone(urlmap.match('/%s/doc/x' % dbname, 'DELETE')) - self.assertIsNone( - urlmap.match( - self._make_environ('/%s/doc/x' % dbname, 'POST'))) + self.assertIsNone(urlmap.match('/%s/doc/x' % dbname, 'POST')) # test user-db sync resource auth - match = urlmap.match( - self._make_environ('/%s/sync-from/x' % dbname, 'GET')) + match = urlmap.match('/%s/sync-from/x' % dbname, 'GET') self.assertEqual(uuid, match.get('uuid')) self.assertEqual('x', match.get('source_replica_uid')) - match = urlmap.match( - self._make_environ('/%s/sync-from/x' % dbname, 'PUT')) + match = urlmap.match('/%s/sync-from/x' % dbname, 'PUT') self.assertEqual(uuid, match.get('uuid')) self.assertEqual('x', match.get('source_replica_uid')) - match = urlmap.match( - self._make_environ('/%s/sync-from/x' % dbname, 'DELETE')) + match = urlmap.match('/%s/sync-from/x' % dbname, 'DELETE') self.assertIsNone(match) - match = urlmap.match( - self._make_environ('/%s/sync-from/x' % dbname, 'POST')) + match = urlmap.match('/%s/sync-from/x' % dbname, 'POST') self.assertEqual(uuid, match.get('uuid')) self.assertEqual('x', match.get('source_replica_uid')) |