c3426b6984057a1dbd86ecdebe5a635b028b556a
[keymanager.git] / src / leap / keymanager / tests / test_keymanager.py
1 # -*- coding: utf-8 -*-
2 # test_keymanager.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Tests for the Key Manager.
21 """
22
23 from os import path
24 import json
25 import urllib
26 from datetime import datetime
27 import tempfile
28 import pkg_resources
29 from leap.common import ca_bundle
30 from mock import Mock, MagicMock, patch
31 from twisted.internet import defer
32 from twisted.trial import unittest
33
34 from leap.keymanager import (
35     KeyNotFound,
36     KeyAddressMismatch,
37     errors
38 )
39 from leap.keymanager.openpgp import OpenPGPKey
40 from leap.keymanager.keys import (
41     is_address,
42     build_key_from_dict,
43 )
44 from leap.keymanager.validation import ValidationLevels
45 from leap.keymanager.tests import (
46     KeyManagerWithSoledadTestCase,
47     ADDRESS,
48     ADDRESS_2,
49     KEY_FINGERPRINT,
50     PUBLIC_KEY,
51     PUBLIC_KEY_2,
52     PRIVATE_KEY,
53     PRIVATE_KEY_2,
54 )
55
56
57 NICKSERVER_URI = "http://leap.se/"
58 REMOTE_KEY_URL = "http://site.domain/key"
59
60
61 class KeyManagerUtilTestCase(unittest.TestCase):
62
63     def test_is_address(self):
64         self.assertTrue(
65             is_address('user@leap.se'),
66             'Incorrect address detection.')
67         self.assertFalse(
68             is_address('userleap.se'),
69             'Incorrect address detection.')
70         self.assertFalse(
71             is_address('user@'),
72             'Incorrect address detection.')
73         self.assertFalse(
74             is_address('@leap.se'),
75             'Incorrect address detection.')
76
77     def test_build_key_from_dict(self):
78         kdict = {
79             'address': [ADDRESS],
80             'key_id': KEY_FINGERPRINT[-16:],
81             'fingerprint': KEY_FINGERPRINT,
82             'key_data': PUBLIC_KEY,
83             'private': False,
84             'length': 4096,
85             'expiry_date': 0,
86             'last_audited_at': 0,
87             'refreshed_at': 1311239602,
88             'validation': str(ValidationLevels.Weak_Chain),
89             'encr_used': False,
90             'sign_used': True,
91         }
92         key = build_key_from_dict(OpenPGPKey, kdict)
93         self.assertEqual(
94             kdict['address'], key.address,
95             'Wrong data in key.')
96         self.assertEqual(
97             kdict['key_id'], key.key_id,
98             'Wrong data in key.')
99         self.assertEqual(
100             kdict['fingerprint'], key.fingerprint,
101             'Wrong data in key.')
102         self.assertEqual(
103             kdict['key_data'], key.key_data,
104             'Wrong data in key.')
105         self.assertEqual(
106             kdict['private'], key.private,
107             'Wrong data in key.')
108         self.assertEqual(
109             kdict['length'], key.length,
110             'Wrong data in key.')
111         self.assertEqual(
112             None, key.expiry_date,
113             'Wrong data in key.')
114         self.assertEqual(
115             None, key.last_audited_at,
116             'Wrong data in key.')
117         self.assertEqual(
118             datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
119             'Wrong data in key.')
120         self.assertEqual(
121             ValidationLevels.get(kdict['validation']), key.validation,
122             'Wrong data in key.')
123         self.assertEqual(
124             kdict['encr_used'], key.encr_used,
125             'Wrong data in key.')
126         self.assertEqual(
127             kdict['sign_used'], key.sign_used,
128             'Wrong data in key.')
129
130
131 class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
132
133     @defer.inlineCallbacks
134     def test_get_all_keys_in_db(self):
135         km = self._key_manager()
136         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
137         # get public keys
138         keys = yield km.get_all_keys(False)
139         self.assertEqual(len(keys), 1, 'Wrong number of keys')
140         self.assertTrue(ADDRESS in keys[0].address)
141         self.assertFalse(keys[0].private)
142         # get private keys
143         keys = yield km.get_all_keys(True)
144         self.assertEqual(len(keys), 1, 'Wrong number of keys')
145         self.assertTrue(ADDRESS in keys[0].address)
146         self.assertTrue(keys[0].private)
147
148     @defer.inlineCallbacks
149     def test_get_public_key(self):
150         km = self._key_manager()
151         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
152         # get the key
153         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
154                                fetch_remote=False)
155         self.assertTrue(key is not None)
156         self.assertTrue(ADDRESS in key.address)
157         self.assertEqual(
158             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
159         self.assertFalse(key.private)
160
161     @defer.inlineCallbacks
162     def test_get_private_key(self):
163         km = self._key_manager()
164         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
165         # get the key
166         key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
167                                fetch_remote=False)
168         self.assertTrue(key is not None)
169         self.assertTrue(ADDRESS in key.address)
170         self.assertEqual(
171             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
172         self.assertTrue(key.private)
173
174     def test_send_key_raises_key_not_found(self):
175         km = self._key_manager()
176         d = km.send_key(OpenPGPKey)
177         return self.assertFailure(d, KeyNotFound)
178
179     @defer.inlineCallbacks
180     def test_send_key(self):
181         """
182         Test that request is well formed when sending keys to server.
183         """
184         token = "mytoken"
185         km = self._key_manager(token=token)
186         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
187         km._async_client_pinned.request = Mock(return_value=defer.succeed(''))
188         # the following data will be used on the send
189         km.ca_cert_path = 'capath'
190         km.session_id = 'sessionid'
191         km.uid = 'myuid'
192         km.api_uri = 'apiuri'
193         km.api_version = 'apiver'
194         yield km.send_key(OpenPGPKey)
195         # setup expected args
196         pubkey = yield km.get_key(km._address, OpenPGPKey)
197         data = urllib.urlencode({
198             km.PUBKEY_KEY: pubkey.key_data,
199         })
200         headers = {'Authorization': [str('Token token=%s' % token)]}
201         headers['Content-Type'] = ['application/x-www-form-urlencoded']
202         url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
203         km._async_client_pinned.request.assert_called_once_with(
204             str(url), 'PUT', body=str(data),
205             headers=headers
206         )
207
208     def test_fetch_keys_from_server(self):
209         """
210         Test that the request is well formed when fetching keys from server.
211         """
212         km = self._key_manager(url=NICKSERVER_URI)
213         expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2
214
215         def verify_the_call(_):
216             km._async_client_pinned.request.assert_called_once_with(
217                 expected_url,
218                 'GET',
219             )
220
221         d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
222         d.addCallback(verify_the_call)
223         return d
224
225     @defer.inlineCallbacks
226     def test_get_key_fetches_from_server(self):
227         """
228         Test that getting a key successfuly fetches from server.
229         """
230         km = self._key_manager(url=NICKSERVER_URI)
231
232         key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
233         self.assertIsInstance(key, OpenPGPKey)
234         self.assertTrue(ADDRESS in key.address)
235         self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
236
237     @defer.inlineCallbacks
238     def test_get_key_fetches_other_domain(self):
239         """
240         Test that getting a key successfuly fetches from server.
241         """
242         km = self._key_manager(url=NICKSERVER_URI)
243
244         key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
245         self.assertIsInstance(key, OpenPGPKey)
246         self.assertTrue(ADDRESS_OTHER in key.address)
247         self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
248
249     def _fetch_key(self, km, address, key):
250         """
251         :returns: a Deferred that will fire with the OpenPGPKey
252         """
253         data = json.dumps({'address': address, 'openpgp': key})
254
255         # mock the fetcher so it returns the key for ADDRESS_2
256         km._async_client_pinned.request = Mock(return_value=defer.succeed(data))
257         km.ca_cert_path = 'cacertpath'
258         # try to key get without fetching from server
259         d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
260         d = self.assertFailure(d_fail, KeyNotFound)
261         # try to get key fetching from server.
262         d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
263         return d
264
265     @defer.inlineCallbacks
266     def test_put_key_ascii(self):
267         """
268         Test that putting ascii key works
269         """
270         km = self._key_manager(url=NICKSERVER_URI)
271
272         yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
273         key = yield km.get_key(ADDRESS, OpenPGPKey)
274         self.assertIsInstance(key, OpenPGPKey)
275         self.assertTrue(ADDRESS in key.address)
276
277     @defer.inlineCallbacks
278     def test_fetch_uri_ascii_key(self):
279         """
280         Test that fetch key downloads the ascii key and gets included in
281         the local storage
282         """
283         km = self._key_manager()
284
285         km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))
286
287         yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
288         key = yield km.get_key(ADDRESS, OpenPGPKey)
289         self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
290
291     def test_fetch_uri_empty_key(self):
292         """
293         Test that fetch key raises KeyNotFound if no key in the url
294         """
295         km = self._key_manager()
296
297         class Response(object):
298             ok = True
299             content = ""
300
301         km._fetcher.get = Mock(return_value=Response())
302         d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
303         return self.assertFailure(d, KeyNotFound)
304
305     def test_fetch_uri_address_differ(self):
306         """
307         Test that fetch key raises KeyAttributesDiffer if the address
308         don't match
309         """
310         km = self._key_manager()
311
312         km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))
313         d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
314         return self.assertFailure(d, KeyAddressMismatch)
315
316     def _mock_get_response(self, km, body):
317         km._async_client.request = MagicMock(return_value=defer.succeed(body))
318
319         return km._async_client.request
320
321     @defer.inlineCallbacks
322     def test_fetch_key_uses_ca_bundle_if_none_specified(self):
323         ca_cert_path = None
324         km = self._key_manager(ca_cert_path=ca_cert_path)
325         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
326
327         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
328
329         get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
330
331     @defer.inlineCallbacks
332     def test_fetch_key_uses_ca_bundle_if_empty_string_specified(self):
333         ca_cert_path = ''
334         km = self._key_manager(ca_cert_path=ca_cert_path)
335         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
336
337         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
338
339         get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
340
341     @defer.inlineCallbacks
342     def test_fetch_key_use_default_ca_bundle_if_set_as_ca_cert_path(self):
343         ca_cert_path = ca_bundle.where()
344         km = self._key_manager(ca_cert_path=ca_cert_path)
345         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
346
347         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
348
349         get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
350
351     @defer.inlineCallbacks
352     def test_fetch_uses_combined_ca_bundle_otherwise(self):
353         with tempfile.NamedTemporaryFile() as tmp_input, \
354                 tempfile.NamedTemporaryFile(delete=False) as tmp_output:
355             ca_content = pkg_resources.resource_string('leap.common.testing',
356                                                        'cacert.pem')
357             ca_cert_path = tmp_input.name
358             self._dump_to_file(ca_cert_path, ca_content)
359
360             with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
361                 mock.return_value = tmp_output
362                 km = self._key_manager(ca_cert_path=ca_cert_path)
363                 get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
364
365                 yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
366
367                 # assert that combined bundle file is passed to get call
368                 get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
369
370                 # assert that files got appended
371                 expected = self._slurp_file(ca_bundle.where()) + ca_content
372                 self.assertEqual(expected, self._slurp_file(tmp_output.name))
373
374             del km  # force km out of scope
375             self.assertFalse(path.exists(tmp_output.name))
376
377     def _dump_to_file(self, filename, content):
378             with open(filename, 'w') as out:
379                 out.write(content)
380
381     def _slurp_file(self, filename):
382         with open(filename) as f:
383             content = f.read()
384         return content
385
386     @defer.inlineCallbacks
387     def test_decrypt_updates_sign_used_for_signer(self):
388         # given
389         km = self._key_manager()
390         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
391         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
392             PRIVATE_KEY_2, ADDRESS_2)
393         encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
394                                    sign=ADDRESS_2, fetch_remote=False)
395         yield km.decrypt(
396             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
397
398         # when
399         key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False)
400
401         # then
402         self.assertEqual(True, key.sign_used)
403
404     @defer.inlineCallbacks
405     def test_decrypt_does_not_update_sign_used_for_recipient(self):
406         # given
407         km = self._key_manager()
408         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
409             PRIVATE_KEY, ADDRESS)
410         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
411             PRIVATE_KEY_2, ADDRESS_2)
412         encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
413                                    sign=ADDRESS_2, fetch_remote=False)
414         yield km.decrypt(
415             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
416
417         # when
418         key = yield km.get_key(
419             ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
420
421         # then
422         self.assertEqual(False, key.sign_used)
423
424
425 class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
426
427     RAW_DATA = 'data'
428
429     @defer.inlineCallbacks
430     def test_keymanager_openpgp_encrypt_decrypt(self):
431         km = self._key_manager()
432         # put raw private key
433         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
434         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
435             PRIVATE_KEY_2, ADDRESS_2)
436         # encrypt
437         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
438                                    sign=ADDRESS_2, fetch_remote=False)
439         self.assertNotEqual(self.RAW_DATA, encdata)
440         # decrypt
441         rawdata, signingkey = yield km.decrypt(
442             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
443         self.assertEqual(self.RAW_DATA, rawdata)
444         key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False,
445                                fetch_remote=False)
446         self.assertEqual(signingkey.fingerprint, key.fingerprint)
447
448     @defer.inlineCallbacks
449     def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
450         km = self._key_manager()
451         # put raw keys
452         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
453         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
454             PRIVATE_KEY_2, ADDRESS_2)
455         # encrypt
456         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
457                                    sign=ADDRESS_2, fetch_remote=False)
458         self.assertNotEqual(self.RAW_DATA, encdata)
459         # verify
460         rawdata, signingkey = yield km.decrypt(
461             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False)
462         self.assertEqual(self.RAW_DATA, rawdata)
463         self.assertTrue(isinstance(signingkey, errors.InvalidSignature))
464
465     @defer.inlineCallbacks
466     def test_keymanager_openpgp_sign_verify(self):
467         km = self._key_manager()
468         # put raw private keys
469         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
470         signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey,
471                                  detach=False)
472         self.assertNotEqual(self.RAW_DATA, signdata)
473         # verify
474         signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey,
475                                      fetch_remote=False)
476         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
477                                fetch_remote=False)
478         self.assertEqual(signingkey.fingerprint, key.fingerprint)
479
480     def test_keymanager_encrypt_key_not_found(self):
481         km = self._key_manager()
482         d = km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
483         d.addCallback(
484             lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey,
485                                  sign=ADDRESS, fetch_remote=False))
486         return self.assertFailure(d, KeyNotFound)
487
488 if __name__ == "__main__":
489     import unittest
490     unittest.main()
491
492 # key 0F91B402: someone@somedomain.org
493 # 9420 EC7B 6DCB 867F 5592  E6D1 7504 C974 0F91 B402
494 ADDRESS_OTHER = "someone@somedomain.org"
495 PUBLIC_KEY_OTHER = """
496 -----BEGIN PGP PUBLIC KEY BLOCK-----
497 Version: GnuPG v1
498
499 mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
500 kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
501 jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
502 wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
503 QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
504 AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
505 bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
506 CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
507 9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
508 cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
509 TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
510 Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
511 hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
512 BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
513 u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
514 MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
515 2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
516 SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
517 1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
518 CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
519 DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
520 6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
521 U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
522 XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
523 QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
524 =gDzy
525 -----END PGP PUBLIC KEY BLOCK-----
526 """