[test] add updater tests
[keymanager.git] / src / leap / keymanager / tests / test_keymanager.py
1 # -*- coding: utf-8 -*-
2 # test_keymanager.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Tests for the Key Manager.
21 """
22
23 from os import path
24 import json
25 import urllib
26 from datetime import datetime
27 import tempfile
28 import pkg_resources
29 from leap.common import ca_bundle
30 from mock import Mock, MagicMock, patch
31 from twisted.internet import defer
32 from twisted.trial import unittest
33
34 from leap.keymanager import (
35     KeyNotFound,
36     KeyAddressMismatch,
37     errors
38 )
39 from leap.keymanager.openpgp import OpenPGPKey
40 from leap.keymanager.keys import (
41     is_address,
42     build_key_from_dict,
43 )
44 from leap.keymanager.validation import ValidationLevels
45 from leap.keymanager.tests import (
46     KeyManagerWithSoledadTestCase,
47     ADDRESS,
48     ADDRESS_2,
49     KEY_FINGERPRINT,
50     PUBLIC_KEY,
51     PUBLIC_KEY_2,
52     PRIVATE_KEY,
53     PRIVATE_KEY_2,
54 )
55
56
# URI handed to the key manager as its nickserver in the fetch tests.
NICKSERVER_URI = 'http://leap.se/'
# URL the fetch_key tests ask the (mocked) HTTP client to download from.
REMOTE_KEY_URL = 'http://site.domain/key'
59
60
class KeyManagerUtilTestCase(unittest.TestCase):
    """
    Tests for the helper functions imported from ``leap.keymanager.keys``.
    """

    def test_is_address(self):
        """
        Test that is_address accepts ``user@domain`` and rejects strings
        missing the ``@``, the domain or the user part.
        """
        failure_msg = 'Incorrect address detection.'
        self.assertTrue(is_address('user@leap.se'), failure_msg)
        for malformed in ('userleap.se', 'user@', '@leap.se'):
            self.assertFalse(is_address(malformed), failure_msg)

    def test_build_key_from_dict(self):
        """
        Test that build_key_from_dict copies the values of the two dicts it
        is given into the attributes of the resulting key.
        """
        key_fields = {
            'address': [ADDRESS],
            'fingerprint': KEY_FINGERPRINT,
            'key_data': PUBLIC_KEY,
            'private': False,
            'length': 4096,
            'expiry_date': 0,
            'refreshed_at': 1311239602,
        }
        address_fields = {
            'address': ADDRESS,
            'private': False,
            'last_audited_at': 0,
            'validation': str(ValidationLevels.Weak_Chain),
            'encr_used': False,
            'sign_used': True,
        }
        built = build_key_from_dict(OpenPGPKey, key_fields, address_fields)

        # (expected value, attribute name) pairs, checked in this order
        expectations = [
            (key_fields['address'], 'address'),
            (key_fields['fingerprint'], 'fingerprint'),
            (key_fields['key_data'], 'key_data'),
            (key_fields['private'], 'private'),
            (key_fields['length'], 'length'),
            # the zero timestamps above are expected to come back as None
            (None, 'expiry_date'),
            (None, 'last_audited_at'),
            (datetime.fromtimestamp(key_fields['refreshed_at']),
             'refreshed_at'),
            (ValidationLevels.get(address_fields['validation']),
             'validation'),
            (address_fields['encr_used'], 'encr_used'),
            (address_fields['sign_used'], 'sign_used'),
        ]
        for expected, attr_name in expectations:
            self.assertEqual(
                expected, getattr(built, attr_name),
                'Wrong data in key.')
129
130
class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
    """
    Tests for storing, sending and fetching keys through the key manager.
    """

    @defer.inlineCallbacks
    def test_get_all_keys_in_db(self):
        """
        Test that after storing a private key, get_all_keys returns one
        public and one private key for its address.
        """
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        # get public keys
        keys = yield km.get_all_keys(False)
        self.assertEqual(len(keys), 1, 'Wrong number of keys')
        self.assertIn(ADDRESS, keys[0].address)
        self.assertFalse(keys[0].private)
        # get private keys
        keys = yield km.get_all_keys(True)
        self.assertEqual(len(keys), 1, 'Wrong number of keys')
        self.assertIn(ADDRESS, keys[0].address)
        self.assertTrue(keys[0].private)

    @defer.inlineCallbacks
    def test_get_public_key(self):
        """
        Test that the public key of a stored private key can be retrieved
        without fetching from the server.
        """
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        # get the key
        key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
                               fetch_remote=False)
        self.assertIsNotNone(key)
        self.assertIn(ADDRESS, key.address)
        self.assertEqual(
            key.fingerprint.lower(), KEY_FINGERPRINT.lower())
        self.assertFalse(key.private)

    @defer.inlineCallbacks
    def test_get_private_key(self):
        """
        Test that a stored private key can be retrieved without fetching
        from the server.
        """
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        # get the key
        key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
                               fetch_remote=False)
        self.assertIsNotNone(key)
        self.assertIn(ADDRESS, key.address)
        self.assertEqual(
            key.fingerprint.lower(), KEY_FINGERPRINT.lower())
        self.assertTrue(key.private)

    def test_send_key_raises_key_not_found(self):
        """
        Test that send_key fails with KeyNotFound when no key was stored.
        """
        km = self._key_manager()
        d = km.send_key(OpenPGPKey)
        return self.assertFailure(d, KeyNotFound)

    @defer.inlineCallbacks
    def test_send_key(self):
        """
        Test that request is well formed when sending keys to server.
        """
        token = "mytoken"
        km = self._key_manager(token=token)
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
        km._async_client_pinned.request = Mock(return_value=defer.succeed(''))
        # the following data will be used on the send
        km.ca_cert_path = 'capath'
        km.session_id = 'sessionid'
        km.uid = 'myuid'
        km.api_uri = 'apiuri'
        km.api_version = 'apiver'
        yield km.send_key(OpenPGPKey)
        # setup expected args
        pubkey = yield km.get_key(km._address, OpenPGPKey)
        data = urllib.urlencode({
            km.PUBKEY_KEY: pubkey.key_data,
        })
        headers = {
            'Authorization': [str('Token token=%s' % token)],
            'Content-Type': ['application/x-www-form-urlencoded'],
        }
        url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
        km._async_client_pinned.request.assert_called_once_with(
            str(url), 'PUT', body=str(data),
            headers=headers
        )

    def test_fetch_keys_from_server(self):
        """
        Test that the request is well formed when fetching keys from server.
        """
        km = self._key_manager(url=NICKSERVER_URI)
        expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2

        def verify_the_call(_):
            km._async_client_pinned.request.assert_called_once_with(
                expected_url,
                'GET',
            )

        d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
        d.addCallback(verify_the_call)
        return d

    @defer.inlineCallbacks
    def test_get_key_fetches_from_server(self):
        """
        Test that getting a key successfully fetches from server.
        """
        km = self._key_manager(url=NICKSERVER_URI)

        key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
        self.assertIsInstance(key, OpenPGPKey)
        self.assertIn(ADDRESS, key.address)
        self.assertEqual(key.validation, ValidationLevels.Provider_Trust)

    @defer.inlineCallbacks
    def test_get_key_fetches_other_domain(self):
        """
        Test that a key fetched from server for ADDRESS_OTHER is stored
        with Weak_Chain validation.
        """
        km = self._key_manager(url=NICKSERVER_URI)

        key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
        self.assertIsInstance(key, OpenPGPKey)
        self.assertIn(ADDRESS_OTHER, key.address)
        self.assertEqual(key.validation, ValidationLevels.Weak_Chain)

    def _fetch_key(self, km, address, key):
        """
        Check that the key is not available locally and then get it with
        the pinned HTTP client mocked to answer with it.

        :param km: the key manager under test
        :param address: the address to ask the key for
        :param key: the ascii key the mocked request answers with

        :returns: a Deferred that will fire with the OpenPGPKey
        """
        data = json.dumps({'address': address, 'openpgp': key})

        # mock the pinned client so it returns the key for the address
        km._async_client_pinned.request = Mock(
            return_value=defer.succeed(data))
        km.ca_cert_path = 'cacertpath'
        # try to get the key without fetching from server
        d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
        d = self.assertFailure(d_fail, KeyNotFound)
        # try to get key fetching from server.
        d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
        return d

    @defer.inlineCallbacks
    def test_put_key_ascii(self):
        """
        Test that putting ascii key works
        """
        km = self._key_manager(url=NICKSERVER_URI)

        yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
        key = yield km.get_key(ADDRESS, OpenPGPKey)
        self.assertIsInstance(key, OpenPGPKey)
        self.assertIn(ADDRESS, key.address)

    @defer.inlineCallbacks
    def test_fetch_uri_ascii_key(self):
        """
        Test that fetch key downloads the ascii key and gets included in
        the local storage
        """
        km = self._key_manager()

        km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))

        yield km.fetch_key(ADDRESS, REMOTE_KEY_URL, OpenPGPKey)
        key = yield km.get_key(ADDRESS, OpenPGPKey)
        self.assertEqual(KEY_FINGERPRINT, key.fingerprint)

    def test_fetch_uri_empty_key(self):
        """
        Test that fetch key raises KeyNotFound if no key in the url
        """
        km = self._key_manager()

        # mock the same async client request the other fetch_key tests
        # mock, answering with an empty body
        km._async_client.request = Mock(return_value=defer.succeed(""))
        d = km.fetch_key(ADDRESS, REMOTE_KEY_URL, OpenPGPKey)
        return self.assertFailure(d, KeyNotFound)

    def test_fetch_uri_address_differ(self):
        """
        Test that fetch key raises KeyAddressMismatch if the address
        doesn't match
        """
        km = self._key_manager()

        km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))
        d = km.fetch_key(ADDRESS_2, REMOTE_KEY_URL, OpenPGPKey)
        return self.assertFailure(d, KeyAddressMismatch)

    def _mock_get_response(self, km, body):
        """
        Replace the async client request of the key manager with a mock.

        :param km: the key manager under test
        :param body: the content the mocked request will answer with

        :returns: the mock, so the caller can check how it was called
        """
        km._async_client.request = MagicMock(return_value=defer.succeed(body))

        return km._async_client.request

    @defer.inlineCallbacks
    def test_fetch_key_uses_ca_bundle_if_none_specified(self):
        """
        Test that fetch key requests the remote url when the key manager
        was created with None as ca_cert_path
        """
        ca_cert_path = None
        km = self._key_manager(ca_cert_path=ca_cert_path)
        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)

        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)

        get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')

    @defer.inlineCallbacks
    def test_fetch_key_uses_ca_bundle_if_empty_string_specified(self):
        """
        Test that fetch key requests the remote url when the key manager
        was created with an empty string as ca_cert_path
        """
        ca_cert_path = ''
        km = self._key_manager(ca_cert_path=ca_cert_path)
        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)

        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)

        get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')

    @defer.inlineCallbacks
    def test_fetch_key_use_default_ca_bundle_if_set_as_ca_cert_path(self):
        """
        Test that fetch key requests the remote url when the key manager
        was created with the default ca bundle as ca_cert_path
        """
        ca_cert_path = ca_bundle.where()
        km = self._key_manager(ca_cert_path=ca_cert_path)
        get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)

        yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)

        get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')

    @defer.inlineCallbacks
    def test_fetch_uses_combined_ca_bundle_otherwise(self):
        """
        Test that with a custom ca_cert_path the temporary bundle file holds
        the default ca bundle followed by the custom cert, and that the file
        no longer exists once the key manager is deleted
        """
        with tempfile.NamedTemporaryFile() as tmp_input, \
                tempfile.NamedTemporaryFile(delete=False) as tmp_output:
            ca_content = pkg_resources.resource_string('leap.common.testing',
                                                       'cacert.pem')
            ca_cert_path = tmp_input.name
            self._dump_to_file(ca_cert_path, ca_content)

            # hand our own temp file to the key manager so that its
            # content can be inspected below
            with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
                mock.return_value = tmp_output
                km = self._key_manager(ca_cert_path=ca_cert_path)
                get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)

                yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)

                # assert that the key was requested from the remote url
                get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')

                # assert that files got appended
                expected = self._slurp_file(ca_bundle.where()) + ca_content
                self.assertEqual(expected, self._slurp_file(tmp_output.name))

            del km  # force km out of scope
            self.assertFalse(path.exists(tmp_output.name))

    def _dump_to_file(self, filename, content):
        """
        Write content to the file at filename, replacing what it had.
        """
        with open(filename, 'w') as out:
            out.write(content)

    def _slurp_file(self, filename):
        """
        Return the whole content of the file at filename.
        """
        with open(filename) as f:
            content = f.read()
        return content

    @defer.inlineCallbacks
    def test_decrypt_updates_sign_used_for_signer(self):
        """
        Test that decrypting a signed message sets sign_used on the key
        of the signer
        """
        # given
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY_2, ADDRESS_2)
        encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
                                   sign=ADDRESS_2, fetch_remote=False)
        yield km.decrypt(
            encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)

        # when
        key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False)

        # then
        self.assertEqual(True, key.sign_used)

    @defer.inlineCallbacks
    def test_decrypt_does_not_update_sign_used_for_recipient(self):
        """
        Test that decrypting a signed message leaves sign_used unset on
        the key of the recipient
        """
        # given
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY, ADDRESS)
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY_2, ADDRESS_2)
        encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
                                   sign=ADDRESS_2, fetch_remote=False)
        yield km.decrypt(
            encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)

        # when
        key = yield km.get_key(
            ADDRESS, OpenPGPKey, private=False, fetch_remote=False)

        # then
        self.assertEqual(False, key.sign_used)
424
425
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
    """
    Tests for the encrypt, decrypt, sign and verify calls of the key
    manager using OpenPGP keys.
    """

    RAW_DATA = 'data'

    @defer.inlineCallbacks
    def test_keymanager_openpgp_encrypt_decrypt(self):
        """
        Test that data encrypted and signed decrypts back to the original
        and reports the key of the signer.
        """
        manager = self._key_manager()
        # store both raw private keys
        wrapper = manager._wrapper_map[OpenPGPKey]
        yield wrapper.put_ascii_key(PRIVATE_KEY, ADDRESS)
        yield wrapper.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)

        # encrypt for ADDRESS, signing as ADDRESS_2
        ciphertext = yield manager.encrypt(
            self.RAW_DATA, ADDRESS, OpenPGPKey,
            sign=ADDRESS_2, fetch_remote=False)
        self.assertNotEqual(self.RAW_DATA, ciphertext)

        # decrypt, verifying against ADDRESS_2
        plaintext, signer = yield manager.decrypt(
            ciphertext, ADDRESS, OpenPGPKey,
            verify=ADDRESS_2, fetch_remote=False)
        self.assertEqual(self.RAW_DATA, plaintext)

        expected_signer = yield manager.get_key(
            ADDRESS_2, OpenPGPKey, private=False, fetch_remote=False)
        self.assertEqual(signer.fingerprint, expected_signer.fingerprint)

    @defer.inlineCallbacks
    def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
        """
        Test that verifying against an address that did not sign still
        decrypts the data but hands back an InvalidSignature.
        """
        manager = self._key_manager()
        # store both raw private keys
        wrapper = manager._wrapper_map[OpenPGPKey]
        yield wrapper.put_ascii_key(PRIVATE_KEY, ADDRESS)
        yield wrapper.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)

        # encrypt for ADDRESS, signing as ADDRESS_2
        ciphertext = yield manager.encrypt(
            self.RAW_DATA, ADDRESS, OpenPGPKey,
            sign=ADDRESS_2, fetch_remote=False)
        self.assertNotEqual(self.RAW_DATA, ciphertext)

        # decrypt, verifying against ADDRESS, which is not the signer
        plaintext, signer = yield manager.decrypt(
            ciphertext, ADDRESS, OpenPGPKey,
            verify=ADDRESS, fetch_remote=False)
        self.assertEqual(self.RAW_DATA, plaintext)
        self.assertIsInstance(signer, errors.InvalidSignature)

    @defer.inlineCallbacks
    def test_keymanager_openpgp_sign_verify(self):
        """
        Test that verify reports the key the data was signed with.
        """
        manager = self._key_manager()
        # store the raw private key
        yield manager._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY, ADDRESS)

        signed = yield manager.sign(
            self.RAW_DATA, ADDRESS, OpenPGPKey, detach=False)
        self.assertNotEqual(self.RAW_DATA, signed)

        # verify
        signer = yield manager.verify(
            signed, ADDRESS, OpenPGPKey, fetch_remote=False)
        expected_signer = yield manager.get_key(
            ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
        self.assertEqual(signer.fingerprint, expected_signer.fingerprint)

    def test_keymanager_encrypt_key_not_found(self):
        """
        Test that encrypting for an address without a stored key fails
        with KeyNotFound.
        """
        manager = self._key_manager()

        def encrypt_for_unknown_address(_):
            return manager.encrypt(
                self.RAW_DATA, ADDRESS_2, OpenPGPKey,
                sign=ADDRESS, fetch_remote=False)

        d = manager._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY, ADDRESS)
        d.addCallback(encrypt_for_unknown_address)
        return self.assertFailure(d, KeyNotFound)
488
# key 0F91B402: someone@somedomain.org
# 9420 EC7B 6DCB 867F 5592  E6D1 7504 C974 0F91 B402
ADDRESS_OTHER = "someone@somedomain.org"
PUBLIC_KEY_OTHER = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1

mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
=gDzy
-----END PGP PUBLIC KEY BLOCK-----
"""


# Keep this guard as the last statement of the module: unittest.main() exits
# the process, so anything defined after it would be missing when the file
# is run as a script and the tests using the constants above would fail
# with a NameError.
if __name__ == "__main__":
    import unittest
    unittest.main()