[feature] Use ca_bundle when fetching keys by url
[keymanager.git] / src / leap / keymanager / tests / test_keymanager.py
1 # -*- coding: utf-8 -*-
2 # test_keymanager.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Tests for the Key Manager.
21 """
22
23
24 from datetime import datetime
25 import tempfile
26 from leap.common import ca_bundle
27 from mock import Mock, MagicMock, patch
28 from twisted.internet.defer import inlineCallbacks
29 from twisted.trial import unittest
30
31 from leap.keymanager import (
32     KeyNotFound,
33     KeyAddressMismatch,
34     errors
35 )
36 from leap.keymanager.openpgp import OpenPGPKey
37 from leap.keymanager.keys import (
38     is_address,
39     build_key_from_dict,
40 )
41 from leap.keymanager.validation import ValidationLevels
42 from leap.keymanager.tests import (
43     KeyManagerWithSoledadTestCase,
44     ADDRESS,
45     ADDRESS_2,
46     KEY_FINGERPRINT,
47     PUBLIC_KEY,
48     PUBLIC_KEY_2,
49     PRIVATE_KEY,
50     PRIVATE_KEY_2,
51 )
52
53
54 NICKSERVER_URI = "http://leap.se/"
55 REMOTE_KEY_URL = "http://site.domain/key"
56
57
58 class KeyManagerUtilTestCase(unittest.TestCase):
59
60     def test_is_address(self):
61         self.assertTrue(
62             is_address('user@leap.se'),
63             'Incorrect address detection.')
64         self.assertFalse(
65             is_address('userleap.se'),
66             'Incorrect address detection.')
67         self.assertFalse(
68             is_address('user@'),
69             'Incorrect address detection.')
70         self.assertFalse(
71             is_address('@leap.se'),
72             'Incorrect address detection.')
73
74     def test_build_key_from_dict(self):
75         kdict = {
76             'address': [ADDRESS],
77             'key_id': KEY_FINGERPRINT[-16:],
78             'fingerprint': KEY_FINGERPRINT,
79             'key_data': PUBLIC_KEY,
80             'private': False,
81             'length': 4096,
82             'expiry_date': 0,
83             'last_audited_at': 0,
84             'refreshed_at': 1311239602,
85             'validation': str(ValidationLevels.Weak_Chain),
86             'encr_used': False,
87             'sign_used': True,
88         }
89         key = build_key_from_dict(OpenPGPKey, kdict)
90         self.assertEqual(
91             kdict['address'], key.address,
92             'Wrong data in key.')
93         self.assertEqual(
94             kdict['key_id'], key.key_id,
95             'Wrong data in key.')
96         self.assertEqual(
97             kdict['fingerprint'], key.fingerprint,
98             'Wrong data in key.')
99         self.assertEqual(
100             kdict['key_data'], key.key_data,
101             'Wrong data in key.')
102         self.assertEqual(
103             kdict['private'], key.private,
104             'Wrong data in key.')
105         self.assertEqual(
106             kdict['length'], key.length,
107             'Wrong data in key.')
108         self.assertEqual(
109             None, key.expiry_date,
110             'Wrong data in key.')
111         self.assertEqual(
112             None, key.last_audited_at,
113             'Wrong data in key.')
114         self.assertEqual(
115             datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
116             'Wrong data in key.')
117         self.assertEqual(
118             ValidationLevels.get(kdict['validation']), key.validation,
119             'Wrong data in key.')
120         self.assertEqual(
121             kdict['encr_used'], key.encr_used,
122             'Wrong data in key.')
123         self.assertEqual(
124             kdict['sign_used'], key.sign_used,
125             'Wrong data in key.')
126
127
128 class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
129
130     @inlineCallbacks
131     def test_get_all_keys_in_db(self):
132         km = self._key_manager()
133         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
134         # get public keys
135         keys = yield km.get_all_keys(False)
136         self.assertEqual(len(keys), 1, 'Wrong number of keys')
137         self.assertTrue(ADDRESS in keys[0].address)
138         self.assertFalse(keys[0].private)
139         # get private keys
140         keys = yield km.get_all_keys(True)
141         self.assertEqual(len(keys), 1, 'Wrong number of keys')
142         self.assertTrue(ADDRESS in keys[0].address)
143         self.assertTrue(keys[0].private)
144
145     @inlineCallbacks
146     def test_get_public_key(self):
147         km = self._key_manager()
148         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
149         # get the key
150         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
151                                fetch_remote=False)
152         self.assertTrue(key is not None)
153         self.assertTrue(ADDRESS in key.address)
154         self.assertEqual(
155             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
156         self.assertFalse(key.private)
157
158     @inlineCallbacks
159     def test_get_private_key(self):
160         km = self._key_manager()
161         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
162         # get the key
163         key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
164                                fetch_remote=False)
165         self.assertTrue(key is not None)
166         self.assertTrue(ADDRESS in key.address)
167         self.assertEqual(
168             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
169         self.assertTrue(key.private)
170
171     def test_send_key_raises_key_not_found(self):
172         km = self._key_manager()
173         d = km.send_key(OpenPGPKey)
174         return self.assertFailure(d, KeyNotFound)
175
176     @inlineCallbacks
177     def test_send_key(self):
178         """
179         Test that request is well formed when sending keys to server.
180         """
181         token = "mytoken"
182         km = self._key_manager(token=token)
183         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
184         km._fetcher.put = Mock()
185         # the following data will be used on the send
186         km.ca_cert_path = 'capath'
187         km.session_id = 'sessionid'
188         km.uid = 'myuid'
189         km.api_uri = 'apiuri'
190         km.api_version = 'apiver'
191         yield km.send_key(OpenPGPKey)
192         # setup expected args
193         pubkey = yield km.get_key(km._address, OpenPGPKey)
194         data = {
195             km.PUBKEY_KEY: pubkey.key_data,
196         }
197         url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
198         km._fetcher.put.assert_called_once_with(
199             url, data=data, verify='capath',
200             headers={'Authorization': 'Token token=%s' % token},
201         )
202
203     def test_fetch_keys_from_server(self):
204         """
205         Test that the request is well formed when fetching keys from server.
206         """
207         km = self._key_manager(url=NICKSERVER_URI)
208
209         def verify_the_call(_):
210             km._fetcher.get.assert_called_once_with(
211                 NICKSERVER_URI,
212                 data={'address': ADDRESS_2},
213                 verify='cacertpath',
214             )
215
216         d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
217         d.addCallback(verify_the_call)
218         return d
219
220     @inlineCallbacks
221     def test_get_key_fetches_from_server(self):
222         """
223         Test that getting a key successfuly fetches from server.
224         """
225         km = self._key_manager(url=NICKSERVER_URI)
226
227         key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
228         self.assertIsInstance(key, OpenPGPKey)
229         self.assertTrue(ADDRESS in key.address)
230         self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
231
232     @inlineCallbacks
233     def test_get_key_fetches_other_domain(self):
234         """
235         Test that getting a key successfuly fetches from server.
236         """
237         km = self._key_manager(url=NICKSERVER_URI)
238
239         key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
240         self.assertIsInstance(key, OpenPGPKey)
241         self.assertTrue(ADDRESS_OTHER in key.address)
242         self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
243
244     def _fetch_key(self, km, address, key):
245         """
246         :returns: a Deferred that will fire with the OpenPGPKey
247         """
248         class Response(object):
249             status_code = 200
250             headers = {'content-type': 'application/json'}
251
252             def json(self):
253                 return {'address': address, 'openpgp': key}
254
255             def raise_for_status(self):
256                 pass
257
258         # mock the fetcher so it returns the key for ADDRESS_2
259         km._fetcher.get = Mock(return_value=Response())
260         km.ca_cert_path = 'cacertpath'
261         # try to key get without fetching from server
262         d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
263         d = self.assertFailure(d_fail, KeyNotFound)
264         # try to get key fetching from server.
265         d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
266         return d
267
268     @inlineCallbacks
269     def test_put_key_ascii(self):
270         """
271         Test that putting ascii key works
272         """
273         km = self._key_manager(url=NICKSERVER_URI)
274
275         yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
276         key = yield km.get_key(ADDRESS, OpenPGPKey)
277         self.assertIsInstance(key, OpenPGPKey)
278         self.assertTrue(ADDRESS in key.address)
279
280     @inlineCallbacks
281     def test_fetch_uri_ascii_key(self):
282         """
283         Test that fetch key downloads the ascii key and gets included in
284         the local storage
285         """
286         km = self._key_manager()
287
288         class Response(object):
289             ok = True
290             content = PUBLIC_KEY
291
292         km._fetcher.get = Mock(return_value=Response())
293
294         yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
295         key = yield km.get_key(ADDRESS, OpenPGPKey)
296         self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
297
298     def test_fetch_uri_empty_key(self):
299         """
300         Test that fetch key raises KeyNotFound if no key in the url
301         """
302         km = self._key_manager()
303
304         class Response(object):
305             ok = True
306             content = ""
307
308         km._fetcher.get = Mock(return_value=Response())
309         d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
310         return self.assertFailure(d, KeyNotFound)
311
312     def test_fetch_uri_address_differ(self):
313         """
314         Test that fetch key raises KeyAttributesDiffer if the address
315         don't match
316         """
317         km = self._key_manager()
318
319         class Response(object):
320             ok = True
321             content = PUBLIC_KEY
322
323         km._fetcher.get = Mock(return_value=Response())
324         d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
325         return self.assertFailure(d, KeyAddressMismatch)
326
327     def _mock_get_response(self, km, body):
328         class Response(object):
329             ok = True
330             content = body
331
332         mock = MagicMock(return_value=Response())
333         km._fetcher.get = mock
334
335         return mock
336
337     @inlineCallbacks
338     def test_fetch_key_uses_ca_bundle_if_none_specified(self):
339         ca_cert_path = None
340         km = self._key_manager(ca_cert_path=ca_cert_path)
341         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
342
343         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
344
345         get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where())
346
347     @inlineCallbacks
348     def test_fetch_key_uses_default_ca_bundle_if_also_set_as_ca_cert_path(self):
349         ca_cert_path = ca_bundle.where()
350         km = self._key_manager(ca_cert_path=ca_cert_path)
351         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
352
353         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
354
355         get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=ca_bundle.where())
356
357     @inlineCallbacks
358     def test_fetch_uses_combined_ca_bundle_otherwise(self):
359         with tempfile.NamedTemporaryFile() as tmp_input, tempfile.NamedTemporaryFile() as tmp_output:
360             ca_content = 'some\ncontent\n'
361             ca_cert_path = tmp_input.name
362             self._dump_to_file(ca_cert_path, ca_content)
363
364             with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
365                 mock.return_value = tmp_output
366                 km = self._key_manager(ca_cert_path=ca_cert_path)
367                 get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
368
369                 yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
370
371                 # assert that combined bundle file is passed to get call
372                 get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None, verify=tmp_output.name)
373
374                 # assert that files got appended
375                 expected = self._slurp_file(ca_bundle.where()) + ca_content
376                 self.assertEqual(expected, self._slurp_file(tmp_output.name))
377
378     def _dump_to_file(self, filename, content):
379             with open(filename, 'w') as out:
380                 out.write(content)
381
382     def _slurp_file(self, filename):
383         with open(filename) as f:
384             content = f.read()
385         return content
386
387
388 class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
389
390     RAW_DATA = 'data'
391
392     @inlineCallbacks
393     def test_keymanager_openpgp_encrypt_decrypt(self):
394         km = self._key_manager()
395         # put raw private key
396         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
397         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
398             PRIVATE_KEY_2, ADDRESS_2)
399         # encrypt
400         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
401                                    sign=ADDRESS_2, fetch_remote=False)
402         self.assertNotEqual(self.RAW_DATA, encdata)
403         # decrypt
404         rawdata, signingkey = yield km.decrypt(
405             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
406         self.assertEqual(self.RAW_DATA, rawdata)
407         key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False,
408                                fetch_remote=False)
409         self.assertEqual(signingkey.fingerprint, key.fingerprint)
410
411     @inlineCallbacks
412     def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
413         km = self._key_manager()
414         # put raw keys
415         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
416         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
417             PRIVATE_KEY_2, ADDRESS_2)
418         # encrypt
419         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
420                                    sign=ADDRESS_2, fetch_remote=False)
421         self.assertNotEqual(self.RAW_DATA, encdata)
422         # verify
423         rawdata, signingkey = yield km.decrypt(
424             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False)
425         self.assertEqual(self.RAW_DATA, rawdata)
426         self.assertTrue(isinstance(signingkey, errors.InvalidSignature))
427
428     @inlineCallbacks
429     def test_keymanager_openpgp_sign_verify(self):
430         km = self._key_manager()
431         # put raw private keys
432         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
433         signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey,
434                                  detach=False)
435         self.assertNotEqual(self.RAW_DATA, signdata)
436         # verify
437         signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey,
438                                      fetch_remote=False)
439         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
440                                fetch_remote=False)
441         self.assertEqual(signingkey.fingerprint, key.fingerprint)
442
443     def test_keymanager_encrypt_key_not_found(self):
444         km = self._key_manager()
445         d = km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
446         d.addCallback(
447             lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey,
448                                  sign=ADDRESS, fetch_remote=False))
449         return self.assertFailure(d, KeyNotFound)
450
451 if __name__ == "__main__":
452     import unittest
453     unittest.main()
454
455 # key 0F91B402: someone@somedomain.org
456 # 9420 EC7B 6DCB 867F 5592  E6D1 7504 C974 0F91 B402
457 ADDRESS_OTHER = "someone@somedomain.org"
458 PUBLIC_KEY_OTHER = """
459 -----BEGIN PGP PUBLIC KEY BLOCK-----
460 Version: GnuPG v1
461
462 mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
463 kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
464 jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
465 wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
466 QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
467 AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
468 bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
469 CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
470 9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
471 cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
472 TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
473 Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
474 hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
475 BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
476 u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
477 MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
478 2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
479 SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
480 1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
481 CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
482 DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
483 6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
484 U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
485 XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
486 QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
487 =gDzy
488 -----END PGP PUBLIC KEY BLOCK-----
489 """