Port validation levels to enum34
[keymanager.git] / src / leap / keymanager / tests / test_keymanager.py
1 # -*- coding: utf-8 -*-
2 # test_keymanager.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Tests for the Key Manager.
21 """
22
23
24 from datetime import datetime
25 from mock import Mock
26 from twisted.internet.defer import inlineCallbacks
27 from twisted.trial import unittest
28
29 from leap.keymanager import (
30     KeyNotFound,
31     KeyAddressMismatch,
32     errors
33 )
34 from leap.keymanager.openpgp import OpenPGPKey
35 from leap.keymanager.keys import (
36     is_address,
37     build_key_from_dict,
38 )
39 from leap.keymanager.validation import (
40     ValidationLevel,
41     toValidationLevel
42 )
43 from leap.keymanager.tests import (
44     KeyManagerWithSoledadTestCase,
45     ADDRESS,
46     ADDRESS_2,
47     KEY_FINGERPRINT,
48     PUBLIC_KEY,
49     PUBLIC_KEY_2,
50     PRIVATE_KEY,
51     PRIVATE_KEY_2,
52 )
53
54
55 class KeyManagerUtilTestCase(unittest.TestCase):
56
57     def test_is_address(self):
58         self.assertTrue(
59             is_address('user@leap.se'),
60             'Incorrect address detection.')
61         self.assertFalse(
62             is_address('userleap.se'),
63             'Incorrect address detection.')
64         self.assertFalse(
65             is_address('user@'),
66             'Incorrect address detection.')
67         self.assertFalse(
68             is_address('@leap.se'),
69             'Incorrect address detection.')
70
71     def test_build_key_from_dict(self):
72         kdict = {
73             'address': [ADDRESS],
74             'key_id': KEY_FINGERPRINT[-16:],
75             'fingerprint': KEY_FINGERPRINT,
76             'key_data': PUBLIC_KEY,
77             'private': False,
78             'length': 4096,
79             'expiry_date': 0,
80             'last_audited_at': 0,
81             'refreshed_at': 1311239602,
82             'validation': ValidationLevel.Weak_Chain.name,
83             'encr_used': False,
84             'sign_used': True,
85         }
86         key = build_key_from_dict(OpenPGPKey, kdict)
87         self.assertEqual(
88             kdict['address'], key.address,
89             'Wrong data in key.')
90         self.assertEqual(
91             kdict['key_id'], key.key_id,
92             'Wrong data in key.')
93         self.assertEqual(
94             kdict['fingerprint'], key.fingerprint,
95             'Wrong data in key.')
96         self.assertEqual(
97             kdict['key_data'], key.key_data,
98             'Wrong data in key.')
99         self.assertEqual(
100             kdict['private'], key.private,
101             'Wrong data in key.')
102         self.assertEqual(
103             kdict['length'], key.length,
104             'Wrong data in key.')
105         self.assertEqual(
106             None, key.expiry_date,
107             'Wrong data in key.')
108         self.assertEqual(
109             None, key.last_audited_at,
110             'Wrong data in key.')
111         self.assertEqual(
112             datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
113             'Wrong data in key.')
114         self.assertEqual(
115             toValidationLevel(kdict['validation']), key.validation,
116             'Wrong data in key.')
117         self.assertEqual(
118             kdict['encr_used'], key.encr_used,
119             'Wrong data in key.')
120         self.assertEqual(
121             kdict['sign_used'], key.sign_used,
122             'Wrong data in key.')
123
124
125 class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
126
127     @inlineCallbacks
128     def test_get_all_keys_in_db(self):
129         km = self._key_manager()
130         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
131         # get public keys
132         keys = yield km.get_all_keys(False)
133         self.assertEqual(len(keys), 1, 'Wrong number of keys')
134         self.assertTrue(ADDRESS in keys[0].address)
135         self.assertFalse(keys[0].private)
136         # get private keys
137         keys = yield km.get_all_keys(True)
138         self.assertEqual(len(keys), 1, 'Wrong number of keys')
139         self.assertTrue(ADDRESS in keys[0].address)
140         self.assertTrue(keys[0].private)
141
142     @inlineCallbacks
143     def test_get_public_key(self):
144         km = self._key_manager()
145         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
146         # get the key
147         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
148                                fetch_remote=False)
149         self.assertTrue(key is not None)
150         self.assertTrue(ADDRESS in key.address)
151         self.assertEqual(
152             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
153         self.assertFalse(key.private)
154
155     @inlineCallbacks
156     def test_get_private_key(self):
157         km = self._key_manager()
158         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
159         # get the key
160         key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
161                                fetch_remote=False)
162         self.assertTrue(key is not None)
163         self.assertTrue(ADDRESS in key.address)
164         self.assertEqual(
165             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
166         self.assertTrue(key.private)
167
168     def test_send_key_raises_key_not_found(self):
169         km = self._key_manager()
170         d = km.send_key(OpenPGPKey)
171         return self.assertFailure(d, KeyNotFound)
172
173     @inlineCallbacks
174     def test_send_key(self):
175         """
176         Test that request is well formed when sending keys to server.
177         """
178         token = "mytoken"
179         km = self._key_manager(token=token)
180         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
181         km._fetcher.put = Mock()
182         # the following data will be used on the send
183         km.ca_cert_path = 'capath'
184         km.session_id = 'sessionid'
185         km.uid = 'myuid'
186         km.api_uri = 'apiuri'
187         km.api_version = 'apiver'
188         yield km.send_key(OpenPGPKey)
189         # setup expected args
190         pubkey = yield km.get_key(km._address, OpenPGPKey)
191         data = {
192             km.PUBKEY_KEY: pubkey.key_data,
193         }
194         url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
195         km._fetcher.put.assert_called_once_with(
196             url, data=data, verify='capath',
197             headers={'Authorization': 'Token token=%s' % token},
198         )
199
200     def test_fetch_keys_from_server(self):
201         """
202         Test that the request is well formed when fetching keys from server.
203         """
204         km = self._key_manager(url='http://nickserver.domain')
205
206         class Response(object):
207             status_code = 200
208             headers = {'content-type': 'application/json'}
209
210             def json(self):
211                 return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2}
212
213             def raise_for_status(self):
214                 pass
215
216         # mock the fetcher so it returns the key for ADDRESS_2
217         km._fetcher.get = Mock(
218             return_value=Response())
219         km.ca_cert_path = 'cacertpath'
220
221         def verify_the_call(_):
222             km._fetcher.get.assert_called_once_with(
223                 'http://nickserver.domain',
224                 data={'address': ADDRESS_2},
225                 verify='cacertpath',
226             )
227
228         d = km._fetch_keys_from_server(ADDRESS_2)
229         d.addCallback(verify_the_call)
230         return d
231
232     @inlineCallbacks
233     def test_get_key_fetches_from_server(self):
234         """
235         Test that getting a key successfuly fetches from server.
236         """
237         km = self._key_manager(url='http://nickserver.domain')
238
239         class Response(object):
240             status_code = 200
241             headers = {'content-type': 'application/json'}
242
243             def json(self):
244                 return {'address': ADDRESS, 'openpgp': PUBLIC_KEY}
245
246             def raise_for_status(self):
247                 pass
248
249         # mock the fetcher so it returns the key for ADDRESS_2
250         km._fetcher.get = Mock(return_value=Response())
251         km.ca_cert_path = 'cacertpath'
252         # try to key get without fetching from server
253         d = km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
254         yield self.assertFailure(d, KeyNotFound)
255         # try to get key fetching from server.
256         key = yield km.get_key(ADDRESS, OpenPGPKey)
257         self.assertIsInstance(key, OpenPGPKey)
258         self.assertTrue(ADDRESS in key.address)
259
260     @inlineCallbacks
261     def test_put_key_ascii(self):
262         """
263         Test that putting ascii key works
264         """
265         km = self._key_manager(url='http://nickserver.domain')
266
267         yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
268         key = yield km.get_key(ADDRESS, OpenPGPKey)
269         self.assertIsInstance(key, OpenPGPKey)
270         self.assertTrue(ADDRESS in key.address)
271
272     @inlineCallbacks
273     def test_fetch_uri_ascii_key(self):
274         """
275         Test that fetch key downloads the ascii key and gets included in
276         the local storage
277         """
278         km = self._key_manager()
279
280         class Response(object):
281             ok = True
282             content = PUBLIC_KEY
283
284         km._fetcher.get = Mock(return_value=Response())
285         km.ca_cert_path = 'cacertpath'
286
287         yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
288         key = yield km.get_key(ADDRESS, OpenPGPKey)
289         self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
290
291     def test_fetch_uri_empty_key(self):
292         """
293         Test that fetch key raises KeyNotFound if no key in the url
294         """
295         km = self._key_manager()
296
297         class Response(object):
298             ok = True
299             content = ""
300
301         km._fetcher.get = Mock(return_value=Response())
302         km.ca_cert_path = 'cacertpath'
303         d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
304         return self.assertFailure(d, KeyNotFound)
305
306     def test_fetch_uri_address_differ(self):
307         """
308         Test that fetch key raises KeyAttributesDiffer if the address
309         don't match
310         """
311         km = self._key_manager()
312
313         class Response(object):
314             ok = True
315             content = PUBLIC_KEY
316
317         km._fetcher.get = Mock(return_value=Response())
318         km.ca_cert_path = 'cacertpath'
319         d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
320         return self.assertFailure(d, KeyAddressMismatch)
321
322
323 class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
324
325     RAW_DATA = 'data'
326
327     @inlineCallbacks
328     def test_keymanager_openpgp_encrypt_decrypt(self):
329         km = self._key_manager()
330         # put raw private key
331         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
332         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
333             PRIVATE_KEY_2, ADDRESS_2)
334         # encrypt
335         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
336                                    sign=ADDRESS_2, fetch_remote=False)
337         self.assertNotEqual(self.RAW_DATA, encdata)
338         # decrypt
339         rawdata, signingkey = yield km.decrypt(
340             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
341         self.assertEqual(self.RAW_DATA, rawdata)
342         key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False,
343                                fetch_remote=False)
344         self.assertEqual(signingkey.fingerprint, key.fingerprint)
345
346     @inlineCallbacks
347     def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
348         km = self._key_manager()
349         # put raw keys
350         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
351         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
352             PRIVATE_KEY_2, ADDRESS_2)
353         # encrypt
354         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
355                                    sign=ADDRESS_2, fetch_remote=False)
356         self.assertNotEqual(self.RAW_DATA, encdata)
357         # verify
358         rawdata, signingkey = yield km.decrypt(
359             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False)
360         self.assertEqual(self.RAW_DATA, rawdata)
361         self.assertTrue(isinstance(signingkey, errors.InvalidSignature))
362
363     @inlineCallbacks
364     def test_keymanager_openpgp_sign_verify(self):
365         km = self._key_manager()
366         # put raw private keys
367         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
368         signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey,
369                                  detach=False)
370         self.assertNotEqual(self.RAW_DATA, signdata)
371         # verify
372         signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey,
373                                      fetch_remote=False)
374         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
375                                fetch_remote=False)
376         self.assertEqual(signingkey.fingerprint, key.fingerprint)
377
378     def test_keymanager_encrypt_key_not_found(self):
379         km = self._key_manager()
380         d = km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
381         d.addCallback(
382             lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey,
383                                  sign=ADDRESS, fetch_remote=False))
384         return self.assertFailure(d, KeyNotFound)
385
386
387 import unittest
388 if __name__ == "__main__":
389     unittest.main()