[bug] remove the dependency on enum34
[keymanager.git] / src / leap / keymanager / tests / test_keymanager.py
1 # -*- coding: utf-8 -*-
2 # test_keymanager.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Tests for the Key Manager.
21 """
22
23
24 from datetime import datetime
25 from mock import Mock
26 from twisted.internet.defer import inlineCallbacks
27 from twisted.trial import unittest
28
29 from leap.keymanager import (
30     KeyNotFound,
31     KeyAddressMismatch,
32     errors
33 )
34 from leap.keymanager.openpgp import OpenPGPKey
35 from leap.keymanager.keys import (
36     is_address,
37     build_key_from_dict,
38 )
39 from leap.keymanager.validation import ValidationLevels
40 from leap.keymanager.tests import (
41     KeyManagerWithSoledadTestCase,
42     ADDRESS,
43     ADDRESS_2,
44     KEY_FINGERPRINT,
45     PUBLIC_KEY,
46     PUBLIC_KEY_2,
47     PRIVATE_KEY,
48     PRIVATE_KEY_2,
49 )
50
51
52 NICKSERVER_URI = "http://leap.se/"
53
54
class KeyManagerUtilTestCase(unittest.TestCase):
    """
    Tests for the helper functions imported from the keys module.
    """

    def test_is_address(self):
        """
        Check is_address against one well formed and three malformed
        addresses.
        """
        failure_msg = 'Incorrect address detection.'
        self.assertTrue(is_address('user@leap.se'), failure_msg)
        for malformed in ('userleap.se', 'user@', '@leap.se'):
            self.assertFalse(is_address(malformed), failure_msg)

    def test_build_key_from_dict(self):
        """
        Check that the key built from a dict carries the data of that dict.
        """
        failure_msg = 'Wrong data in key.'
        kdict = dict(
            address=[ADDRESS],
            key_id=KEY_FINGERPRINT[-16:],
            fingerprint=KEY_FINGERPRINT,
            key_data=PUBLIC_KEY,
            private=False,
            length=4096,
            expiry_date=0,
            last_audited_at=0,
            refreshed_at=1311239602,
            validation=str(ValidationLevels.Weak_Chain),
            encr_used=False,
            sign_used=True,
        )
        key = build_key_from_dict(OpenPGPKey, kdict)

        # fields expected to show up in the key exactly as given
        for field in ('address', 'key_id', 'fingerprint', 'key_data',
                      'private', 'length'):
            self.assertEqual(kdict[field], getattr(key, field), failure_msg)

        # dates given as 0 are expected to show up as None
        self.assertEqual(None, key.expiry_date, failure_msg)
        self.assertEqual(None, key.last_audited_at, failure_msg)

        # a non-zero timestamp is expected to show up as a datetime
        refreshed_at = datetime.fromtimestamp(kdict['refreshed_at'])
        self.assertEqual(refreshed_at, key.refreshed_at, failure_msg)

        # the validation level is given as a string and looked up again
        validation = ValidationLevels.get(kdict['validation'])
        self.assertEqual(validation, key.validation, failure_msg)

        # usage flags are expected to show up exactly as given
        for field in ('encr_used', 'sign_used'):
            self.assertEqual(kdict[field], getattr(key, field), failure_msg)
123
124
class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
    """
    Tests for storing, retrieving, sending and fetching keys with the key
    manager.
    """

    @inlineCallbacks
    def test_get_all_keys_in_db(self):
        """
        Test that a stored private key is returned both by the public and
        by the private key listing.
        """
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        # get public keys
        keys = yield km.get_all_keys(False)
        self.assertEqual(len(keys), 1, 'Wrong number of keys')
        self.assertIn(ADDRESS, keys[0].address)
        self.assertFalse(keys[0].private)
        # get private keys
        keys = yield km.get_all_keys(True)
        self.assertEqual(len(keys), 1, 'Wrong number of keys')
        self.assertIn(ADDRESS, keys[0].address)
        self.assertTrue(keys[0].private)

    @inlineCallbacks
    def test_get_public_key(self):
        """
        Test that the public key can be retrieved after storing the
        private key.
        """
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        # get the key
        key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
                               fetch_remote=False)
        self.assertIsNotNone(key)
        self.assertIn(ADDRESS, key.address)
        self.assertEqual(
            key.fingerprint.lower(), KEY_FINGERPRINT.lower())
        self.assertFalse(key.private)

    @inlineCallbacks
    def test_get_private_key(self):
        """
        Test that the private key can be retrieved after storing it.
        """
        km = self._key_manager()
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
        # get the key
        key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
                               fetch_remote=False)
        self.assertIsNotNone(key)
        self.assertIn(ADDRESS, key.address)
        self.assertEqual(
            key.fingerprint.lower(), KEY_FINGERPRINT.lower())
        self.assertTrue(key.private)

    def test_send_key_raises_key_not_found(self):
        """
        Test that sending the key fails with KeyNotFound when no key has
        been stored.
        """
        km = self._key_manager()
        d = km.send_key(OpenPGPKey)
        return self.assertFailure(d, KeyNotFound)

    @inlineCallbacks
    def test_send_key(self):
        """
        Test that request is well formed when sending keys to server.
        """
        token = "mytoken"
        km = self._key_manager(token=token)
        yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
        # replace the real request by a mock so the call can be inspected
        km._fetcher.put = Mock()
        # the following data will be used on the send
        km.ca_cert_path = 'capath'
        km.session_id = 'sessionid'
        km.uid = 'myuid'
        km.api_uri = 'apiuri'
        km.api_version = 'apiver'
        yield km.send_key(OpenPGPKey)
        # setup expected args
        pubkey = yield km.get_key(km._address, OpenPGPKey)
        data = {
            km.PUBKEY_KEY: pubkey.key_data,
        }
        url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
        km._fetcher.put.assert_called_once_with(
            url, data=data, verify='capath',
            headers={'Authorization': 'Token token=%s' % token},
        )

    def test_fetch_keys_from_server(self):
        """
        Test that the request is well formed when fetching keys from server.
        """
        km = self._key_manager(url=NICKSERVER_URI)

        def verify_the_call(_):
            km._fetcher.get.assert_called_once_with(
                NICKSERVER_URI,
                data={'address': ADDRESS_2},
                verify='cacertpath',
            )

        d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
        d.addCallback(verify_the_call)
        return d

    @inlineCallbacks
    def test_get_key_fetches_from_server(self):
        """
        Test that getting a key successfully fetches from server, and that
        the fetched key gets the Provider_Trust validation level.
        """
        km = self._key_manager(url=NICKSERVER_URI)

        key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
        self.assertIsInstance(key, OpenPGPKey)
        self.assertIn(ADDRESS, key.address)
        self.assertEqual(key.validation, ValidationLevels.Provider_Trust)

    @inlineCallbacks
    def test_get_key_fetches_other_domain(self):
        """
        Test that a key fetched from server for ADDRESS_OTHER gets the
        Weak_Chain validation level.
        """
        km = self._key_manager(url=NICKSERVER_URI)

        key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
        self.assertIsInstance(key, OpenPGPKey)
        self.assertIn(ADDRESS_OTHER, key.address)
        self.assertEqual(key.validation, ValidationLevels.Weak_Chain)

    def _fetch_key(self, km, address, key):
        """
        Get the key for an address through a mocked server response.

        The key is first requested without remote fetching, which is
        expected to fail with KeyNotFound, and then requested again with
        the default arguments.

        :param km: the key manager to get the key from
        :param address: the address whose key is requested
        :param key: the key data that the mocked response will carry
        :returns: a Deferred that will fire with the OpenPGPKey
        """
        class Response(object):
            status_code = 200
            headers = {'content-type': 'application/json'}

            def json(self):
                return {'address': address, 'openpgp': key}

            def raise_for_status(self):
                pass

        # mock the fetcher so it returns the given key for the given address
        km._fetcher.get = Mock(return_value=Response())
        km.ca_cert_path = 'cacertpath'
        # try to get the key without fetching from server
        d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
        d = self.assertFailure(d_fail, KeyNotFound)
        # try to get key fetching from server.
        d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
        return d

    @inlineCallbacks
    def test_put_key_ascii(self):
        """
        Test that putting ascii key works
        """
        km = self._key_manager(url=NICKSERVER_URI)

        yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
        key = yield km.get_key(ADDRESS, OpenPGPKey)
        self.assertIsInstance(key, OpenPGPKey)
        self.assertIn(ADDRESS, key.address)

    @inlineCallbacks
    def test_fetch_uri_ascii_key(self):
        """
        Test that fetch key downloads the ascii key and gets included in
        the local storage
        """
        km = self._key_manager()

        class Response(object):
            ok = True
            content = PUBLIC_KEY

        km._fetcher.get = Mock(return_value=Response())
        km.ca_cert_path = 'cacertpath'

        yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
        key = yield km.get_key(ADDRESS, OpenPGPKey)
        self.assertEqual(KEY_FINGERPRINT, key.fingerprint)

    def test_fetch_uri_empty_key(self):
        """
        Test that fetch key raises KeyNotFound if no key in the url
        """
        km = self._key_manager()

        class Response(object):
            ok = True
            content = ""

        km._fetcher.get = Mock(return_value=Response())
        km.ca_cert_path = 'cacertpath'
        d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
        return self.assertFailure(d, KeyNotFound)

    def test_fetch_uri_address_differ(self):
        """
        Test that fetch key raises KeyAddressMismatch if the address
        doesn't match
        """
        km = self._key_manager()

        class Response(object):
            ok = True
            content = PUBLIC_KEY

        km._fetcher.get = Mock(return_value=Response())
        km.ca_cert_path = 'cacertpath'
        d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
        return self.assertFailure(d, KeyAddressMismatch)
326
327
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
    """
    Tests for the encrypt, decrypt, sign and verify operations of the key
    manager.
    """

    RAW_DATA = 'data'

    @inlineCallbacks
    def test_keymanager_openpgp_encrypt_decrypt(self):
        """
        Encrypt and sign some data, decrypt it again and check the signer.
        """
        manager = self._key_manager()
        pgp = manager._wrapper_map[OpenPGPKey]
        # store both private keys
        yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
        yield pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)

        # encrypt for ADDRESS, signing as ADDRESS_2
        ciphertext = yield manager.encrypt(
            self.RAW_DATA, ADDRESS, OpenPGPKey,
            sign=ADDRESS_2, fetch_remote=False)
        self.assertNotEqual(self.RAW_DATA, ciphertext)

        # decrypt, verifying against ADDRESS_2
        decrypted = yield manager.decrypt(
            ciphertext, ADDRESS, OpenPGPKey,
            verify=ADDRESS_2, fetch_remote=False)
        plaintext, signer = decrypted
        self.assertEqual(self.RAW_DATA, plaintext)

        # the reported signer has to be the public key of ADDRESS_2
        expected_signer = yield manager.get_key(
            ADDRESS_2, OpenPGPKey, private=False, fetch_remote=False)
        self.assertEqual(signer.fingerprint, expected_signer.fingerprint)

    @inlineCallbacks
    def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
        """
        Decrypt data while verifying against an address that did not sign
        it, and check that an InvalidSignature is handed back.
        """
        manager = self._key_manager()
        pgp = manager._wrapper_map[OpenPGPKey]
        # store both private keys
        yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
        yield pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)

        # encrypt for ADDRESS, signing as ADDRESS_2
        ciphertext = yield manager.encrypt(
            self.RAW_DATA, ADDRESS, OpenPGPKey,
            sign=ADDRESS_2, fetch_remote=False)
        self.assertNotEqual(self.RAW_DATA, ciphertext)

        # decrypt, verifying against ADDRESS instead of the real signer
        decrypted = yield manager.decrypt(
            ciphertext, ADDRESS, OpenPGPKey,
            verify=ADDRESS, fetch_remote=False)
        plaintext, signer = decrypted
        self.assertEqual(self.RAW_DATA, plaintext)
        self.assertTrue(isinstance(signer, errors.InvalidSignature))

    @inlineCallbacks
    def test_keymanager_openpgp_sign_verify(self):
        """
        Sign some data and check that verifying it reports the signing key.
        """
        manager = self._key_manager()
        # store the private key used for signing
        yield manager._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY, ADDRESS)
        signed = yield manager.sign(
            self.RAW_DATA, ADDRESS, OpenPGPKey, detach=False)
        self.assertNotEqual(self.RAW_DATA, signed)

        # verify
        signer = yield manager.verify(
            signed, ADDRESS, OpenPGPKey, fetch_remote=False)
        expected_signer = yield manager.get_key(
            ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
        self.assertEqual(signer.fingerprint, expected_signer.fingerprint)

    def test_keymanager_encrypt_key_not_found(self):
        """
        Check that encrypting for ADDRESS_2, whose key was never stored,
        fails with KeyNotFound.
        """
        manager = self._key_manager()

        def encrypt_without_recipient_key(_):
            return manager.encrypt(
                self.RAW_DATA, ADDRESS_2, OpenPGPKey,
                sign=ADDRESS, fetch_remote=False)

        # only the key of ADDRESS gets stored
        stored = manager._wrapper_map[OpenPGPKey].put_ascii_key(
            PRIVATE_KEY, ADDRESS)
        stored.addCallback(encrypt_without_recipient_key)
        return self.assertFailure(stored, KeyNotFound)
390
391
# Fixture key for an address on another domain.
#
# It is defined above the script entry point so that it already exists when
# the tests are started through ``unittest.main()``.
#
# key 0F91B402: someone@somedomain.org
# 9420 EC7B 6DCB 867F 5592  E6D1 7504 C974 0F91 B402
ADDRESS_OTHER = "someone@somedomain.org"
PUBLIC_KEY_OTHER = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1

mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
=gDzy
-----END PGP PUBLIC KEY BLOCK-----
"""


if __name__ == "__main__":
    # Imported only when run as a script, so that merely importing this
    # module does not rebind the ``unittest`` name that the top of the file
    # takes from twisted.trial.
    import unittest
    unittest.main()