[style] fix pep8 problems
[keymanager.git] / src / leap / keymanager / tests / test_keymanager.py
1 # -*- coding: utf-8 -*-
2 # test_keymanager.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Tests for the Key Manager.
21 """
22
23
24 from datetime import datetime
25 import tempfile
26 from leap.common import ca_bundle
27 from mock import Mock, MagicMock, patch
28 from twisted.internet.defer import inlineCallbacks
29 from twisted.trial import unittest
30
31 from leap.keymanager import (
32     KeyNotFound,
33     KeyAddressMismatch,
34     errors
35 )
36 from leap.keymanager.openpgp import OpenPGPKey
37 from leap.keymanager.keys import (
38     is_address,
39     build_key_from_dict,
40 )
41 from leap.keymanager.validation import ValidationLevels
42 from leap.keymanager.tests import (
43     KeyManagerWithSoledadTestCase,
44     ADDRESS,
45     ADDRESS_2,
46     KEY_FINGERPRINT,
47     PUBLIC_KEY,
48     PUBLIC_KEY_2,
49     PRIVATE_KEY,
50     PRIVATE_KEY_2,
51 )
52
53
54 NICKSERVER_URI = "http://leap.se/"
55 REMOTE_KEY_URL = "http://site.domain/key"
56
57
58 class KeyManagerUtilTestCase(unittest.TestCase):
59
60     def test_is_address(self):
61         self.assertTrue(
62             is_address('user@leap.se'),
63             'Incorrect address detection.')
64         self.assertFalse(
65             is_address('userleap.se'),
66             'Incorrect address detection.')
67         self.assertFalse(
68             is_address('user@'),
69             'Incorrect address detection.')
70         self.assertFalse(
71             is_address('@leap.se'),
72             'Incorrect address detection.')
73
74     def test_build_key_from_dict(self):
75         kdict = {
76             'address': [ADDRESS],
77             'key_id': KEY_FINGERPRINT[-16:],
78             'fingerprint': KEY_FINGERPRINT,
79             'key_data': PUBLIC_KEY,
80             'private': False,
81             'length': 4096,
82             'expiry_date': 0,
83             'last_audited_at': 0,
84             'refreshed_at': 1311239602,
85             'validation': str(ValidationLevels.Weak_Chain),
86             'encr_used': False,
87             'sign_used': True,
88         }
89         key = build_key_from_dict(OpenPGPKey, kdict)
90         self.assertEqual(
91             kdict['address'], key.address,
92             'Wrong data in key.')
93         self.assertEqual(
94             kdict['key_id'], key.key_id,
95             'Wrong data in key.')
96         self.assertEqual(
97             kdict['fingerprint'], key.fingerprint,
98             'Wrong data in key.')
99         self.assertEqual(
100             kdict['key_data'], key.key_data,
101             'Wrong data in key.')
102         self.assertEqual(
103             kdict['private'], key.private,
104             'Wrong data in key.')
105         self.assertEqual(
106             kdict['length'], key.length,
107             'Wrong data in key.')
108         self.assertEqual(
109             None, key.expiry_date,
110             'Wrong data in key.')
111         self.assertEqual(
112             None, key.last_audited_at,
113             'Wrong data in key.')
114         self.assertEqual(
115             datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
116             'Wrong data in key.')
117         self.assertEqual(
118             ValidationLevels.get(kdict['validation']), key.validation,
119             'Wrong data in key.')
120         self.assertEqual(
121             kdict['encr_used'], key.encr_used,
122             'Wrong data in key.')
123         self.assertEqual(
124             kdict['sign_used'], key.sign_used,
125             'Wrong data in key.')
126
127
128 class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
129
130     @inlineCallbacks
131     def test_get_all_keys_in_db(self):
132         km = self._key_manager()
133         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
134         # get public keys
135         keys = yield km.get_all_keys(False)
136         self.assertEqual(len(keys), 1, 'Wrong number of keys')
137         self.assertTrue(ADDRESS in keys[0].address)
138         self.assertFalse(keys[0].private)
139         # get private keys
140         keys = yield km.get_all_keys(True)
141         self.assertEqual(len(keys), 1, 'Wrong number of keys')
142         self.assertTrue(ADDRESS in keys[0].address)
143         self.assertTrue(keys[0].private)
144
145     @inlineCallbacks
146     def test_get_public_key(self):
147         km = self._key_manager()
148         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
149         # get the key
150         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
151                                fetch_remote=False)
152         self.assertTrue(key is not None)
153         self.assertTrue(ADDRESS in key.address)
154         self.assertEqual(
155             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
156         self.assertFalse(key.private)
157
158     @inlineCallbacks
159     def test_get_private_key(self):
160         km = self._key_manager()
161         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
162         # get the key
163         key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
164                                fetch_remote=False)
165         self.assertTrue(key is not None)
166         self.assertTrue(ADDRESS in key.address)
167         self.assertEqual(
168             key.fingerprint.lower(), KEY_FINGERPRINT.lower())
169         self.assertTrue(key.private)
170
171     def test_send_key_raises_key_not_found(self):
172         km = self._key_manager()
173         d = km.send_key(OpenPGPKey)
174         return self.assertFailure(d, KeyNotFound)
175
176     @inlineCallbacks
177     def test_send_key(self):
178         """
179         Test that request is well formed when sending keys to server.
180         """
181         token = "mytoken"
182         km = self._key_manager(token=token)
183         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
184         km._fetcher.put = Mock()
185         # the following data will be used on the send
186         km.ca_cert_path = 'capath'
187         km.session_id = 'sessionid'
188         km.uid = 'myuid'
189         km.api_uri = 'apiuri'
190         km.api_version = 'apiver'
191         yield km.send_key(OpenPGPKey)
192         # setup expected args
193         pubkey = yield km.get_key(km._address, OpenPGPKey)
194         data = {
195             km.PUBKEY_KEY: pubkey.key_data,
196         }
197         url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
198         km._fetcher.put.assert_called_once_with(
199             url, data=data, verify='capath',
200             headers={'Authorization': 'Token token=%s' % token},
201         )
202
203     def test_fetch_keys_from_server(self):
204         """
205         Test that the request is well formed when fetching keys from server.
206         """
207         km = self._key_manager(url=NICKSERVER_URI)
208
209         def verify_the_call(_):
210             km._fetcher.get.assert_called_once_with(
211                 NICKSERVER_URI,
212                 data={'address': ADDRESS_2},
213                 verify='cacertpath',
214             )
215
216         d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
217         d.addCallback(verify_the_call)
218         return d
219
220     @inlineCallbacks
221     def test_get_key_fetches_from_server(self):
222         """
223         Test that getting a key successfuly fetches from server.
224         """
225         km = self._key_manager(url=NICKSERVER_URI)
226
227         key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
228         self.assertIsInstance(key, OpenPGPKey)
229         self.assertTrue(ADDRESS in key.address)
230         self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
231
232     @inlineCallbacks
233     def test_get_key_fetches_other_domain(self):
234         """
235         Test that getting a key successfuly fetches from server.
236         """
237         km = self._key_manager(url=NICKSERVER_URI)
238
239         key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
240         self.assertIsInstance(key, OpenPGPKey)
241         self.assertTrue(ADDRESS_OTHER in key.address)
242         self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
243
244     def _fetch_key(self, km, address, key):
245         """
246         :returns: a Deferred that will fire with the OpenPGPKey
247         """
248         class Response(object):
249             status_code = 200
250             headers = {'content-type': 'application/json'}
251
252             def json(self):
253                 return {'address': address, 'openpgp': key}
254
255             def raise_for_status(self):
256                 pass
257
258         # mock the fetcher so it returns the key for ADDRESS_2
259         km._fetcher.get = Mock(return_value=Response())
260         km.ca_cert_path = 'cacertpath'
261         # try to key get without fetching from server
262         d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
263         d = self.assertFailure(d_fail, KeyNotFound)
264         # try to get key fetching from server.
265         d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
266         return d
267
268     @inlineCallbacks
269     def test_put_key_ascii(self):
270         """
271         Test that putting ascii key works
272         """
273         km = self._key_manager(url=NICKSERVER_URI)
274
275         yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
276         key = yield km.get_key(ADDRESS, OpenPGPKey)
277         self.assertIsInstance(key, OpenPGPKey)
278         self.assertTrue(ADDRESS in key.address)
279
280     @inlineCallbacks
281     def test_fetch_uri_ascii_key(self):
282         """
283         Test that fetch key downloads the ascii key and gets included in
284         the local storage
285         """
286         km = self._key_manager()
287
288         class Response(object):
289             ok = True
290             content = PUBLIC_KEY
291
292         km._fetcher.get = Mock(return_value=Response())
293
294         yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
295         key = yield km.get_key(ADDRESS, OpenPGPKey)
296         self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
297
298     def test_fetch_uri_empty_key(self):
299         """
300         Test that fetch key raises KeyNotFound if no key in the url
301         """
302         km = self._key_manager()
303
304         class Response(object):
305             ok = True
306             content = ""
307
308         km._fetcher.get = Mock(return_value=Response())
309         d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
310         return self.assertFailure(d, KeyNotFound)
311
312     def test_fetch_uri_address_differ(self):
313         """
314         Test that fetch key raises KeyAttributesDiffer if the address
315         don't match
316         """
317         km = self._key_manager()
318
319         class Response(object):
320             ok = True
321             content = PUBLIC_KEY
322
323         km._fetcher.get = Mock(return_value=Response())
324         d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
325         return self.assertFailure(d, KeyAddressMismatch)
326
327     def _mock_get_response(self, km, body):
328         class Response(object):
329             ok = True
330             content = body
331
332         mock = MagicMock(return_value=Response())
333         km._fetcher.get = mock
334
335         return mock
336
337     @inlineCallbacks
338     def test_fetch_key_uses_ca_bundle_if_none_specified(self):
339         ca_cert_path = None
340         km = self._key_manager(ca_cert_path=ca_cert_path)
341         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
342
343         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
344
345         get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
346                                          verify=ca_bundle.where())
347
348     @inlineCallbacks
349     def test_fetch_key_uses_default_ca_bundle_if_also_set_as_ca_cert(self):
350         ca_cert_path = ca_bundle.where()
351         km = self._key_manager(ca_cert_path=ca_cert_path)
352         get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
353
354         yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
355
356         get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
357                                          verify=ca_bundle.where())
358
359     @inlineCallbacks
360     def test_fetch_uses_combined_ca_bundle_otherwise(self):
361         with tempfile.NamedTemporaryFile() as tmp_input, \
362                 tempfile.NamedTemporaryFile() as tmp_output:
363             ca_content = 'some\ncontent\n'
364             ca_cert_path = tmp_input.name
365             self._dump_to_file(ca_cert_path, ca_content)
366
367             with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
368                 mock.return_value = tmp_output
369                 km = self._key_manager(ca_cert_path=ca_cert_path)
370                 get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
371
372                 yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
373
374                 # assert that combined bundle file is passed to get call
375                 get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
376                                                  verify=tmp_output.name)
377
378                 # assert that files got appended
379                 expected = self._slurp_file(ca_bundle.where()) + ca_content
380                 self.assertEqual(expected, self._slurp_file(tmp_output.name))
381
382     def _dump_to_file(self, filename, content):
383             with open(filename, 'w') as out:
384                 out.write(content)
385
386     def _slurp_file(self, filename):
387         with open(filename) as f:
388             content = f.read()
389         return content
390
391
392 class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
393
394     RAW_DATA = 'data'
395
396     @inlineCallbacks
397     def test_keymanager_openpgp_encrypt_decrypt(self):
398         km = self._key_manager()
399         # put raw private key
400         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
401         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
402             PRIVATE_KEY_2, ADDRESS_2)
403         # encrypt
404         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
405                                    sign=ADDRESS_2, fetch_remote=False)
406         self.assertNotEqual(self.RAW_DATA, encdata)
407         # decrypt
408         rawdata, signingkey = yield km.decrypt(
409             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
410         self.assertEqual(self.RAW_DATA, rawdata)
411         key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False,
412                                fetch_remote=False)
413         self.assertEqual(signingkey.fingerprint, key.fingerprint)
414
415     @inlineCallbacks
416     def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
417         km = self._key_manager()
418         # put raw keys
419         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
420         yield km._wrapper_map[OpenPGPKey].put_ascii_key(
421             PRIVATE_KEY_2, ADDRESS_2)
422         # encrypt
423         encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
424                                    sign=ADDRESS_2, fetch_remote=False)
425         self.assertNotEqual(self.RAW_DATA, encdata)
426         # verify
427         rawdata, signingkey = yield km.decrypt(
428             encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False)
429         self.assertEqual(self.RAW_DATA, rawdata)
430         self.assertTrue(isinstance(signingkey, errors.InvalidSignature))
431
432     @inlineCallbacks
433     def test_keymanager_openpgp_sign_verify(self):
434         km = self._key_manager()
435         # put raw private keys
436         yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
437         signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey,
438                                  detach=False)
439         self.assertNotEqual(self.RAW_DATA, signdata)
440         # verify
441         signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey,
442                                      fetch_remote=False)
443         key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
444                                fetch_remote=False)
445         self.assertEqual(signingkey.fingerprint, key.fingerprint)
446
447     def test_keymanager_encrypt_key_not_found(self):
448         km = self._key_manager()
449         d = km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
450         d.addCallback(
451             lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey,
452                                  sign=ADDRESS, fetch_remote=False))
453         return self.assertFailure(d, KeyNotFound)
454
455 if __name__ == "__main__":
456     import unittest
457     unittest.main()
458
459 # key 0F91B402: someone@somedomain.org
460 # 9420 EC7B 6DCB 867F 5592  E6D1 7504 C974 0F91 B402
461 ADDRESS_OTHER = "someone@somedomain.org"
462 PUBLIC_KEY_OTHER = """
463 -----BEGIN PGP PUBLIC KEY BLOCK-----
464 Version: GnuPG v1
465
466 mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
467 kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
468 jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
469 wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
470 QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
471 AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
472 bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
473 CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
474 9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
475 cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
476 TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
477 Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
478 hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
479 BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
480 u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
481 MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
482 2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
483 SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
484 1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
485 CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
486 DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
487 6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
488 U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
489 XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
490 QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
491 =gDzy
492 -----END PGP PUBLIC KEY BLOCK-----
493 """