summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/core/web/_auth.py
blob: 6a5e3621ae4f44f53e61fc94e3daf6d62ddb56f5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from zope.interface import implementer

from twisted.cred import portal, checkers, credentials, error as credError
from twisted.internet import defer
from twisted.web.guard import HTTPAuthSessionWrapper, BasicCredentialFactory
from twisted.web.resource import IResource


class TokenCredentialFactory(BasicCredentialFactory):
    scheme = 'token'


@implementer(IResource)
class WhitelistHTTPAuthSessionWrapper(HTTPAuthSessionWrapper):

    """
    Wrap a portal, enforcing supported header-based authentication schemes.
    It doesn't apply the enforcement to routes included in a whitelist.
    """

    # TODO extend this to inspect the data -- so that we pass a tuple
    # with the action

    whitelist = (None,)

    def __init__(self, *args, **kw):
        self.whitelist = kw.pop('whitelist', tuple())
        super(WhitelistHTTPAuthSessionWrapper, self).__init__(
            *args, **kw)

    def getChildWithDefault(self, path, request):
        if request.path in self.whitelist:
            return self
        return HTTPAuthSessionWrapper.getChildWithDefault(self, path, request)

    def render(self, request):
        if request.path in self.whitelist:
            _res = self._portal.realm.resource
            return _res.render(request)
        return HTTPAuthSessionWrapper.render(self, request)


def protectedResourceFactory(resource, session_tokens, whitelist):
    realm = HttpPasswordRealm(resource)
    checker = TokenDictChecker(session_tokens)
    resource_portal = portal.Portal(realm, [checker])
    credentialFactory = TokenCredentialFactory('localhost')
    protected_resource = WhitelistHTTPAuthSessionWrapper(
        resource_portal, [credentialFactory],
        whitelist=whitelist)
    return protected_resource


@implementer(checkers.ICredentialsChecker)
class TokenDictChecker:

    credentialInterfaces = (credentials.IUsernamePassword,
                            credentials.IUsernameHashedPassword)

    def __init__(self, tokens):
        self.tokens = tokens

    def requestAvatarId(self, credentials):
        username = credentials.username
        if username in self.tokens:
            if credentials.checkPassword(self.tokens[username]):
                return defer.succeed(username)
            else:
                return defer.fail(
                    credError.UnauthorizedLogin("Bad session token"))
        else:
            return defer.fail(
                credError.UnauthorizedLogin("No such user"))


@implementer(portal.IRealm)
class HttpPasswordRealm(object):

    def __init__(self, resource):
        self.resource = resource

    def requestAvatar(self, user, mind, *interfaces):
        # the resource is passed on regardless of user
        if IResource in interfaces:
            return (IResource, self.resource, lambda: None)
        raise NotImplementedError()