1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
# -*- coding: utf-8 -*-
# test_auth.py
# Copyright (C) 2017 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Tests for auth pieces.
"""
import os
import collections
import pytest
from contextlib import contextmanager
from twisted.cred.credentials import UsernamePassword
from twisted.cred.error import UnauthorizedLogin
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.trial import unittest
from twisted.web.resource import IResource
from twisted.web.test import test_httpauth
import leap.soledad.server.auth as auth_module
from leap.soledad.server.auth import SoledadRealm
from leap.soledad.server.auth import CouchDBTokenChecker
from leap.soledad.server.auth import FileTokenChecker
from leap.soledad.server.auth import TokenCredentialFactory
from leap.soledad.server._resource import PublicResource
class SoledadRealmTestCase(unittest.TestCase):
    """
    Check what `SoledadRealm.requestAvatar` hands back.
    """

    def test_returned_resource(self):
        """
        The avatar must be a `PublicResource` and calling the logout
        callable must return None.
        """
        # a thread pool has to be handed to the realm, otherwise tests
        # will hang
        realm = SoledadRealm(
            conf={'blobs': False},
            sync_pool=reactor.getThreadPool())
        result = realm.requestAvatar('any', None, IResource)
        # only the avatar and the logout callable are checked; the first
        # element of the triple is not inspected
        _, resource, do_logout = result
        self.assertIsInstance(resource, PublicResource)
        self.assertIsNone(do_logout())
class DummyServer(object):
    """
    A stand-in for the GET api of `couchdb.client.Server`.

    Whatever is asked of me, I answer with the token I was created with.
    """

    def __init__(self, token):
        """
        Remember `token` so that every later `get` call can return it.
        """
        self._reply = token

    def get(self, _):
        """
        Return the stored token; the requested identifier is ignored.
        """
        return self._reply
@contextmanager
def dummy_server(token):
    """
    Context manager yielding a mapping whose missing keys are filled in with
    a fresh `DummyServer` built around `token`.
    """
    def build_entry():
        # invoked by the defaultdict each time an unknown key is looked up
        return DummyServer(token)

    yield collections.defaultdict(build_entry)
class CouchDBTokenCheckerTestCase(unittest.TestCase):
    """
    Tests for `CouchDBTokenChecker`, with the module-level `couch_server`
    replaced by a dummy that returns a canned token document.
    """

    @inlineCallbacks
    def test_good_creds(self):
        """
        Credentials are accepted when the server returns a valid token
        document for the user.
        """
        # set up a dummy server which always returns a *valid* token document
        token = {'user_id': 'user', 'type': 'Token'}
        server = dummy_server(token)
        # setup the checker with the custom server; self.patch restores the
        # original `couch_server` on test cleanup, so the fake does not leak
        # into tests that run afterwards
        checker = CouchDBTokenChecker()
        self.patch(auth_module, 'couch_server', lambda url: server)
        # assert the checker *can* verify the creds
        creds = UsernamePassword('user', 'pass')
        avatarId = yield checker.requestAvatarId(creds)
        self.assertEqual('user', avatarId)

    @inlineCallbacks
    def test_bad_creds(self):
        """
        Credentials are rejected with `UnauthorizedLogin` when the server
        returns no token document.
        """
        # set up a dummy server which always returns an *invalid* token
        # document
        token = None
        server = dummy_server(token)
        # setup the checker with the custom server; self.patch restores the
        # original `couch_server` on test cleanup, so the fake does not leak
        # into tests that run afterwards
        checker = CouchDBTokenChecker()
        self.patch(auth_module, 'couch_server', lambda url: server)
        # assert the checker *cannot* verify the creds
        creds = UsernamePassword('user', '')
        with self.assertRaises(UnauthorizedLogin):
            yield checker.requestAvatarId(creds)
class FileTokenCheckerTestCase(unittest.TestCase):
    """
    Tests for `FileTokenChecker`, backed by a tokens file written into
    `self.tempdir`.
    """

    def _checker_for(self, file_content):
        """
        Write `file_content` to an `auth.file` inside `self.tempdir` and
        return a `FileTokenChecker` configured to use that file.
        """
        auth_file_path = os.path.join(self.tempdir, 'auth.file')
        with open(auth_file_path, 'w') as auth_file:
            auth_file.write(file_content)
        # setup the checker with the auth tokens file
        return FileTokenChecker({'services_tokens_file': auth_file_path})

    @inlineCallbacks
    @pytest.mark.usefixtures("method_tmpdir")
    def test_good_creds(self):
        """
        A service/token pair present in the tokens file is accepted and the
        service name comes back as the avatar id.
        """
        checker = self._checker_for('goodservice:goodtoken')
        # assert the checker *can* verify the creds
        good = UsernamePassword('goodservice', 'goodtoken')
        avatarId = yield checker.requestAvatarId(good)
        self.assertEqual('goodservice', avatarId)

    @inlineCallbacks
    @pytest.mark.usefixtures("method_tmpdir")
    def test_bad_creds(self):
        """
        A known service presenting the wrong token is rejected with
        `UnauthorizedLogin`.
        """
        checker = self._checker_for('service:token')
        # assert the checker *cannot* verify the creds
        bad = UsernamePassword('service', 'wrongtoken')
        with self.assertRaises(UnauthorizedLogin):
            yield checker.requestAvatarId(bad)
class TokenCredentialFactoryTestcase(
        test_httpauth.RequestMixin,
        test_httpauth.BasicAuthTestsMixin,
        unittest.TestCase):
    """
    Run the test methods inherited from twisted's `BasicAuthTestsMixin`
    with a `TokenCredentialFactory` as the credential factory.
    """

    def setUp(self):
        """
        Run the mixin's own setUp, then install the factory under test.
        """
        test_httpauth.BasicAuthTestsMixin.setUp(self)
        # assigned after the mixin's setUp so that this is the factory in
        # effect when the tests run
        factory = TokenCredentialFactory()
        self.credentialFactory = factory
|