[style] more pep8 fixes
[keymanager.git] / src / leap / keymanager / tests / test_keymanager.py
1 # -*- coding: utf-8 -*-
2 # test_keymanager.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Tests for the Key Manager.
21 """
22
23 from os import path
24 from datetime import datetime
25 import tempfile
26 from leap.common import ca_bundle
27 from mock import Mock, MagicMock, patch
28 from twisted.internet.defer import inlineCallbacks
29 from twisted.trial import unittest
30
31 from leap.keymanager import (
32     KeyNotFound,
33     KeyAddressMismatch,
34     errors
35 )
36 from leap.keymanager.openpgp import OpenPGPKey
37 from leap.keymanager.keys import (
38     is_address,
39     build_key_from_dict,
40 )
41 from leap.keymanager.validation import ValidationLevels
42 from leap.keymanager.tests import (
43     KeyManagerWithSoledadTestCase,
44     ADDRESS,
45     ADDRESS_2,
46     KEY_FINGERPRINT,
47     PUBLIC_KEY,
48     PUBLIC_KEY_2,
49     PRIVATE_KEY,
50     PRIVATE_KEY_2,
51 )
52
53
54 NICKSERVER_URI = "http://leap.se/"
55 REMOTE_KEY_URL = "http://site.domain/key"
56
57
58 class KeyManagerUtilTestCase(unittest.TestCase):
59
60     def test_is_address(self):
61         self.assertTrue(
62             is_address('user@leap.se'),
63             'Incorrect address detection.')
64         self.assertFalse(
65             is_address('userleap.se'),
66             'Incorrect address detection.')
67         self.assertFalse(
68             is_address('user@'),
69             'Incorrect address detection.')
70         self.assertFalse(
71             is_address('@leap.se'),
72             'Incorrect address detection.')
73
74     def test_build_key_from_dict(self):
75         kdict = {
76             'address': [ADDRESS],
77             'key_id': KEY_FINGERPRINT[-16:],
78             'fingerprint': KEY_FINGERPRINT,
79             'key_data': PUBLIC_KEY,
80             'private': False,
81             'length': 4096,
82             'expiry_date': 0,
83             'last_audited_at': 0,
84             'refreshed_at': 1311239602,
85             'validation': str(ValidationLevels.Weak_Chain),
86             'encr_used': False,
87             'sign_used': True,
88         }
89         key = build_key_from_dict(OpenPGPKey, kdict)
90         self.assertEqual(
91             kdict['address'], key.address,
92             'Wrong data in key.')
93         self.assertEqual(
94             kdict['key_id'], key.key_id,
95             'Wrong data in key.')
96         self.assertEqual(
97             kdict['fingerprint'], key.fingerprint,
98             'Wrong data in key.')
99         self.assertEqual(
100             kdict['key_data'], key.key_data,
101             'Wrong data in key.')
102         self.assertEqual(
103             kdict['private'], key.private,
104             'Wrong data in key.')
105         self.assertEqual(
106             kdict['length'], key.length,
107             'Wrong data in key.')
108         self.assertEqual(
109             None, key.expiry_date,
110             'Wrong data in key.')
111         self.assertEqual(
112             None, key.last_audited_at,
113             'Wrong data in key.')
114         self.assertEqual(
115             datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
116             'Wrong data in key.')
117         self.assertEqual(
118             ValidationLevels.get(kdict['validation']), key.validation,
119             'Wrong data in key.')
120         self.assertEqual(
121             kdict['encr_used'], key.encr_used,
122             'Wrong data in key.')
123         self.assertEqual(
124             kdict['sign_used'], key.sign_used,
125             'Wrong data in key.')
126
127
128 class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
129
130     @inlineCallbacks
131     def test_get_all_keys_in_db(self):
132         km = self._key_manager()
133         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
134         # get public keys
135         keys = yield km.get_all_keys(False)
136         self.assertEqual(len(keys), 1, 'Wrong number of keys')
137         self.assertTrue(ADDRESS in keys[0].address)
138         self.assertFalse(keys[0].private)
139         # get private keys
140         keys = yield km.get_all_keys(True)
141         self.assertEqual(len(keys), 1, 'Wrong number of keys')
142         self.assertTrue(ADDRESS in keys[0].address)
143         self.assertTrue(keys[0].private)
144
145     @inlineCallbacks
146     def test_get_public_key(self):
147         km = self._key_manager()
148         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
149         # get the key
150         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
151                                fetch_remote=False)
152         self.assertTrue(key is not None)
153         self.assertTrue(ADDRESS in key.address)
154         self.assertEqual(
155             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
156         self.assertFalse(key.private)
157
158     @inlineCallbacks
159     def test_get_private_key(self):
160         km = self._key_manager()
161         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
162         # get the key
163         key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
164                                fetch_remote=False)
165         self.assertTrue(key is not None)
166         self.assertTrue(ADDRESS in key.address)
167         self.assertEqual(
168             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
169         self.assertTrue(key.private)
170
171     def test_send_key_raises_key_not_found(self):
172         km = self._key_manager()
173         d = km.send_key(OpenPGPKey)
174         return self.assertFailure(d, KeyNotFound)
175
176     @inlineCallbacks
177     def test_send_key(self):
178         """
179         Test that request is well formed when sending keys to server.
180         """
181         token = "mytoken"
182         km = self._key_manager(token=token)
183         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
184         km._fetcher.put = Mock()
185         # the following data will be used on the send
186         km.ca_cert_path = 'capath'
187         km.session_id = 'sessionid'
188         km.uid = 'myuid'
189         km.api_uri = 'apiuri'
190         km.api_version = 'apiver'
191         yield km.send_key(OpenPGPKey)
192         # setup expected args
193         pubkey = yield km.get_key(km._address, OpenPGPKey)
194         data = {
195             km.PUBKEY_KEY: pubkey.key_data,
196         }
197         url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
198         km._fetcher.put.assert_called_once_with(
199             url, data=data, verify='capath',
200             headers={'Authorization': 'Token token=%s' % token},
201         )
202
203     def test_fetch_keys_from_server(self):
204         """
205         Test that the request is well formed when fetching keys from server.
206         """
207         km = self._key_manager(url=NICKSERVER_URI)
208
209         def verify_the_call(_):
210             km._fetcher.get.assert_called_once_with(
211                 NICKSERVER_URI,
212                 data={'address': ADDRESS_2},
213                 verify='cacertpath',
214             )
215
216         d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
217         d.addCallback(verify_the_call)
218         return d
219
220     @inlineCallbacks
221     def test_get_key_fetches_from_server(self):
222         """
223         Test that getting a key successfuly fetches from server.
224         """
225         km = self._key_manager(url=NICKSERVER_URI)
226
227         key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
228         self.assertIsInstance(key, OpenPGPKey)
229         self.assertTrue(ADDRESS in key.address)
230         self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
231
232     @inlineCallbacks
233     def test_get_key_fetches_other_domain(self):
234         """
235         Test that getting a key successfuly fetches from server.
236         """
237         km = self._key_manager(url=NICKSERVER_URI)
238
239         key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
240         self.assertIsInstance(key, OpenPGPKey)
241         self.assertTrue(ADDRESS_OTHER in key.address)
242         self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
243
244     def _fetch_key(self, km, address, key):
245         """
246         :returns: a Deferred that will fire with the OpenPGPKey
247         """
248         class Response(object):
249             status_code = 200
250             headers = {'content-type': 'application/json'}
251
252             def json(self):
253                 return {'address': address, 'openpgp': key}
254
255             def raise_for_status(self):
256                 pass
257
258         # mock the fetcher so it returns the key for ADDRESS_2
259         km._fetcher.get = Mock(return_value=Response())
260         km.ca_cert_path = 'cacertpath'
261         # try to key get without fetching from server
262         d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
263         d = self.assertFailure(d_fail, KeyNotFound)
264         # try to get key fetching from server.
265         d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
266         return d
267
268     @inlineCallbacks
269     def test_put_key_ascii(self):
270         """
271         Test that putting ascii key works
272         """
273         km = self._key_manager(url=NICKSERVER_URI)
274
275         yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
276         key = yield km.get_key(ADDRESS, OpenPGPKey)
277         self.assertIsInstance(key, OpenPGPKey)
278         self.assertTrue(ADDRESS in key.address)
279
280     @inlineCallbacks
281     def test_fetch_uri_ascii_key(self):
282         """
283         Test that fetch key downloads the ascii key and gets included in
284         the local storage
285         """
286         km = self._key_manager()
287
288         class Response(object):
289             ok = True
290             content = PUBLIC_KEY
291
292         km._fetcher.get = Mock(return_value=Response())
293
294         yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
295         key = yield km.get_key(ADDRESS, OpenPGPKey)
296         self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
297
298     def test_fetch_uri_empty_key(self):
299         """
300         Test that fetch key raises KeyNotFound if no key in the url
301         """
302         km = self._key_manager()
303
304         class Response(object):
305             ok = True
306             content = ""
307
308         km._fetcher.get = Mock(return_value=Response())
309         d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
310         return self.assertFailure(d, KeyNotFound)
311
312     def test_fetch_uri_address_differ(self):
313         """
314         Test that fetch key raises KeyAttributesDiffer if the address
315         don't match
316         """
317         km = self._key_manager()
318
319         class Response(object):
320             ok = True
321             content = PUBLIC_KEY
322
323         km._fetcher.get = Mock(return_value=Response())
324         d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
325         return self.assertFailure(d, KeyAddressMismatch)
326
327     def _mock_get_response(self, km, body):
328         class Response(object):
329             ok = True
330             content = body
331
332         mock = MagicMock(return_value=Response())
333         km._fetcher.get = mock
334
335         return mock
336
337     @inlineCallbacks
338     def test_fetch_key_uses_ca_bundle_if_none_specified(self):
339         ca_cert_path = None
340         km = self._key_manager(ca_cert_path=ca_cert_path)
341         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
342
343         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
344
345         get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
346                                          verify=ca_bundle.where())
347
348     @inlineCallbacks
349     def test_fetch_key_uses_ca_bundle_if_empty_string_specified(self):
350         ca_cert_path = ''
351         km = self._key_manager(ca_cert_path=ca_cert_path)
352         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
353
354         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
355
356         get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
357                                          verify=ca_bundle.where())
358
359     @inlineCallbacks
360     def test_fetch_key_use_default_ca_bundle_if_set_as_ca_cert_path(self):
361         ca_cert_path = ca_bundle.where()
362         km = self._key_manager(ca_cert_path=ca_cert_path)
363         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
364
365         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
366
367         get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
368                                          verify=ca_bundle.where())
369
370     @inlineCallbacks
371     def test_fetch_uses_combined_ca_bundle_otherwise(self):
372         with tempfile.NamedTemporaryFile() as tmp_input, \
373                 tempfile.NamedTemporaryFile(delete=False) as tmp_output:
374             ca_content = 'some\ncontent\n'
375             ca_cert_path = tmp_input.name
376             self._dump_to_file(ca_cert_path, ca_content)
377
378             with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
379                 mock.return_value = tmp_output
380                 km = self._key_manager(ca_cert_path=ca_cert_path)
381                 get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
382
383                 yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
384
385                 # assert that combined bundle file is passed to get call
386                 get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
387                                                  verify=tmp_output.name)
388
389                 # assert that files got appended
390                 expected = self._slurp_file(ca_bundle.where()) + ca_content
391                 self.assertEqual(expected, self._slurp_file(tmp_output.name))
392
393             del km  # force km out of scope
394             self.assertFalse(path.exists(tmp_output.name))
395
396     def _dump_to_file(self, filename, content):
397             with open(filename, 'w') as out:
398                 out.write(content)
399
400     def _slurp_file(self, filename):
401         with open(filename) as f:
402             content = f.read()
403         return content
404
405     @inlineCallbacks
406     def test_decrypt_updates_sign_used_for_signer(self):
407         # given
408         km = self._key_manager()
409         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
410         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
411             PRIVATE_KEY_2, ADDRESS_2)
412         encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
413                                    sign=ADDRESS_2, fetch_remote=False)
414         yield km.decrypt(
415             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
416
417         # when
418         key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False)
419
420         # then
421         self.assertEqual(True, key.sign_used)
422
423     @inlineCallbacks
424     def test_decrypt_does_not_update_sign_used_for_recipient(self):
425         # given
426         km = self._key_manager()
427         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
428             PRIVATE_KEY, ADDRESS)
429         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
430             PRIVATE_KEY_2, ADDRESS_2)
431         encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
432                                    sign=ADDRESS_2, fetch_remote=False)
433         yield km.decrypt(
434             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
435
436         # when
437         key = yield km.get_key(
438             ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
439
440         # then
441         self.assertEqual(False, key.sign_used)
442
443
444 class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
445
446     RAW_DATA = 'data'
447
448     @inlineCallbacks
449     def test_keymanager_openpgp_encrypt_decrypt(self):
450         km = self._key_manager()
451         # put raw private key
452         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
453         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
454             PRIVATE_KEY_2, ADDRESS_2)
455         # encrypt
456         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
457                                    sign=ADDRESS_2, fetch_remote=False)
458         self.assertNotEqual(self.RAW_DATA, encdata)
459         # decrypt
460         rawdata, signingkey = yield km.decrypt(
461             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
462         self.assertEqual(self.RAW_DATA, rawdata)
463         key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False,
464                                fetch_remote=False)
465         self.assertEqual(signingkey.fingerprint, key.fingerprint)
466
467     @inlineCallbacks
468     def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
469         km = self._key_manager()
470         # put raw keys
471         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
472         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
473             PRIVATE_KEY_2, ADDRESS_2)
474         # encrypt
475         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
476                                    sign=ADDRESS_2, fetch_remote=False)
477         self.assertNotEqual(self.RAW_DATA, encdata)
478         # verify
479         rawdata, signingkey = yield km.decrypt(
480             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False)
481         self.assertEqual(self.RAW_DATA, rawdata)
482         self.assertTrue(isinstance(signingkey, errors.InvalidSignature))
483
484     @inlineCallbacks
485     def test_keymanager_openpgp_sign_verify(self):
486         km = self._key_manager()
487         # put raw private keys
488         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
489         signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey,
490                                  detach=False)
491         self.assertNotEqual(self.RAW_DATA, signdata)
492         # verify
493         signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey,
494                                      fetch_remote=False)
495         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
496                                fetch_remote=False)
497         self.assertEqual(signingkey.fingerprint, key.fingerprint)
498
499     def test_keymanager_encrypt_key_not_found(self):
500         km = self._key_manager()
501         d = km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
502         d.addCallback(
503             lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey,
504                                  sign=ADDRESS, fetch_remote=False))
505         return self.assertFailure(d, KeyNotFound)
506
507 if __name__ == "__main__":
508     import unittest
509     unittest.main()
510
511 # key 0F91B402: someone@somedomain.org
512 # 9420 EC7B 6DCB 867F 5592  E6D1 7504 C974 0F91 B402
513 ADDRESS_OTHER = "someone@somedomain.org"
514 PUBLIC_KEY_OTHER = """
515 -----BEGIN PGP PUBLIC KEY BLOCK-----
516 Version: GnuPG v1
517
518 mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
519 kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
520 jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
521 wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
522 QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
523 AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
524 bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
525 CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
526 9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
527 cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
528 TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
529 Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
530 hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
531 BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
532 u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
533 MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
534 2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
535 SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
536 1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
537 CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
538 DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
539 6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
540 U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
541 XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
542 QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
543 =gDzy
544 -----END PGP PUBLIC KEY BLOCK-----
545 """