[feat] Make EncryptionKey aware of the active address
[keymanager.git] / src / leap / keymanager / tests / test_keymanager.py
1 # -*- coding: utf-8 -*-
2 # test_keymanager.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Tests for the Key Manager.
21 """
22
23 from os import path
24 import json
25 import urllib
26 from datetime import datetime
27 import tempfile
28 import pkg_resources
29 from leap.common import ca_bundle
30 from mock import Mock, MagicMock, patch
31 from twisted.internet import defer
32 from twisted.trial import unittest
33
34 from leap.keymanager import (
35     KeyNotFound,
36     KeyAddressMismatch,
37     errors
38 )
39 from leap.keymanager.openpgp import OpenPGPKey
40 from leap.keymanager.keys import (
41     is_address,
42     build_key_from_dict,
43 )
44 from leap.keymanager.validation import ValidationLevels
45 from leap.keymanager.tests import (
46     KeyManagerWithSoledadTestCase,
47     ADDRESS,
48     ADDRESS_2,
49     KEY_FINGERPRINT,
50     PUBLIC_KEY,
51     PUBLIC_KEY_2,
52     PRIVATE_KEY,
53     PRIVATE_KEY_2,
54 )
55
56
# Nickserver base URL handed to the key manager under test; the expected
# key-lookup URL in test_fetch_keys_from_server is built from it.
NICKSERVER_URI = "http://leap.se/"
# Location passed to ``fetch_key`` by the tests that download a key by URI.
REMOTE_KEY_URL = "http://site.domain/key"
59
60
class KeyManagerUtilTestCase(unittest.TestCase):
    """
    Tests for the ``is_address`` and ``build_key_from_dict`` helpers.
    """

    def test_is_address(self):
        """
        Test is_address against one well-formed and three malformed
        addresses.
        """
        failure_msg = 'Incorrect address detection.'
        self.assertTrue(is_address('user@leap.se'), failure_msg)
        for malformed in ('userleap.se', 'user@', '@leap.se'):
            self.assertFalse(is_address(malformed), failure_msg)

    def test_build_key_from_dict(self):
        """
        Test that build_key_from_dict carries the values of the key dict
        and of the active-address dict over to the built key.
        """
        mismatch = 'Wrong data in key.'
        key_doc = {
            'uids': [ADDRESS],
            'fingerprint': KEY_FINGERPRINT,
            'key_data': PUBLIC_KEY,
            'private': False,
            'length': 4096,
            'expiry_date': 0,
            'refreshed_at': 1311239602,
        }
        active_doc = {
            'address': ADDRESS,
            'private': False,
            'last_audited_at': 0,
            'validation': str(ValidationLevels.Weak_Chain),
            'encr_used': False,
            'sign_used': True,
        }

        built = build_key_from_dict(OpenPGPKey, key_doc, active_doc)

        # values copied verbatim from the key dict
        self.assertEqual(key_doc['uids'], built.uids, mismatch)
        self.assertEqual(key_doc['fingerprint'], built.fingerprint, mismatch)
        self.assertEqual(key_doc['key_data'], built.key_data, mismatch)
        self.assertEqual(key_doc['private'], built.private, mismatch)
        self.assertEqual(key_doc['length'], built.length, mismatch)
        # both dates were given as 0 and are expected back as None
        self.assertEqual(None, built.expiry_date, mismatch)
        self.assertEqual(None, built.last_audited_at, mismatch)
        # the refresh timestamp is expected back as a datetime
        self.assertEqual(
            datetime.fromtimestamp(key_doc['refreshed_at']),
            built.refreshed_at, mismatch)
        # values coming from the active-address dict
        self.assertEqual(active_doc['address'], built.address, mismatch)
        self.assertEqual(
            ValidationLevels.get(active_doc['validation']),
            built.validation, mismatch)
        self.assertEqual(active_doc['encr_used'], built.encr_used, mismatch)
        self.assertEqual(active_doc['sign_used'], built.sign_used, mismatch)
132
133
class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
    """
    Tests for storing, retrieving, sending and fetching keys through the
    key manager.
    """

    @defer.inlineCallbacks
    def test_get_all_keys_in_db(self):
        """
        Test that after storing a private key, get_all_keys returns one
        key for it both when asking for public and for private keys.
        """
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        # get public keys
        keys = yield km.get_all_keys(False)
        self.assertEqual(len(keys), 1, 'Wrong number of keys')
        self.assertTrue(ADDRESS in keys[0].uids)
        self.assertFalse(keys[0].private)
        # get private keys
        keys = yield km.get_all_keys(True)
        self.assertEqual(len(keys), 1, 'Wrong number of keys')
        self.assertTrue(ADDRESS in keys[0].uids)
        self.assertTrue(keys[0].private)

    @defer.inlineCallbacks
    def test_get_public_key(self):
        """
        Test that the public key can be retrieved after storing the
        private key for the same address.
        """
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        # get the key
        key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
                               fetch_remote=False)
        self.assertTrue(key is not None)
        self.assertTrue(ADDRESS in key.uids)
        self.assertEqual(
            key.fingerprint.lower(), KEY_FINGERPRINT.lower())
        self.assertFalse(key.private)

    @defer.inlineCallbacks
    def test_get_private_key(self):
        """
        Test that a stored private key can be retrieved as private key.
        """
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        # get the key
        key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
                               fetch_remote=False)
        self.assertTrue(key is not None)
        self.assertTrue(ADDRESS in key.uids)
        self.assertEqual(
            key.fingerprint.lower(), KEY_FINGERPRINT.lower())
        self.assertTrue(key.private)

    def test_send_key_raises_key_not_found(self):
        """
        Test that send_key fails with KeyNotFound when no key was stored.
        """
        km = self._key_manager()
        d = km.send_key(OpenPGPKey)
        return self.assertFailure(d, KeyNotFound)

    @defer.inlineCallbacks
    def test_send_key(self):
        """
        Test that request is well formed when sending keys to server.
        """
        token = "mytoken"
        km = self._key_manager(token=token)
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
        km._async_client_pinned.request = Mock(return_value=defer.succeed(''))
        # the following data will be used on the send
        km.ca_cert_path = 'capath'
        km.session_id = 'sessionid'
        km.uid = 'myuid'
        km.api_uri = 'apiuri'
        km.api_version = 'apiver'
        yield km.send_key(OpenPGPKey)
        # setup expected args
        pubkey = yield km.get_key(km._address, OpenPGPKey)
        data = urllib.urlencode({
            km.PUBKEY_KEY: pubkey.key_data,
        })
        headers = {'Authorization': [str('Token token=%s' % token)]}
        headers['Content-Type'] = ['application/x-www-form-urlencoded']
        url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
        km._async_client_pinned.request.assert_called_once_with(
            str(url), 'PUT', body=str(data),
            headers=headers
        )

    def test_fetch_keys_from_server(self):
        """
        Test that the request is well formed when fetching keys from server.
        """
        km = self._key_manager(url=NICKSERVER_URI)
        expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2

        def verify_the_call(_):
            km._async_client_pinned.request.assert_called_once_with(
                expected_url,
                'GET',
            )

        d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
        d.addCallback(verify_the_call)
        return d

    @defer.inlineCallbacks
    def test_get_key_fetches_from_server(self):
        """
        Test that getting a key successfully fetches from server.
        """
        km = self._key_manager(url=NICKSERVER_URI)

        key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
        self.assertIsInstance(key, OpenPGPKey)
        self.assertTrue(ADDRESS in key.uids)
        self.assertEqual(key.validation, ValidationLevels.Provider_Trust)

    @defer.inlineCallbacks
    def test_get_key_fetches_other_domain(self):
        """
        Test that a key fetched from the server for ADDRESS_OTHER is
        stored with Weak_Chain validation.
        """
        km = self._key_manager(url=NICKSERVER_URI)

        key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
        self.assertIsInstance(key, OpenPGPKey)
        self.assertTrue(ADDRESS_OTHER in key.uids)
        self.assertEqual(key.validation, ValidationLevels.Weak_Chain)

    def _fetch_key(self, km, address, key):
        """
        Mock the server response for ``address`` and get its key.

        The key is first requested without remote fetching, which has to
        fail with KeyNotFound, and then requested again allowing the fetch.

        :param km: the key manager under test
        :param address: the address to look up
        :param key: the ascii key the mocked server answers with
        :returns: a Deferred that will fire with the OpenPGPKey
        """
        data = json.dumps({'address': address, 'openpgp': key})

        # mock the pinned client so it answers with the key for `address`
        km._async_client_pinned.request = Mock(
            return_value=defer.succeed(data))
        km.ca_cert_path = 'cacertpath'
        # try to get the key without fetching from server
        d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
        d = self.assertFailure(d_fail, KeyNotFound)
        # try to get key fetching from server.
        d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
        return d

    @defer.inlineCallbacks
    def test_put_key_ascii(self):
        """
        Test that putting ascii key works
        """
        km = self._key_manager(url=NICKSERVER_URI)

        yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
        key = yield km.get_key(ADDRESS, OpenPGPKey)
        self.assertIsInstance(key, OpenPGPKey)
        self.assertTrue(ADDRESS in key.uids)

    @defer.inlineCallbacks
    def test_fetch_uri_ascii_key(self):
        """
        Test that fetch key downloads the ascii key and gets included in
        the local storage
        """
        km = self._key_manager()

        km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))

        yield km.fetch_key(ADDRESS, REMOTE_KEY_URL, OpenPGPKey)
        key = yield km.get_key(ADDRESS, OpenPGPKey)
        self.assertEqual(KEY_FINGERPRINT, key.fingerprint)

    def test_fetch_uri_empty_key(self):
        """
        Test that fetch key raises KeyNotFound if no key in the url
        """
        km = self._key_manager()

        # Mock the same client call the other fetch_key tests mock, and
        # answer with an empty body to stand for a url serving no key.
        km._async_client.request = Mock(return_value=defer.succeed(""))
        d = km.fetch_key(ADDRESS, REMOTE_KEY_URL, OpenPGPKey)
        return self.assertFailure(d, KeyNotFound)

    def test_fetch_uri_address_differ(self):
        """
        Test that fetch key raises KeyAddressMismatch if the address
        doesn't match
        """
        km = self._key_manager()

        km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))
        d = km.fetch_key(ADDRESS_2, REMOTE_KEY_URL, OpenPGPKey)
        return self.assertFailure(d, KeyAddressMismatch)

    def _mock_get_response(self, km, body):
        """
        Replace the request method of the key manager's async client.

        :param km: the key manager under test
        :param body: the content the mocked request succeeds with
        :returns: the mock, so that the caller can inspect its calls
        """
        km._async_client.request = MagicMock(return_value=defer.succeed(body))

        return km._async_client.request

    @defer.inlineCallbacks
    def test_fetch_key_uses_ca_bundle_if_none_specified(self):
        """
        Test that the key is requested from the remote url when the key
        manager is created with ca_cert_path=None.
        """
        ca_cert_path = None
        km = self._key_manager(ca_cert_path=ca_cert_path)
        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)

        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)

        get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')

    @defer.inlineCallbacks
    def test_fetch_key_uses_ca_bundle_if_empty_string_specified(self):
        """
        Test that the key is requested from the remote url when the key
        manager is created with an empty ca_cert_path.
        """
        ca_cert_path = ''
        km = self._key_manager(ca_cert_path=ca_cert_path)
        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)

        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)

        get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')

    @defer.inlineCallbacks
    def test_fetch_key_use_default_ca_bundle_if_set_as_ca_cert_path(self):
        """
        Test that the key is requested from the remote url when the key
        manager is created with the default ca bundle as ca_cert_path.
        """
        ca_cert_path = ca_bundle.where()
        km = self._key_manager(ca_cert_path=ca_cert_path)
        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)

        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)

        get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')

    @defer.inlineCallbacks
    def test_fetch_uses_combined_ca_bundle_otherwise(self):
        """
        Test that a custom ca_cert_path gets appended to the default ca
        bundle in a temporary file, which is gone once the key manager is
        deleted.
        """
        with tempfile.NamedTemporaryFile() as tmp_input, \
                tempfile.NamedTemporaryFile(delete=False) as tmp_output:
            ca_content = pkg_resources.resource_string('leap.common.testing',
                                                       'cacert.pem')
            ca_cert_path = tmp_input.name
            self._dump_to_file(ca_cert_path, ca_content)

            # hand our own file to the key manager as its temporary file,
            # so that its content can be inspected below
            with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
                mock.return_value = tmp_output
                km = self._key_manager(ca_cert_path=ca_cert_path)
                get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)

                yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)

                # assert that the key was requested from the remote url
                get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')

                # assert that files got appended
                expected = self._slurp_file(ca_bundle.where()) + ca_content
                self.assertEqual(expected, self._slurp_file(tmp_output.name))

            del km  # force km out of scope
            self.assertFalse(path.exists(tmp_output.name))

    def _dump_to_file(self, filename, content):
        """
        Write ``content`` to ``filename``, replacing what was there.
        """
        with open(filename, 'w') as out:
            out.write(content)

    def _slurp_file(self, filename):
        """
        :returns: the whole content of ``filename``
        """
        with open(filename) as f:
            content = f.read()
        return content

    @defer.inlineCallbacks
    def test_decrypt_updates_sign_used_for_signer(self):
        """
        Test that the key of the signer has sign_used set after decrypting
        and verifying a message signed with it.
        """
        # given
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY_2, ADDRESS_2)
        encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
                                   sign=ADDRESS_2, fetch_remote=False)
        yield km.decrypt(
            encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)

        # when
        key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False)

        # then
        self.assertEqual(True, key.sign_used)

    @defer.inlineCallbacks
    def test_decrypt_does_not_update_sign_used_for_recipient(self):
        """
        Test that the key of the recipient keeps sign_used unset after
        decrypting and verifying a message signed by someone else.
        """
        # given
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY, ADDRESS)
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY_2, ADDRESS_2)
        encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
                                   sign=ADDRESS_2, fetch_remote=False)
        yield km.decrypt(
            encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)

        # when
        key = yield km.get_key(
            ADDRESS, OpenPGPKey, private=False, fetch_remote=False)

        # then
        self.assertEqual(False, key.sign_used)
427
428
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
    """
    Tests for the encrypt, decrypt, sign and verify operations of the key
    manager.
    """

    RAW_DATA = 'data'

    @defer.inlineCallbacks
    def test_keymanager_openpgp_encrypt_decrypt(self):
        """
        Test that data encrypted to ADDRESS and signed by ADDRESS_2 is
        decrypted back and reported as signed by the key of ADDRESS_2.
        """
        manager = self._key_manager()
        pgp = manager._wrapper_map[OpenPGPKey]
        # store both private keys
        yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
        yield pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)

        # encrypt
        ciphertext = yield manager.encrypt(
            self.RAW_DATA, ADDRESS, OpenPGPKey,
            sign=ADDRESS_2, fetch_remote=False)
        self.assertNotEqual(self.RAW_DATA, ciphertext)

        # decrypt
        decrypted = yield manager.decrypt(
            ciphertext, ADDRESS, OpenPGPKey,
            verify=ADDRESS_2, fetch_remote=False)
        plaintext, signer = decrypted
        self.assertEqual(self.RAW_DATA, plaintext)

        expected_signer = yield manager.get_key(
            ADDRESS_2, OpenPGPKey, private=False, fetch_remote=False)
        self.assertEqual(signer.fingerprint, expected_signer.fingerprint)

    @defer.inlineCallbacks
    def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
        """
        Test that verifying against an address other than the signer's
        yields an InvalidSignature in place of the signing key.
        """
        manager = self._key_manager()
        pgp = manager._wrapper_map[OpenPGPKey]
        # store both private keys
        yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
        yield pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)

        # encrypt, signing as ADDRESS_2
        ciphertext = yield manager.encrypt(
            self.RAW_DATA, ADDRESS, OpenPGPKey,
            sign=ADDRESS_2, fetch_remote=False)
        self.assertNotEqual(self.RAW_DATA, ciphertext)

        # decrypt, verifying against ADDRESS instead
        decrypted = yield manager.decrypt(
            ciphertext, ADDRESS, OpenPGPKey,
            verify=ADDRESS, fetch_remote=False)
        plaintext, signer = decrypted
        self.assertEqual(self.RAW_DATA, plaintext)
        self.assertTrue(isinstance(signer, errors.InvalidSignature))

    @defer.inlineCallbacks
    def test_keymanager_openpgp_sign_verify(self):
        """
        Test that data signed with the key of ADDRESS verifies against
        that same key.
        """
        manager = self._key_manager()
        # store the private key
        yield manager._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY, ADDRESS)

        signed = yield manager.sign(
            self.RAW_DATA, ADDRESS, OpenPGPKey, detach=False)
        self.assertNotEqual(self.RAW_DATA, signed)

        # verify
        signer = yield manager.verify(
            signed, ADDRESS, OpenPGPKey, fetch_remote=False)
        expected_signer = yield manager.get_key(
            ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
        self.assertEqual(signer.fingerprint, expected_signer.fingerprint)

    def test_keymanager_encrypt_key_not_found(self):
        """
        Test that encrypting to an address without a stored key fails
        with KeyNotFound.
        """
        manager = self._key_manager()

        def encrypt_to_unknown_address(_):
            return manager.encrypt(
                self.RAW_DATA, ADDRESS_2, OpenPGPKey,
                sign=ADDRESS, fetch_remote=False)

        stored = manager._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY, ADDRESS)
        stored.addCallback(encrypt_to_unknown_address)
        return self.assertFailure(stored, KeyNotFound)
491
# Key fixture used by the tests that fetch a key for ADDRESS_OTHER.
# key 0F91B402: someone@somedomain.org
# 9420 EC7B 6DCB 867F 5592  E6D1 7504 C974 0F91 B402
ADDRESS_OTHER = "someone@somedomain.org"
PUBLIC_KEY_OTHER = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1

mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
=gDzy
-----END PGP PUBLIC KEY BLOCK-----
"""


# Keep the script entry point last: the tests look up ADDRESS_OTHER and
# PUBLIC_KEY_OTHER as module globals when they run, so both names have to
# be bound before unittest.main() starts running them.
if __name__ == "__main__":
    # NOTE: this rebinds the module-level name ``unittest`` (imported from
    # twisted.trial at the top of the file) to the stdlib module.
    import unittest
    unittest.main()