e84cd297b23101958cb9a4c48e4df73327075043
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 Infrastructure for using OpenPGP keys in Key Manager.
19 """
20 import logging
21 import os
22 import re
23 import shutil
24 import tempfile
25 import io
26
27
28 from gnupg import GPG
29 from gnupg.gnupg import GPGUtilities
30
31 from leap.common.check import leap_assert, leap_assert_type, leap_check
32 from leap.keymanager import errors
33 from leap.keymanager.keys import (
34     EncryptionKey,
35     EncryptionScheme,
36     is_address,
37     build_key_from_dict,
38     KEYMANAGER_KEY_TAG,
39     TAGS_ADDRESS_PRIVATE_INDEX,
40 )
41 from leap.keymanager.validation import ValidationLevel
42
43
44 logger = logging.getLogger(__name__)
45
46
47 #
48 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
49 #
50
51 class TempGPGWrapper(object):
52     """
53     A context manager that wraps a temporary GPG keyring which only contains
54     the keys given at object creation.
55     """
56
57     def __init__(self, keys=None, gpgbinary=None):
58         """
59         Create an empty temporary keyring and import any given C{keys} into
60         it.
61
62         :param keys: OpenPGP key, or list of.
63         :type keys: OpenPGPKey or list of OpenPGPKeys
64         :param gpgbinary: Name for GnuPG binary executable.
65         :type gpgbinary: C{str}
66         """
67         self._gpg = None
68         self._gpgbinary = gpgbinary
69         if not keys:
70             keys = list()
71         if not isinstance(keys, list):
72             keys = [keys]
73         self._keys = keys
74         for key in keys:
75             leap_assert_type(key, OpenPGPKey)
76
77     def __enter__(self):
78         """
79         Build and return a GPG keyring containing the keys given on
80         object creation.
81
82         :return: A GPG instance containing the keys given on object creation.
83         :rtype: gnupg.GPG
84         """
85         self._build_keyring()
86         return self._gpg
87
88     def __exit__(self, exc_type, exc_value, traceback):
89         """
90         Ensure the gpg is properly destroyed.
91         """
92         # TODO handle exceptions and log here
93         self._destroy_keyring()
94
95     def _build_keyring(self):
96         """
97         Create a GPG keyring containing the keys given on object creation.
98
99         :return: A GPG instance containing the keys given on object creation.
100         :rtype: gnupg.GPG
101         """
102         privkeys = [key for key in self._keys if key and key.private is True]
103         publkeys = [key for key in self._keys if key and key.private is False]
104         # here we filter out public keys that have a correspondent
105         # private key in the list because the private key_data by
106         # itself is enough to also have the public key in the keyring,
107         # and we want to count the keys afterwards.
108
109         privaddrs = map(lambda privkey: privkey.address, privkeys)
110         publkeys = filter(
111             lambda pubkey: pubkey.address not in privaddrs, publkeys)
112
113         listkeys = lambda: self._gpg.list_keys()
114         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
115
116         self._gpg = GPG(binary=self._gpgbinary,
117                         homedir=tempfile.mkdtemp())
118         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
119
120         # import keys into the keyring:
121         # concatenating ascii-armored keys, which is correctly
122         # understood by GPG.
123
124         self._gpg.import_keys("".join(
125             [x.key_data for x in publkeys + privkeys]))
126
127         # assert the number of keys in the keyring
128         leap_assert(
129             len(listkeys()) == len(publkeys) + len(privkeys),
130             'Wrong number of public keys in keyring: %d, should be %d)' %
131             (len(listkeys()), len(publkeys) + len(privkeys)))
132         leap_assert(
133             len(listsecretkeys()) == len(privkeys),
134             'Wrong number of private keys in keyring: %d, should be %d)' %
135             (len(listsecretkeys()), len(privkeys)))
136
137     def _destroy_keyring(self):
138         """
139         Securely erase the keyring.
140         """
141         # TODO: implement some kind of wiping of data or a more
142         # secure way that
143         # does not write to disk.
144
145         try:
146             for secret in [True, False]:
147                 for key in self._gpg.list_keys(secret=secret):
148                     self._gpg.delete_keys(
149                         key['fingerprint'],
150                         secret=secret)
151             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
152
153         except:
154             raise
155
156         finally:
157             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
158                         "watch out! Tried to remove default gnupg home!")
159             shutil.rmtree(self._gpg.homedir)
160
161
162 def _build_key_from_gpg(address, key, key_data):
163     """
164     Build an OpenPGPKey for C{address} based on C{key} from
165     local gpg storage.
166
167     ASCII armored GPG key data has to be queried independently in this
168     wrapper, so we receive it in C{key_data}.
169
170     :param address: The address bound to the key.
171     :type address: str
172     :param key: Key obtained from GPG storage.
173     :type key: dict
174     :param key_data: Key data obtained from GPG storage.
175     :type key_data: str
176     :return: An instance of the key.
177     :rtype: OpenPGPKey
178     """
179     return OpenPGPKey(
180         address,
181         key_id=key['keyid'],
182         fingerprint=key['fingerprint'],
183         key_data=key_data,
184         private=True if key['type'] == 'sec' else False,
185         length=key['length'],
186         expiry_date=key['expires'],
187         validation=ValidationLevel.Weak_Chain,
188     )
189
190
191 #
192 # The OpenPGP wrapper
193 #
194
195 class OpenPGPKey(EncryptionKey):
196     """
197     Base class for OpenPGP keys.
198     """
199
200
201 class OpenPGPScheme(EncryptionScheme):
202     """
203     A wrapper for OpenPGP keys management and use (encryption, decyption,
204     signing and verification).
205     """
206
207     def __init__(self, soledad, gpgbinary=None):
208         """
209         Initialize the OpenPGP wrapper.
210
211         :param soledad: A Soledad instance for key storage.
212         :type soledad: leap.soledad.Soledad
213         :param gpgbinary: Name for GnuPG binary executable.
214         :type gpgbinary: C{str}
215         """
216         EncryptionScheme.__init__(self, soledad)
217         self._gpgbinary = gpgbinary
218
219     #
220     # Keys management
221     #
222
223     def gen_key(self, address):
224         """
225         Generate an OpenPGP keypair bound to C{address}.
226
227         :param address: The address bound to the key.
228         :type address: str
229         :return: The key bound to C{address}.
230         :rtype: OpenPGPKey
231         @raise KeyAlreadyExists: If key already exists in local database.
232         """
233         # make sure the key does not already exist
234         leap_assert(is_address(address), 'Not an user address: %s' % address)
235         try:
236             self.get_key(address)
237             raise errors.KeyAlreadyExists(address)
238         except errors.KeyNotFound:
239             logger.debug('Key for %s not found' % (address,))
240
241         with self._temporary_gpgwrapper() as gpg:
242             # TODO: inspect result, or use decorator
243             params = gpg.gen_key_input(
244                 key_type='RSA',
245                 key_length=4096,
246                 name_real=address,
247                 name_email=address,
248                 name_comment='')
249             logger.info("About to generate keys... This might take SOME time.")
250             gpg.gen_key(params)
251             logger.info("Keys for %s have been successfully "
252                         "generated." % (address,))
253             pubkeys = gpg.list_keys()
254
255             # assert for new key characteristics
256
257             # XXX This exception is not properly catched by the soledad
258             # bootstrapping, so if we do not finish generating the keys
259             # we end with a blocked thread -- kali
260
261             leap_assert(
262                 len(pubkeys) is 1,  # a unitary keyring!
263                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
264             key = gpg.list_keys(secret=True).pop()
265             leap_assert(
266                 len(key['uids']) is 1,  # with just one uid!
267                 'Wrong number of uids for key: %d.' % len(key['uids']))
268             leap_assert(
269                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
270                 'Key not correctly bound to address.')
271             # insert both public and private keys in storage
272             for secret in [True, False]:
273                 key = gpg.list_keys(secret=secret).pop()
274                 openpgp_key = _build_key_from_gpg(
275                     address, key,
276                     gpg.export_keys(key['fingerprint'], secret=secret))
277                 self.put_key(openpgp_key)
278
279         return self.get_key(address, private=True)
280
281     def get_key(self, address, private=False):
282         """
283         Get key bound to C{address} from local storage.
284
285         :param address: The address bound to the key.
286         :type address: str
287         :param private: Look for a private key instead of a public one?
288         :type private: bool
289
290         :return: The key bound to C{address}.
291         :rtype: OpenPGPKey
292         @raise KeyNotFound: If the key was not found on local storage.
293         """
294         # Remove the identity suffix after the '+' until the '@'
295         # e.g.: test_user+something@provider.com becomes test_user@provider.com
296         # since the key belongs to the identity without the '+' suffix.
297         address = re.sub(r'\+.*\@', '@', address)
298
299         doc = self._get_key_doc(address, private)
300         if doc is None:
301             raise errors.KeyNotFound(address)
302         return build_key_from_dict(OpenPGPKey, address, doc.content)
303
304     def parse_ascii_key(self, key_data):
305         """
306         Parses an ascii armored key (or key pair) data and returns
307         the OpenPGPKey keys.
308
309         :param key_data: the key data to be parsed.
310         :type key_data: str or unicode
311
312         :returns: the public key and private key (if applies) for that data.
313         :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
314                 the tuple may have one or both components None
315         """
316         leap_assert_type(key_data, (str, unicode))
317         # TODO: add more checks for correct key data.
318         leap_assert(key_data is not None, 'Data does not represent a key.')
319         mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
320
321         with self._temporary_gpgwrapper() as gpg:
322             # TODO: inspect result, or use decorator
323             gpg.import_keys(key_data)
324             privkey = None
325             pubkey = None
326
327             try:
328                 privkey = gpg.list_keys(secret=True).pop()
329             except IndexError:
330                 pass
331             try:
332                 pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
333             except IndexError:
334                 return (None, None)
335
336             # extract adress from first uid on key
337             match = re.match(mail_regex, pubkey['uids'].pop())
338             leap_assert(match is not None, 'No user address in key data.')
339             address = match.group(1)
340
341             openpgp_privkey = None
342             if privkey is not None:
343                 match = re.match(mail_regex, privkey['uids'].pop())
344                 leap_assert(match is not None, 'No user address in key data.')
345                 privaddress = match.group(1)
346
347                 # build private key
348                 openpgp_privkey = _build_key_from_gpg(
349                     privaddress, privkey,
350                     gpg.export_keys(privkey['fingerprint'], secret=True))
351
352                 leap_check(address == privaddress,
353                            'Addresses in public and private key differ.',
354                            errors.KeyAddressMismatch)
355                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
356                            'Fingerprints for public and private key differ.',
357                            errors.KeyFingerprintMismatch)
358
359             # build public key
360             openpgp_pubkey = _build_key_from_gpg(
361                 address, pubkey,
362                 gpg.export_keys(pubkey['fingerprint'], secret=False))
363
364             return (openpgp_pubkey, openpgp_privkey)
365
366     def put_ascii_key(self, key_data):
367         """
368         Put key contained in ascii-armored C{key_data} in local storage.
369
370         :param key_data: The key data to be stored.
371         :type key_data: str or unicode
372         """
373         leap_assert_type(key_data, (str, unicode))
374
375         openpgp_privkey = None
376         try:
377             openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
378         except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
379             leap_assert(False, repr(e))
380
381         if openpgp_pubkey is not None:
382             self.put_key(openpgp_pubkey)
383         if openpgp_privkey is not None:
384             self.put_key(openpgp_privkey)
385
386     def put_key(self, key):
387         """
388         Put C{key} in local storage.
389
390         :param key: The key to be stored.
391         :type key: OpenPGPKey
392         """
393         doc = self._get_key_doc(key.address, private=key.private)
394         if doc is None:
395             self._soledad.create_doc_from_json(key.get_json())
396         else:
397             doc.set_json(key.get_json())
398             self._soledad.put_doc(doc)
399
400     def _get_key_doc(self, address, private=False):
401         """
402         Get the document with a key (public, by default) bound to C{address}.
403
404         If C{private} is True, looks for a private key instead of a public.
405
406         :param address: The address bound to the key.
407         :type address: str
408         :param private: Whether to look for a private key.
409         :type private: bool
410         :return: The document with the key or None if it does not exist.
411         :rtype: leap.soledad.document.SoledadDocument
412         """
413         doclist = self._soledad.get_from_index(
414             TAGS_ADDRESS_PRIVATE_INDEX,
415             KEYMANAGER_KEY_TAG,
416             address,
417             '1' if private else '0')
418         if len(doclist) is 0:
419             return None
420         leap_assert(
421             len(doclist) is 1,
422             'Found more than one %s key for address!' %
423             'private' if private else 'public')
424         return doclist.pop()
425
426     def delete_key(self, key):
427         """
428         Remove C{key} from storage.
429
430         May raise:
431             errors.KeyNotFound
432             errors.KeyAttributesDiffer
433
434         :param key: The key to be removed.
435         :type key: EncryptionKey
436         """
437         leap_assert_type(key, OpenPGPKey)
438         stored_key = self.get_key(key.address, private=key.private)
439         if stored_key is None:
440             raise errors.KeyNotFound(key)
441         if stored_key.__dict__ != key.__dict__:
442             raise errors.KeyAttributesDiffer(key)
443         doc = self._get_key_doc(key.address, key.private)
444         self._soledad.delete_doc(doc)
445
446     #
447     # Data encryption, decryption, signing and verifying
448     #
449
450     def _temporary_gpgwrapper(self, keys=None):
451         """
452         Return a gpg wrapper that implements the context manager protocol and
453         contains C{keys}.
454
455         :param keys: keys to conform the keyring.
456         :type key: list(OpenPGPKey)
457
458         :return: a TempGPGWrapper instance
459         :rtype: TempGPGWrapper
460         """
461         # TODO do here checks on key_data
462         return TempGPGWrapper(
463             keys=keys, gpgbinary=self._gpgbinary)
464
465     @staticmethod
466     def _assert_gpg_result_ok(result):
467         """
468         Check if GPG result is 'ok' and log stderr outputs.
469
470         :param result: GPG results, which have a field calld 'ok' that states
471                        whether the gpg operation was successful or not.
472         :type result: object
473
474         :raise GPGError: Raised when the gpg operation was not successful.
475         """
476         stderr = getattr(result, 'stderr', None)
477         if stderr:
478             logger.debug("%s" % (stderr,))
479         if getattr(result, 'ok', None) is not True:
480             raise errors.GPGError(
481                 'Failed to encrypt/decrypt: %s' % stderr)
482
483     def encrypt(self, data, pubkey, passphrase=None, sign=None,
484                 cipher_algo='AES256'):
485         """
486         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
487
488         :param data: The data to be encrypted.
489         :type data: str
490         :param pubkey: The key used to encrypt.
491         :type pubkey: OpenPGPKey
492         :param sign: The key used for signing.
493         :type sign: OpenPGPKey
494         :param cipher_algo: The cipher algorithm to use.
495         :type cipher_algo: str
496
497         :return: The encrypted data.
498         :rtype: str
499
500         :raise EncryptError: Raised if failed encrypting for some reason.
501         """
502         leap_assert_type(pubkey, OpenPGPKey)
503         leap_assert(pubkey.private is False, 'Key is not public.')
504         keys = [pubkey]
505         if sign is not None:
506             leap_assert_type(sign, OpenPGPKey)
507             leap_assert(sign.private is True)
508             keys.append(sign)
509         with self._temporary_gpgwrapper(keys) as gpg:
510             result = gpg.encrypt(
511                 data, pubkey.fingerprint,
512                 default_key=sign.key_id if sign else None,
513                 passphrase=passphrase, symmetric=False,
514                 cipher_algo=cipher_algo)
515             # Here we cannot assert for correctness of sig because the sig is
516             # in the ciphertext.
517             # result.ok    - (bool) indicates if the operation succeeded
518             # result.data  - (bool) contains the result of the operation
519             try:
520                 self._assert_gpg_result_ok(result)
521                 return result.data
522             except errors.GPGError as e:
523                 logger.error('Failed to decrypt: %s.' % str(e))
524                 raise errors.EncryptError()
525
526     def decrypt(self, data, privkey, passphrase=None, verify=None):
527         """
528         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
529
530         :param data: The data to be decrypted.
531         :type data: str
532         :param privkey: The key used to decrypt.
533         :type privkey: OpenPGPKey
534         :param passphrase: The passphrase for the secret key used for
535                            decryption.
536         :type passphrase: str
537         :param verify: The key used to verify a signature.
538         :type verify: OpenPGPKey
539
540         :return: The decrypted data.
541         :rtype: unicode
542
543         :raise DecryptError: Raised if failed decrypting for some reason.
544         :raise InvalidSignature: Raised if unable to verify the signature with
545                                  C{verify} key.
546         """
547         leap_assert(privkey.private is True, 'Key is not private.')
548         keys = [privkey]
549         if verify is not None:
550             leap_assert_type(verify, OpenPGPKey)
551             leap_assert(verify.private is False)
552             keys.append(verify)
553         with self._temporary_gpgwrapper(keys) as gpg:
554             try:
555                 result = gpg.decrypt(
556                     data, passphrase=passphrase, always_trust=True)
557                 self._assert_gpg_result_ok(result)
558                 # verify signature
559                 if (verify is not None):
560                     if result.valid is False or \
561                             verify.fingerprint != result.pubkey_fingerprint:
562                         raise errors.InvalidSignature(
563                             'Failed to verify signature with key %s: %s' %
564                             (verify.key_id, result.stderr))
565
566                 return result.data
567             except errors.GPGError as e:
568                 logger.error('Failed to decrypt: %s.' % str(e))
569                 raise errors.DecryptError(str(e))
570
571     def is_encrypted(self, data):
572         """
573         Return whether C{data} was asymmetrically encrypted using OpenPGP.
574
575         :param data: The data we want to know about.
576         :type data: str
577
578         :return: Whether C{data} was encrypted using this wrapper.
579         :rtype: bool
580         """
581         with self._temporary_gpgwrapper() as gpg:
582             gpgutil = GPGUtilities(gpg)
583             return gpgutil.is_encrypted_asym(data)
584
585     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
586              detach=True, binary=False):
587         """
588         Sign C{data} with C{privkey}.
589
590         :param data: The data to be signed.
591         :type data: str
592
593         :param privkey: The private key to be used to sign.
594         :type privkey: OpenPGPKey
595         :param digest_algo: The hash digest to use.
596         :type digest_algo: str
597         :param clearsign: If True, create a cleartext signature.
598         :type clearsign: bool
599         :param detach: If True, create a detached signature.
600         :type detach: bool
601         :param binary: If True, do not ascii armour the output.
602         :type binary: bool
603
604         :return: The ascii-armored signed data.
605         :rtype: str
606         """
607         leap_assert_type(privkey, OpenPGPKey)
608         leap_assert(privkey.private is True)
609
610         # result.fingerprint - contains the fingerprint of the key used to
611         #                      sign.
612         with self._temporary_gpgwrapper(privkey) as gpg:
613             result = gpg.sign(data, default_key=privkey.key_id,
614                               digest_algo=digest_algo, clearsign=clearsign,
615                               detach=detach, binary=binary)
616             rfprint = privkey.fingerprint
617             privkey = gpg.list_keys(secret=True).pop()
618             kfprint = privkey['fingerprint']
619             if result.fingerprint is None:
620                 raise errors.SignFailed(
621                     'Failed to sign with key %s: %s' %
622                     (privkey['keyid'], result.stderr))
623             leap_assert(
624                 result.fingerprint == kfprint,
625                 'Signature and private key fingerprints mismatch: '
626                 '%s != %s' % (rfprint, kfprint))
627         return result.data
628
629     def verify(self, data, pubkey, detached_sig=None):
630         """
631         Verify signed C{data} with C{pubkey}, eventually using
632         C{detached_sig}.
633
634         :param data: The data to be verified.
635         :type data: str
636         :param pubkey: The public key to be used on verification.
637         :type pubkey: OpenPGPKey
638         :param detached_sig: A detached signature. If given, C{data} is
639                              verified against this detached signature.
640         :type detached_sig: str
641
642         :return: The ascii-armored signed data.
643         :rtype: str
644         """
645         leap_assert_type(pubkey, OpenPGPKey)
646         leap_assert(pubkey.private is False)
647         with self._temporary_gpgwrapper(pubkey) as gpg:
648             result = None
649             if detached_sig is None:
650                 result = gpg.verify(data)
651             else:
652                 # to verify using a detached sig we have to use
653                 # gpg.verify_file(), which receives the data as a binary
654                 # stream and the name of a file containing the signature.
655                 sf, sfname = tempfile.mkstemp()
656                 with os.fdopen(sf, 'w') as sfd:
657                     sfd.write(detached_sig)
658                 result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
659                 os.unlink(sfname)
660             gpgpubkey = gpg.list_keys().pop()
661             valid = result.valid
662             rfprint = result.fingerprint
663             kfprint = gpgpubkey['fingerprint']
664             # raise in case sig is invalid
665             if valid is False or rfprint != kfprint:
666                 raise errors.InvalidSignature(
667                     'Failed to verify signature '
668                     'with key %s.' % gpgpubkey['keyid'])
669             return True