summaryrefslogtreecommitdiff
path: root/service/test/unit/bitmask_libraries/test_provider.py
blob: df8512031f64ba2ba6e7179aaf34a66618f52ea6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#
# Copyright (c) 2014 ThoughtWorks, Inc.
#
# Pixelated is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pixelated is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
import json

from mock import patch, MagicMock, ANY
from httmock import all_requests, HTTMock, urlmatch
from requests import HTTPError
from pixelated.bitmask_libraries.config import LeapConfig
from pixelated.bitmask_libraries.provider import LeapProvider
from pixelated.bitmask_libraries.certs import LeapCertificate
from test_abstract_leap import AbstractLeapTest
import requests


@all_requests
def not_found_mock(url, request):
    """Catch-all handler: reply to whatever request it receives with a 404.

    Both arguments are ignored; the returned dict carries the status code
    and a dummy body.
    """
    response = dict(status_code=404, content='foobar')
    return response


@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/provider.json')
def provider_json_mock(url, request):
    """Serve /provider.json for some-provider.test with the fingerprint the tests treat as valid."""
    fingerprint = "SHA256: 06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d"
    return provider_json_response(fingerprint)


@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/provider.json')
def provider_json_invalid_fingerprint_mock(url, request):
    """Serve /provider.json advertising a made-up fingerprint (used by the mismatch test)."""
    fingerprint = "SHA256: 0123456789012345678901234567890123456789012345678901234567890123"
    return provider_json_response(fingerprint)


def provider_json_response(fingerprint):
    """Build a 200 response dict whose body is a provider.json document.

    :param fingerprint: value placed under ``ca_cert_fingerprint``.
    :return: dict with ``status_code`` 200 and ``content`` holding the
        JSON-encoded provider description for some-provider.test.
    """
    body = json.dumps({
        "api_uri": "https://api.some-provider.test:4430",
        "api_version": "1",
        "ca_cert_fingerprint": fingerprint,
        "ca_cert_uri": "https://some-provider.test/ca.crt",
        "domain": "some-provider.test",
        "services": ["mx"],
    })
    return {"status_code": 200, "content": body}


@urlmatch(netloc=r'api\.some-provider\.test:4430$', path='/1/config/soledad-service.json')
def soledad_json_mock(url, request):
    """Serve a minimal soledad-service.json from the provider API host."""
    body = json.dumps({"some key": "some value"})
    return {"status_code": 200, "content": body}


@urlmatch(netloc=r'api\.some-provider\.test:4430$', path='/1/config/smtp-service.json')
def smtp_json_mock(url, request):
    """Serve an smtp-service.json listing a single ``leap-mx`` host."""
    leap_mx = {
        "hostname": "mx.some-provider.test",
        "ip_address": "0.0.0.0",
        "port": 465,
    }
    smtp_config = {
        "hosts": {"leap-mx": leap_mx},
        "locations": {},
        "serial": 1,
        "version": 1,
    }
    return {"status_code": 200, "content": json.dumps(smtp_config)}


@urlmatch(netloc=r'(.*\.)?some-provider\.test$', path='/ca.crt')
def ca_cert_mock(url, request):
    """Serve the module-level ``ca_crt`` PEM text as the body of /ca.crt."""
    return dict(status_code=200, content=ca_crt)


# PEM certificate text returned by ca_cert_mock as the body of /ca.crt.
# test_throw_exception_for_invalid_certificate expects the fingerprint
# reported for this text to be
# 06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d, which is
# also the value advertised by provider_json_mock -- do not reformat the
# literal.
ca_crt = """
-----BEGIN CERTIFICATE-----
MIIFbzCCA1egAwIBAgIBATANBgkqhkiG9w0BAQ0FADBKMREwDwYDVQQKDAhXYXpv
a2F6aTEaMBgGA1UECwwRaHR0cHM6Ly9kZmkubG9jYWwxGTAXBgNVBAMMEFdhem9r
YXppIFJvb3QgQ0EwHhcNMTQwMzI1MDAwMDAwWhcNMjQwMzI1MDAwMDAwWjBKMREw
DwYDVQQKDAhXYXpva2F6aTEaMBgGA1UECwwRaHR0cHM6Ly9kZmkubG9jYWwxGTAX
BgNVBAMMEFdhem9rYXppIFJvb3QgQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAw
ggIKAoICAQDSPyaslC6SNVsKpGoXllInPXbjiq7rJaV08Xg+64FJU/257BZZEJ/j
r33r0xlt2kj85PcbPySLKy0omXAQt9bs273hwAQXExdY41FxMD3wP/dmLqd55KYa
LDV4GUw0QPZ0QUyWVrRHkrdCDyjpRG+6GbowmtygJKLflYmUFC1PYQ3492esr0jC
+Q6L6+/D2+hBiH3NPI22Yk0kQmuPfnu2pvo+EYQ3It81qZE0Jo8u/BqOMgN2f9DS
GvSNfZcKAP18A41/VRrYFa/WUcdDxt/uP5nO1dm2vfLorje3wcMGtGRcDKG/+GAm
S0nYKKQeWYc6z5SDvPM1VlNdn1gOejhAoggT3Hr5Dq8kxW/lQZbOz+HLbz15qGjz
gL4KHKuDE6hOuqxpHdMTY4WZBBQ8/6ICBxaXH9587/nNDdZiom+XukVD4mrSMJS7
PRr14Hw57433AJDJcZRwZNRRAGgDPNsCoR2caKB6/Uwkp+dWVndj5Ad8MEjyM1yV
+fYU6PSQWNig7qqN5VhNY+zUCcez5gL6volMuW00iOkXISW4lBrcZmEAQTTcWT1D
U7EkLlwITQce63LcuvK7ZWsEm5XCqD+yUz9oQfugmIhxAlTdqt3De9FA0WT9WxGt
zLeswCNKjnMpRgTerq6elwB03EBJVc7k1QRn4+s6C30sXR12dYnEMwIDAQABo2Aw
XjAdBgNVHQ4EFgQU8ItSdI5pSqMDjgRjgYI3Nj0SwxQwDgYDVR0PAQH/BAQDAgIE
MAwGA1UdEwQFMAMBAf8wHwYDVR0jBBgwFoAU8ItSdI5pSqMDjgRjgYI3Nj0SwxQw
DQYJKoZIhvcNAQENBQADggIBALdSPUrIqyIlSMr4R7pWd6Ep0BZH5RztVUcoXtei
x2MFi/rsw7aL9qZqACYIE8Gkkh6Z6GQph0fIqhAlNFvJXKkguL3ri5xh0XmPfbv/
OLIvaUAixATivdm8ro/IqYQWdL3P6mDZOv4O6POdBEJ9JLc9RXUt1LiQ5Xb9QiLs
l/yOthhp5dJHqC8s6CDEUHRe3s9Q/4cwNB4td47I+mkLsNtVNXqi4lOzuQamqiFt
cFIqOLTFtBJ7G3k9iaDuN6RPS6LMRbqabwg4gafQTmJ+roHpnsaiHkfomI4MZOVi
TLQKOAJ3/pRGm5cGzkzQ+z4sUiCSQxtIWs7EnQCCE8agqpef6zArAvKEO+139+f2
u1BhWOm/aHT5a3INnJEbuFr8V9MlbZSxSzU3UH7hby+9PxWKYesc6KUAu6Icooci
gEQqrVhVKmfaYMLL7UZHhw56yv/6B10SSmeAMiJhtTExjjrTRLSCaKCPa2ISAUDB
aPR3t8ZoUESWRAFQGj5NvWOomTaXfyE8Or2WfNemvdlWsKvlLeVsjts+iaTgQRU9
VXcrUhrHhaXhYXeWrWkDDcl8VUlDWXzoUGV9SczOGwr6hONJWMn1HNxNV7ywFWf0
QXH1g3LBW7qNgRaGhbIX4a1WoNQDmbbKaLgKWs74atZ8o4A2aUEjomclgZWPsc5l
VeJ6
-----END CERTIFICATE-----
"""


# Certificate paths the tests expect to see passed as ``verify=``:
# the web (bootstrap) cert for provider.json and ca.crt, the API cert for
# the soledad service config.
PROVIDER_WEB_CERT = '/tmp/bootstrap-ca.crt'
PROVIDER_API_CERT = '/tmp/ca.crt'


class LeapProviderTest(AbstractLeapTest):
    """Tests for LeapProvider, driven by the HTTMock handlers defined in this module."""

    def setUp(self):
        # Each test gets its own LeapConfig and starts with the bootstrap
        # certificate path configured and None as the second (fingerprint)
        # argument.
        self.config = LeapConfig(leap_home='/tmp/foobar')
        LeapCertificate.set_cert_and_fingerprint(PROVIDER_WEB_CERT, None)

    def test_provider_fetches_provider_json(self):
        # Constructing the provider is expected to load provider.json and
        # expose its fields as attributes.
        with HTTMock(provider_json_mock):
            provider = LeapProvider('some-provider.test', self.config)

            self.assertEqual("1", provider.api_version)
            self.assertEqual("some-provider.test", provider.domain)
            self.assertEqual("https://api.some-provider.test:4430", provider.api_uri)
            self.assertEqual("https://some-provider.test/ca.crt", provider.ca_cert_uri)
            self.assertEqual("SHA256: 06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d",
                             provider.ca_cert_fingerprint)
            self.assertEqual(["mx"], provider.services)

    def test_provider_json_throws_exception_on_status_code(self):
        # With only the 404 handler installed, construction itself must fail.
        with HTTMock(not_found_mock):
            self.assertRaises(HTTPError, LeapProvider, 'some-provider.test', self.config)

    def test_fetch_soledad_json(self):
        with HTTMock(provider_json_mock, soledad_json_mock, not_found_mock):
            provider = LeapProvider('some-provider.test', self.config)
            soledad = provider.fetch_soledad_json()

            self.assertEqual("some value", soledad.get('some key'))

    def test_throw_exception_for_fetch_soledad_status_code(self):
        # No soledad handler is installed, so the request falls through to
        # not_found_mock.
        with HTTMock(provider_json_mock, not_found_mock):
            provider = LeapProvider('some-provider.test', self.config)

            self.assertRaises(HTTPError, provider.fetch_soledad_json)

    def test_fetch_smtp_json(self):
        with HTTMock(provider_json_mock, smtp_json_mock, not_found_mock):
            provider = LeapProvider('some-provider.test', self.config)
            smtp = provider.fetch_smtp_json()
            self.assertEqual('mx.some-provider.test', smtp.get('hosts').get('leap-mx').get('hostname'))

    def test_throw_exception_for_fetch_smtp_status_code(self):
        # No smtp handler is installed, so the request falls through to
        # not_found_mock.
        with HTTMock(provider_json_mock, not_found_mock):
            provider = LeapProvider('some-provider.test', self.config)
            self.assertRaises(HTTPError, provider.fetch_smtp_json)

    def test_fetch_valid_certificate(self):
        # Passes as long as fetch_valid_certificate() does not raise; there
        # is no further assertion.
        with HTTMock(provider_json_mock, ca_cert_mock, not_found_mock):
            provider = LeapProvider('some-provider.test', self.config)
            provider.fetch_valid_certificate()

    def test_throw_exception_for_invalid_certificate(self):
        expected_exception_message = 'Certificate fingerprints don\'t match! Expected [0123456789012345678901234567890123456789012345678901234567890123] but got [06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d]'

        with HTTMock(provider_json_invalid_fingerprint_mock, ca_cert_mock, not_found_mock):
            provider = LeapProvider('some-provider.test', self.config)
            with self.assertRaises(Exception) as cm:
                provider.fetch_valid_certificate()
            # Compare via str() rather than the ``.message`` attribute:
            # BaseException.message was deprecated in Python 2.6 and does not
            # exist on Python 3, while str() gives the same text on both.
            self.assertEqual(expected_exception_message, str(cm.exception))

    def test_that_bootstrap_cert_is_used_to_fetch_certificate(self):
        # Wrap a real session so requests still flow through HTTMock while
        # the calls made on the session are recorded.
        session = MagicMock(wraps=requests.session())
        session_func = MagicMock(return_value=session)
        get_func = MagicMock(wraps=requests.get)
        # NOTE(review): this class attribute is assigned directly and not
        # restored in this file -- confirm it cannot leak into other tests.
        LeapCertificate.LEAP_CERT = PROVIDER_WEB_CERT

        with patch('pixelated.bitmask_libraries.provider.requests.session', new=session_func):
            with patch('pixelated.bitmask_libraries.provider.requests.get', new=get_func):
                with HTTMock(provider_json_mock, ca_cert_mock, not_found_mock):
                    provider = LeapProvider('some-provider.test', self.config)
                    provider.fetch_valid_certificate()

        session.get.assert_any_call('https://some-provider.test/ca.crt', verify=PROVIDER_WEB_CERT, timeout=15)
        session.get.assert_any_call('https://some-provider.test/provider.json', verify=PROVIDER_WEB_CERT, timeout=15)

    def test_that_provider_cert_is_used_to_fetch_soledad_json(self):
        get_func = MagicMock(wraps=requests.get)
        # NOTE(review): this class attribute is assigned directly and not
        # restored in this file -- confirm it cannot leak into other tests.
        LeapCertificate.provider_api_cert = PROVIDER_API_CERT

        with patch('pixelated.bitmask_libraries.provider.requests.get', new=get_func):
            with HTTMock(provider_json_mock, soledad_json_mock, not_found_mock):
                provider = LeapProvider('some-provider.test', self.config)
                provider.fetch_soledad_json()
        # assert_called_with checks the most recent call made through the
        # patched requests.get.
        get_func.assert_called_with('https://api.some-provider.test:4430/1/config/soledad-service.json', verify=PROVIDER_API_CERT, timeout=15)

    def test_that_leap_fingerprint_is_validated(self):
        session = MagicMock(wraps=requests.session())
        session_func = MagicMock(return_value=session)
        # Override setUp: None as the first argument and a fingerprint as the
        # second.
        LeapCertificate.set_cert_and_fingerprint(None, 'some fingerprint')

        with patch('pixelated.bitmask_libraries.provider.requests.session', new=session_func):
            with HTTMock(provider_json_mock, ca_cert_mock, not_found_mock):
                provider = LeapProvider('some-provider.test', self.config)
                provider.fetch_valid_certificate()

        # In this mode the test expects verify=False on the request and
        # something mounted on the session for 'https://'.
        session.get.assert_any_call('https://some-provider.test/ca.crt', verify=False, timeout=15)
        session.mount.assert_called_with('https://', ANY)