Do not raise on not 2XX error codes
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Infrastructure for using OpenPGP keys in Key Manager.
21 """
22
23
24 import logging
25 import os
26 import re
27 import shutil
28 import tempfile
29
30
31 from leap.common.check import leap_assert, leap_assert_type
32 from leap.keymanager import errors
33 from leap.keymanager.keys import (
34     EncryptionKey,
35     EncryptionScheme,
36     is_address,
37     build_key_from_dict,
38     KEYMANAGER_KEY_TAG,
39     TAGS_ADDRESS_PRIVATE_INDEX,
40 )
41 from leap.keymanager.gpg import GPGWrapper
42
43
44 logger = logging.getLogger(__name__)
45
46
47 class TempGPGWrapper(object):
48     """
49     A context manager returning a temporary GPG wrapper keyring, which
50     contains exactly zero or one pubkeys, and zero or one privkeys.
51
52     Temporary unitary keyrings allow the to use GPG's facilities for exactly
53     one key. This function creates an empty temporary keyring and imports
54     C{keys} if it is not None.
55     """
56     def __init__(self, keys=None, gpgbinary=None):
57         """
58         :param keys: OpenPGP key, or list of.
59         :type keys: OpenPGPKey or list of OpenPGPKeys
60         :param gpgbinary: Name for GnuPG binary executable.
61         :type gpgbinary: C{str}
62         """
63         self._gpg = None
64         self._gpgbinary = gpgbinary
65         if not keys:
66             keys = list()
67         if not isinstance(keys, list):
68             keys = [keys]
69         self._keys = keys
70         for key in filter(None, keys):
71             leap_assert_type(key, OpenPGPKey)
72
73     def __enter__(self):
74         """
75         Calls the unitary gpgwrapper initializer
76
77         :return: A GPG wrapper with a unitary keyring.
78         :rtype: gnupg.GPG
79         """
80         self._build_keyring()
81         return self._gpg
82
83     def __exit__(self, exc_type, exc_value, traceback):
84         """
85         Ensures the gpgwrapper is properly destroyed.
86         """
87         # TODO handle exceptions and log here
88         self._destroy_keyring()
89
90     def _build_keyring(self):
91         """
92         Create an empty GPG keyring and import C{keys} into it.
93
94         :param keys: List of keys to add to the keyring.
95         :type keys: list of OpenPGPKey
96
97         :return: A GPG wrapper with a unitary keyring.
98         :rtype: gnupg.GPG
99         """
100         privkeys = [key for key in self._keys if key and key.private is True]
101         publkeys = [key for key in self._keys if key and key.private is False]
102         # here we filter out public keys that have a correspondent
103         # private key in the list because the private key_data by
104         # itself is enough to also have the public key in the keyring,
105         # and we want to count the keys afterwards.
106
107         privaddrs = map(lambda privkey: privkey.address, privkeys)
108         publkeys = filter(
109             lambda pubkey: pubkey.address not in privaddrs, publkeys)
110
111         listkeys = lambda: self._gpg.list_keys()
112         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
113
114         self._gpg = GPGWrapper(
115             gnupghome=tempfile.mkdtemp(),
116             gpgbinary=self._gpgbinary)
117         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
118
119         # import keys into the keyring:
120         # concatenating ascii-armored keys, which is correctly
121         # understood by the GPGWrapper.
122
123         self._gpg.import_keys("".join(
124             [x.key_data for x in publkeys + privkeys]))
125
126         # assert the number of keys in the keyring
127         leap_assert(
128             len(listkeys()) == len(publkeys) + len(privkeys),
129             'Wrong number of public keys in keyring: %d, should be %d)' %
130             (len(listkeys()), len(publkeys) + len(privkeys)))
131         leap_assert(
132             len(listsecretkeys()) == len(privkeys),
133             'Wrong number of private keys in keyring: %d, should be %d)' %
134             (len(listsecretkeys()), len(privkeys)))
135
136     def _destroy_keyring(self):
137         """
138         Securely erase a unitary keyring.
139         """
140         # TODO: implement some kind of wiping of data or a more
141         # secure way that
142         # does not write to disk.
143
144         try:
145             for secret in [True, False]:
146                 for key in self._gpg.list_keys(secret=secret):
147                     self._gpg.delete_keys(
148                         key['fingerprint'],
149                         secret=secret)
150             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
151
152         except:
153             raise
154
155         finally:
156             leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'),
157                         "watch out! Tried to remove default gnupg home!")
158             shutil.rmtree(self._gpg.gnupghome)
159
160
161 def _build_key_from_gpg(address, key, key_data):
162     """
163     Build an OpenPGPKey for C{address} based on C{key} from
164     local gpg storage.
165
166     ASCII armored GPG key data has to be queried independently in this
167     wrapper, so we receive it in C{key_data}.
168
169     :param address: The address bound to the key.
170     :type address: str
171     :param key: Key obtained from GPG storage.
172     :type key: dict
173     :param key_data: Key data obtained from GPG storage.
174     :type key_data: str
175     :return: An instance of the key.
176     :rtype: OpenPGPKey
177     """
178     return OpenPGPKey(
179         address,
180         key_id=key['keyid'],
181         fingerprint=key['fingerprint'],
182         key_data=key_data,
183         private=True if key['type'] == 'sec' else False,
184         length=key['length'],
185         expiry_date=key['expires'],
186         validation=None,  # TODO: verify for validation.
187     )
188
189
190 #
191 # The OpenPGP wrapper
192 #
193
194 class OpenPGPKey(EncryptionKey):
195     """
196     Base class for OpenPGP keys.
197     """
198
199
200 class OpenPGPScheme(EncryptionScheme):
201     """
202     A wrapper for OpenPGP keys management and use (encryption, decyption,
203     signing and verification).
204     """
205
206     def __init__(self, soledad, gpgbinary=None):
207         """
208         Initialize the OpenPGP wrapper.
209
210         :param soledad: A Soledad instance for key storage.
211         :type soledad: leap.soledad.Soledad
212         :param gpgbinary: Name for GnuPG binary executable.
213         :type gpgbinary: C{str}
214         """
215         EncryptionScheme.__init__(self, soledad)
216         self._gpgbinary = gpgbinary
217
218     #
219     # Keys management
220     #
221
222     def gen_key(self, address):
223         """
224         Generate an OpenPGP keypair bound to C{address}.
225
226         :param address: The address bound to the key.
227         :type address: str
228         :return: The key bound to C{address}.
229         :rtype: OpenPGPKey
230         @raise KeyAlreadyExists: If key already exists in local database.
231         """
232         # make sure the key does not already exist
233         leap_assert(is_address(address), 'Not an user address: %s' % address)
234         try:
235             self.get_key(address)
236             raise errors.KeyAlreadyExists(address)
237         except errors.KeyNotFound:
238             logger.debug('Key for %s not found' % (address,))
239
240         with self._temporary_gpgwrapper() as gpg:
241             # TODO: inspect result, or use decorator
242             params = gpg.gen_key_input(
243                 key_type='RSA',
244                 key_length=4096,
245                 name_real=address,
246                 name_email=address,
247                 name_comment='Generated by LEAP Key Manager.')
248             logger.info("About to generate keys... This might take SOME time.")
249             gpg.gen_key(params)
250             logger.info("Keys for %s have been successfully "
251                         "generated." % (address,))
252             pubkeys = gpg.list_keys()
253
254             # assert for new key characteristics
255
256             # XXX This exception is not properly catched by the soledad
257             # bootstrapping, so if we do not finish generating the keys
258             # we end with a blocked thread -- kali
259
260             leap_assert(
261                 len(pubkeys) is 1,  # a unitary keyring!
262                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
263             key = gpg.list_keys(secret=True).pop()
264             leap_assert(
265                 len(key['uids']) is 1,  # with just one uid!
266                 'Wrong number of uids for key: %d.' % len(key['uids']))
267             leap_assert(
268                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
269                 'Key not correctly bound to address.')
270             # insert both public and private keys in storage
271             for secret in [True, False]:
272                 key = gpg.list_keys(secret=secret).pop()
273                 openpgp_key = _build_key_from_gpg(
274                     address, key,
275                     gpg.export_keys(key['fingerprint'], secret=secret))
276                 self.put_key(openpgp_key)
277
278         return self.get_key(address, private=True)
279
280     def get_key(self, address, private=False):
281         """
282         Get key bound to C{address} from local storage.
283
284         :param address: The address bound to the key.
285         :type address: str
286         :param private: Look for a private key instead of a public one?
287         :type private: bool
288
289         :return: The key bound to C{address}.
290         :rtype: OpenPGPKey
291         @raise KeyNotFound: If the key was not found on local storage.
292         """
293         leap_assert(is_address(address), 'Not an user address: %s' % address)
294         doc = self._get_key_doc(address, private)
295         if doc is None:
296             raise errors.KeyNotFound(address)
297         return build_key_from_dict(OpenPGPKey, address, doc.content)
298
299     def put_ascii_key(self, key_data):
300         """
301         Put key contained in ascii-armored C{key_data} in local storage.
302
303         :param key_data: The key data to be stored.
304         :type key_data: str or unicode
305         """
306         leap_assert_type(key_data, (str, unicode))
307         # TODO: add more checks for correct key data.
308         leap_assert(key_data is not None, 'Data does not represent a key.')
309
310         with self._temporary_gpgwrapper() as gpg:
311             # TODO: inspect result, or use decorator
312             gpg.import_keys(key_data)
313             privkey = None
314             pubkey = None
315
316             try:
317                 privkey = gpg.list_keys(secret=True).pop()
318             except IndexError:
319                 pass
320             pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
321             # extract adress from first uid on key
322             match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())
323             leap_assert(match is not None, 'No user address in key data.')
324             address = match.group(1)
325             if privkey is not None:
326                 match = re.match(
327                     '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop())
328                 leap_assert(match is not None, 'No user address in key data.')
329                 privaddress = match.group(1)
330                 leap_assert(
331                     address == privaddress,
332                     'Addresses in pub and priv key differ.')
333                 leap_assert(
334                     pubkey['fingerprint'] == privkey['fingerprint'],
335                     'Fingerprints for pub and priv key differ.')
336                 # insert private key in storage
337                 openpgp_privkey = _build_key_from_gpg(
338                     address, privkey,
339                     gpg.export_keys(privkey['fingerprint'], secret=True))
340                 self.put_key(openpgp_privkey)
341             # insert public key in storage
342             openpgp_pubkey = _build_key_from_gpg(
343                 address, pubkey,
344                 gpg.export_keys(pubkey['fingerprint'], secret=False))
345             self.put_key(openpgp_pubkey)
346
347     def put_key(self, key):
348         """
349         Put C{key} in local storage.
350
351         :param key: The key to be stored.
352         :type key: OpenPGPKey
353         """
354         doc = self._get_key_doc(key.address, private=key.private)
355         if doc is None:
356             self._soledad.create_doc_from_json(key.get_json())
357         else:
358             doc.set_json(key.get_json())
359             self._soledad.put_doc(doc)
360
361     def _get_key_doc(self, address, private=False):
362         """
363         Get the document with a key (public, by default) bound to C{address}.
364
365         If C{private} is True, looks for a private key instead of a public.
366
367         :param address: The address bound to the key.
368         :type address: str
369         :param private: Whether to look for a private key.
370         :type private: bool
371         :return: The document with the key or None if it does not exist.
372         :rtype: leap.soledad.document.SoledadDocument
373         """
374         doclist = self._soledad.get_from_index(
375             TAGS_ADDRESS_PRIVATE_INDEX,
376             KEYMANAGER_KEY_TAG,
377             address,
378             '1' if private else '0')
379         if len(doclist) is 0:
380             return None
381         leap_assert(
382             len(doclist) is 1,
383             'Found more than one %s key for address!' %
384             'private' if private else 'public')
385         return doclist.pop()
386
387     def delete_key(self, key):
388         """
389         Remove C{key} from storage.
390
391         :param key: The key to be removed.
392         :type key: EncryptionKey
393         """
394         leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
395         stored_key = self.get_key(key.address, private=key.private)
396         if stored_key is None:
397             raise errors.KeyNotFound(key)
398         if stored_key.__dict__ != key.__dict__:
399             raise errors.KeyAttributesDiffer(key)
400         doc = self._get_key_doc(key.address, key.private)
401         self._soledad.delete_doc(doc)
402
403     #
404     # Data encryption, decryption, signing and verifying
405     #
406
407     def _temporary_gpgwrapper(self, keys=None):
408         """
409         Returns a unitary gpg wrapper that implements context manager
410         protocol.
411
412         :param key_data: ASCII armored key data.
413         :type key_data: str
414         :param gpgbinary: Name for GnuPG binary executable.
415         :type gpgbinary: C{str}
416
417         :return: a GPGWrapper instance
418         :rtype: GPGWrapper
419         """
420         # TODO do here checks on key_data
421         return TempGPGWrapper(
422             keys=keys, gpgbinary=self._gpgbinary)
423
424     @staticmethod
425     def _assert_gpg_result_ok(result):
426         """
427         Check if GPG result is 'ok' and log stderr outputs.
428         :param result: The GPG results
429         :type result:
430         """
431         stderr = getattr(result, 'stderr', None)
432         if stderr:
433             logger.debug("%s" % (stderr,))
434         if getattr(result, 'ok', None) is not True:
435             raise errors.EncryptionDecryptionFailed(
436                 'Failed to encrypt/decrypt: %s' % stderr)
437
438     def encrypt(self, data, pubkey, passphrase=None, sign=None):
439         """
440         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
441
442         :param data: The data to be encrypted.
443         :type data: str
444         :param pubkey: The key used to encrypt.
445         :type pubkey: OpenPGPKey
446         :param sign: The key used for signing.
447         :type sign: OpenPGPKey
448
449         :return: The encrypted data.
450         :rtype: str
451         """
452         leap_assert_type(pubkey, OpenPGPKey)
453         leap_assert(pubkey.private is False, 'Key is not public.')
454         keys = [pubkey]
455         if sign is not None:
456             leap_assert_type(sign, OpenPGPKey)
457             leap_assert(sign.private is True)
458             keys.append(sign)
459         with self._temporary_gpgwrapper(keys) as gpg:
460             result = gpg.encrypt(
461                 data, pubkey.fingerprint,
462                 sign=sign.key_id if sign else None,
463                 passphrase=passphrase, symmetric=False)
464             # Here we cannot assert for correctness of sig because the sig is
465             # in the ciphertext.
466             # result.ok    - (bool) indicates if the operation succeeded
467             # result.data  - (bool) contains the result of the operation
468             self._assert_gpg_result_ok(result)
469             return result.data
470
471     def decrypt(self, data, privkey, passphrase=None, verify=None):
472         """
473         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
474
475         :param data: The data to be decrypted.
476         :type data: str
477         :param privkey: The key used to decrypt.
478         :type privkey: OpenPGPKey
479         :param verify: The key used to verify a signature.
480         :type verify: OpenPGPKey
481
482         :return: The decrypted data.
483         :rtype: str
484
485         @raise InvalidSignature: Raised if unable to verify the signature with
486             C{verify} key.
487         """
488         leap_assert(privkey.private is True, 'Key is not private.')
489         keys = [privkey]
490         if verify is not None:
491             leap_assert_type(verify, OpenPGPKey)
492             leap_assert(verify.private is False)
493             keys.append(verify)
494         with self._temporary_gpgwrapper(keys) as gpg:
495             result = gpg.decrypt(data, passphrase=passphrase)
496             self._assert_gpg_result_ok(result)
497             # verify signature
498             if (verify is not None):
499                 if result.valid is False or \
500                         verify.fingerprint != result.pubkey_fingerprint:
501                     raise errors.InvalidSignature(
502                         'Failed to verify signature with key %s: %s' %
503                         (verify.key_id, stderr))
504             return result.data
505
506     def is_encrypted(self, data):
507         """
508         Return whether C{data} was asymmetrically encrypted using OpenPGP.
509
510         :param data: The data we want to know about.
511         :type data: str
512
513         :return: Whether C{data} was encrypted using this wrapper.
514         :rtype: bool
515         """
516         with self._temporary_gpgwrapper() as gpg:
517             return gpg.is_encrypted_asym(data)
518
519     def sign(self, data, privkey):
520         """
521         Sign C{data} with C{privkey}.
522
523         :param data: The data to be signed.
524         :type data: str
525
526         :param privkey: The private key to be used to sign.
527         :type privkey: OpenPGPKey
528
529         :return: The ascii-armored signed data.
530         :rtype: str
531         """
532         leap_assert_type(privkey, OpenPGPKey)
533         leap_assert(privkey.private is True)
534
535         # result.fingerprint - contains the fingerprint of the key used to
536         #                      sign.
537         with self._temporary_gpgwrapper(privkey) as gpg:
538             result = gpg.sign(data, keyid=privkey.key_id)
539             rfprint = privkey.fingerprint
540             privkey = gpg.list_keys(secret=True).pop()
541             kfprint = privkey['fingerprint']
542             if result.fingerprint is None:
543                 raise errors.SignFailed(
544                     'Failed to sign with key %s: %s' %
545                     (privkey['keyid'], stderr))
546             leap_assert(
547                 result.fingerprint == kfprint,
548                 'Signature and private key fingerprints mismatch: '
549                 '%s != %s' % (rfprint, kfprint))
550         return result.data
551
552     def verify(self, data, pubkey):
553         """
554         Verify signed C{data} with C{pubkey}.
555
556         :param data: The data to be verified.
557         :type data: str
558
559         :param pubkey: The public key to be used on verification.
560         :type pubkey: OpenPGPKey
561
562         :return: The ascii-armored signed data.
563         :rtype: str
564         """
565         leap_assert_type(pubkey, OpenPGPKey)
566         leap_assert(pubkey.private is False)
567         with self._temporary_gpgwrapper(pubkey) as gpg:
568             result = gpg.verify(data)
569             gpgpubkey = gpg.list_keys().pop()
570             valid = result.valid
571             rfprint = result.fingerprint
572             kfprint = gpgpubkey['fingerprint']
573             # raise in case sig is invalid
574             if valid is False or rfprint != kfprint:
575                 raise errors.InvalidSignature(
576                     'Failed to verify signature '
577                     'with key %s.' % gpgpubkey['keyid'])
578             return True