gpg.verify_file() gets the data as a filename not as a binary stream
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 Infrastructure for using OpenPGP keys in Key Manager.
19 """
20 import logging
21 import os
22 import re
23 import shutil
24 import tempfile
25
26 from contextlib import closing
27
28 from gnupg import GPG
29 from gnupg.gnupg import GPGUtilities
30 from gnupg._util import _make_binary_stream
31
32 from leap.common.check import leap_assert, leap_assert_type, leap_check
33 from leap.keymanager import errors
34 from leap.keymanager.keys import (
35     EncryptionKey,
36     EncryptionScheme,
37     is_address,
38     build_key_from_dict,
39     KEYMANAGER_KEY_TAG,
40     TAGS_ADDRESS_PRIVATE_INDEX,
41 )
42
43
44 logger = logging.getLogger(__name__)
45
46
47 #
48 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
49 #
50
51 class TempGPGWrapper(object):
52     """
53     A context manager that wraps a temporary GPG keyring which only contains
54     the keys given at object creation.
55     """
56
57     def __init__(self, keys=None, gpgbinary=None):
58         """
59         Create an empty temporary keyring and import any given C{keys} into
60         it.
61
62         :param keys: OpenPGP key, or list of.
63         :type keys: OpenPGPKey or list of OpenPGPKeys
64         :param gpgbinary: Name for GnuPG binary executable.
65         :type gpgbinary: C{str}
66         """
67         self._gpg = None
68         self._gpgbinary = gpgbinary
69         if not keys:
70             keys = list()
71         if not isinstance(keys, list):
72             keys = [keys]
73         self._keys = keys
74         for key in keys:
75             leap_assert_type(key, OpenPGPKey)
76
77     def __enter__(self):
78         """
79         Build and return a GPG keyring containing the keys given on
80         object creation.
81
82         :return: A GPG instance containing the keys given on object creation.
83         :rtype: gnupg.GPG
84         """
85         self._build_keyring()
86         return self._gpg
87
88     def __exit__(self, exc_type, exc_value, traceback):
89         """
90         Ensure the gpg is properly destroyed.
91         """
92         # TODO handle exceptions and log here
93         self._destroy_keyring()
94
95     def _build_keyring(self):
96         """
97         Create a GPG keyring containing the keys given on object creation.
98
99         :return: A GPG instance containing the keys given on object creation.
100         :rtype: gnupg.GPG
101         """
102         privkeys = [key for key in self._keys if key and key.private is True]
103         publkeys = [key for key in self._keys if key and key.private is False]
104         # here we filter out public keys that have a correspondent
105         # private key in the list because the private key_data by
106         # itself is enough to also have the public key in the keyring,
107         # and we want to count the keys afterwards.
108
109         privaddrs = map(lambda privkey: privkey.address, privkeys)
110         publkeys = filter(
111             lambda pubkey: pubkey.address not in privaddrs, publkeys)
112
113         listkeys = lambda: self._gpg.list_keys()
114         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
115
116         self._gpg = GPG(binary=self._gpgbinary,
117                         homedir=tempfile.mkdtemp())
118         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
119
120         # import keys into the keyring:
121         # concatenating ascii-armored keys, which is correctly
122         # understood by GPG.
123
124         self._gpg.import_keys("".join(
125             [x.key_data for x in publkeys + privkeys]))
126
127         # assert the number of keys in the keyring
128         leap_assert(
129             len(listkeys()) == len(publkeys) + len(privkeys),
130             'Wrong number of public keys in keyring: %d, should be %d)' %
131             (len(listkeys()), len(publkeys) + len(privkeys)))
132         leap_assert(
133             len(listsecretkeys()) == len(privkeys),
134             'Wrong number of private keys in keyring: %d, should be %d)' %
135             (len(listsecretkeys()), len(privkeys)))
136
137     def _destroy_keyring(self):
138         """
139         Securely erase the keyring.
140         """
141         # TODO: implement some kind of wiping of data or a more
142         # secure way that
143         # does not write to disk.
144
145         try:
146             for secret in [True, False]:
147                 for key in self._gpg.list_keys(secret=secret):
148                     self._gpg.delete_keys(
149                         key['fingerprint'],
150                         secret=secret)
151             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
152
153         except:
154             raise
155
156         finally:
157             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
158                         "watch out! Tried to remove default gnupg home!")
159             shutil.rmtree(self._gpg.homedir)
160
161
162 def _build_key_from_gpg(address, key, key_data):
163     """
164     Build an OpenPGPKey for C{address} based on C{key} from
165     local gpg storage.
166
167     ASCII armored GPG key data has to be queried independently in this
168     wrapper, so we receive it in C{key_data}.
169
170     :param address: The address bound to the key.
171     :type address: str
172     :param key: Key obtained from GPG storage.
173     :type key: dict
174     :param key_data: Key data obtained from GPG storage.
175     :type key_data: str
176     :return: An instance of the key.
177     :rtype: OpenPGPKey
178     """
179     return OpenPGPKey(
180         address,
181         key_id=key['keyid'],
182         fingerprint=key['fingerprint'],
183         key_data=key_data,
184         private=True if key['type'] == 'sec' else False,
185         length=key['length'],
186         expiry_date=key['expires'],
187         validation=None,  # TODO: verify for validation.
188     )
189
190
191 #
192 # The OpenPGP wrapper
193 #
194
195 class OpenPGPKey(EncryptionKey):
196     """
197     Base class for OpenPGP keys.
198     """
199
200
201 class OpenPGPScheme(EncryptionScheme):
202     """
203     A wrapper for OpenPGP keys management and use (encryption, decyption,
204     signing and verification).
205     """
206
207     def __init__(self, soledad, gpgbinary=None):
208         """
209         Initialize the OpenPGP wrapper.
210
211         :param soledad: A Soledad instance for key storage.
212         :type soledad: leap.soledad.Soledad
213         :param gpgbinary: Name for GnuPG binary executable.
214         :type gpgbinary: C{str}
215         """
216         EncryptionScheme.__init__(self, soledad)
217         self._gpgbinary = gpgbinary
218
219     #
220     # Keys management
221     #
222
223     def gen_key(self, address):
224         """
225         Generate an OpenPGP keypair bound to C{address}.
226
227         :param address: The address bound to the key.
228         :type address: str
229         :return: The key bound to C{address}.
230         :rtype: OpenPGPKey
231         @raise KeyAlreadyExists: If key already exists in local database.
232         """
233         # make sure the key does not already exist
234         leap_assert(is_address(address), 'Not an user address: %s' % address)
235         try:
236             self.get_key(address)
237             raise errors.KeyAlreadyExists(address)
238         except errors.KeyNotFound:
239             logger.debug('Key for %s not found' % (address,))
240
241         with self._temporary_gpgwrapper() as gpg:
242             # TODO: inspect result, or use decorator
243             params = gpg.gen_key_input(
244                 key_type='RSA',
245                 key_length=4096,
246                 name_real=address,
247                 name_email=address,
248                 name_comment='')
249             logger.info("About to generate keys... This might take SOME time.")
250             gpg.gen_key(params)
251             logger.info("Keys for %s have been successfully "
252                         "generated." % (address,))
253             pubkeys = gpg.list_keys()
254
255             # assert for new key characteristics
256
257             # XXX This exception is not properly catched by the soledad
258             # bootstrapping, so if we do not finish generating the keys
259             # we end with a blocked thread -- kali
260
261             leap_assert(
262                 len(pubkeys) is 1,  # a unitary keyring!
263                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
264             key = gpg.list_keys(secret=True).pop()
265             leap_assert(
266                 len(key['uids']) is 1,  # with just one uid!
267                 'Wrong number of uids for key: %d.' % len(key['uids']))
268             leap_assert(
269                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
270                 'Key not correctly bound to address.')
271             # insert both public and private keys in storage
272             for secret in [True, False]:
273                 key = gpg.list_keys(secret=secret).pop()
274                 openpgp_key = _build_key_from_gpg(
275                     address, key,
276                     gpg.export_keys(key['fingerprint'], secret=secret))
277                 self.put_key(openpgp_key)
278
279         return self.get_key(address, private=True)
280
281     def get_key(self, address, private=False):
282         """
283         Get key bound to C{address} from local storage.
284
285         :param address: The address bound to the key.
286         :type address: str
287         :param private: Look for a private key instead of a public one?
288         :type private: bool
289
290         :return: The key bound to C{address}.
291         :rtype: OpenPGPKey
292         @raise KeyNotFound: If the key was not found on local storage.
293         """
294         # Remove the identity suffix after the '+' until the '@'
295         # e.g.: test_user+something@provider.com becomes test_user@provider.com
296         # since the key belongs to the identity without the '+' suffix.
297         address = re.sub(r'\+.*\@', '@', address)
298
299         doc = self._get_key_doc(address, private)
300         if doc is None:
301             raise errors.KeyNotFound(address)
302         return build_key_from_dict(OpenPGPKey, address, doc.content)
303
304     def parse_ascii_key(self, key_data):
305         """
306         Parses an ascii armored key (or key pair) data and returns
307         the OpenPGPKey keys.
308
309         :param key_data: the key data to be parsed.
310         :type key_data: str or unicode
311
312         :returns: the public key and private key (if applies) for that data.
313         :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
314                 the tuple may have one or both components None
315         """
316         leap_assert_type(key_data, (str, unicode))
317         # TODO: add more checks for correct key data.
318         leap_assert(key_data is not None, 'Data does not represent a key.')
319         mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
320
321         with self._temporary_gpgwrapper() as gpg:
322             # TODO: inspect result, or use decorator
323             gpg.import_keys(key_data)
324             privkey = None
325             pubkey = None
326
327             try:
328                 privkey = gpg.list_keys(secret=True).pop()
329             except IndexError:
330                 pass
331             pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
332
333             # extract adress from first uid on key
334             match = re.match(mail_regex, pubkey['uids'].pop())
335             leap_assert(match is not None, 'No user address in key data.')
336             address = match.group(1)
337
338             openpgp_privkey = None
339             if privkey is not None:
340                 match = re.match(mail_regex, privkey['uids'].pop())
341                 leap_assert(match is not None, 'No user address in key data.')
342                 privaddress = match.group(1)
343
344                 # build private key
345                 openpgp_privkey = _build_key_from_gpg(
346                     privaddress, privkey,
347                     gpg.export_keys(privkey['fingerprint'], secret=True))
348
349                 leap_check(address == privaddress,
350                            'Addresses in public and private key differ.',
351                            errors.KeyAddressMismatch)
352                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
353                            'Fingerprints for public and private key differ.',
354                            errors.KeyFingerprintMismatch)
355
356             # build public key
357             openpgp_pubkey = _build_key_from_gpg(
358                 address, pubkey,
359                 gpg.export_keys(pubkey['fingerprint'], secret=False))
360
361             return (openpgp_pubkey, openpgp_privkey)
362
363     def put_ascii_key(self, key_data):
364         """
365         Put key contained in ascii-armored C{key_data} in local storage.
366
367         :param key_data: The key data to be stored.
368         :type key_data: str or unicode
369         """
370         leap_assert_type(key_data, (str, unicode))
371
372         openpgp_privkey = None
373         try:
374             openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
375         except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
376             leap_assert(False, repr(e))
377
378         if openpgp_pubkey is not None:
379             self.put_key(openpgp_pubkey)
380         if openpgp_privkey is not None:
381             self.put_key(openpgp_privkey)
382
383     def put_key(self, key):
384         """
385         Put C{key} in local storage.
386
387         :param key: The key to be stored.
388         :type key: OpenPGPKey
389         """
390         doc = self._get_key_doc(key.address, private=key.private)
391         if doc is None:
392             self._soledad.create_doc_from_json(key.get_json())
393         else:
394             doc.set_json(key.get_json())
395             self._soledad.put_doc(doc)
396
397     def _get_key_doc(self, address, private=False):
398         """
399         Get the document with a key (public, by default) bound to C{address}.
400
401         If C{private} is True, looks for a private key instead of a public.
402
403         :param address: The address bound to the key.
404         :type address: str
405         :param private: Whether to look for a private key.
406         :type private: bool
407         :return: The document with the key or None if it does not exist.
408         :rtype: leap.soledad.document.SoledadDocument
409         """
410         doclist = self._soledad.get_from_index(
411             TAGS_ADDRESS_PRIVATE_INDEX,
412             KEYMANAGER_KEY_TAG,
413             address,
414             '1' if private else '0')
415         if len(doclist) is 0:
416             return None
417         leap_assert(
418             len(doclist) is 1,
419             'Found more than one %s key for address!' %
420             'private' if private else 'public')
421         return doclist.pop()
422
423     def delete_key(self, key):
424         """
425         Remove C{key} from storage.
426
427         May raise:
428             errors.KeyNotFound
429             errors.KeyAttributesDiffer
430
431         :param key: The key to be removed.
432         :type key: EncryptionKey
433         """
434         leap_assert_type(key, OpenPGPKey)
435         stored_key = self.get_key(key.address, private=key.private)
436         if stored_key is None:
437             raise errors.KeyNotFound(key)
438         if stored_key.__dict__ != key.__dict__:
439             raise errors.KeyAttributesDiffer(key)
440         doc = self._get_key_doc(key.address, key.private)
441         self._soledad.delete_doc(doc)
442
443     #
444     # Data encryption, decryption, signing and verifying
445     #
446
447     def _temporary_gpgwrapper(self, keys=None):
448         """
449         Return a gpg wrapper that implements the context manager protocol and
450         contains C{keys}.
451
452         :param key_data: ASCII armored key data.
453         :type key_data: str
454         :param gpgbinary: Name for GnuPG binary executable.
455         :type gpgbinary: C{str}
456
457         :return: a TempGPGWrapper instance
458         :rtype: TempGPGWrapper
459         """
460         # TODO do here checks on key_data
461         return TempGPGWrapper(
462             keys=keys, gpgbinary=self._gpgbinary)
463
464     @staticmethod
465     def _assert_gpg_result_ok(result):
466         """
467         Check if GPG result is 'ok' and log stderr outputs.
468
469         :param result: GPG results, which have a field calld 'ok' that states
470                        whether the gpg operation was successful or not.
471         :type result: object
472
473         :raise GPGError: Raised when the gpg operation was not successful.
474         """
475         stderr = getattr(result, 'stderr', None)
476         if stderr:
477             logger.debug("%s" % (stderr,))
478         if getattr(result, 'ok', None) is not True:
479             raise errors.GPGError(
480                 'Failed to encrypt/decrypt: %s' % stderr)
481
482     def encrypt(self, data, pubkey, passphrase=None, sign=None,
483                 cipher_algo='AES256'):
484         """
485         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
486
487         :param data: The data to be encrypted.
488         :type data: str
489         :param pubkey: The key used to encrypt.
490         :type pubkey: OpenPGPKey
491         :param sign: The key used for signing.
492         :type sign: OpenPGPKey
493         :param cipher_algo: The cipher algorithm to use.
494         :type cipher_algo: str
495
496         :return: The encrypted data.
497         :rtype: str
498
499         :raise EncryptError: Raised if failed encrypting for some reason.
500         """
501         leap_assert_type(pubkey, OpenPGPKey)
502         leap_assert(pubkey.private is False, 'Key is not public.')
503         keys = [pubkey]
504         if sign is not None:
505             leap_assert_type(sign, OpenPGPKey)
506             leap_assert(sign.private is True)
507             keys.append(sign)
508         with self._temporary_gpgwrapper(keys) as gpg:
509             result = gpg.encrypt(
510                 data, pubkey.fingerprint,
511                 default_key=sign.key_id if sign else None,
512                 passphrase=passphrase, symmetric=False,
513                 cipher_algo=cipher_algo)
514             # Here we cannot assert for correctness of sig because the sig is
515             # in the ciphertext.
516             # result.ok    - (bool) indicates if the operation succeeded
517             # result.data  - (bool) contains the result of the operation
518             try:
519                 self._assert_gpg_result_ok(result)
520                 return result.data
521             except errors.GPGError as e:
522                 logger.error('Failed to decrypt: %s.' % str(e))
523                 raise errors.EncryptError()
524
525     def decrypt(self, data, privkey, passphrase=None, verify=None):
526         """
527         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
528
529         :param data: The data to be decrypted.
530         :type data: str
531         :param privkey: The key used to decrypt.
532         :type privkey: OpenPGPKey
533         :param passphrase: The passphrase for the secret key used for
534                            decryption.
535         :type passphrase: str
536         :param verify: The key used to verify a signature.
537         :type verify: OpenPGPKey
538
539         :return: The decrypted data.
540         :rtype: unicode
541
542         :raise DecryptError: Raised if failed decrypting for some reason.
543         :raise InvalidSignature: Raised if unable to verify the signature with
544                                  C{verify} key.
545         """
546         leap_assert(privkey.private is True, 'Key is not private.')
547         keys = [privkey]
548         if verify is not None:
549             leap_assert_type(verify, OpenPGPKey)
550             leap_assert(verify.private is False)
551             keys.append(verify)
552         with self._temporary_gpgwrapper(keys) as gpg:
553             try:
554                 result = gpg.decrypt(
555                     data, passphrase=passphrase, always_trust=True)
556                 self._assert_gpg_result_ok(result)
557                 # verify signature
558                 if (verify is not None):
559                     if result.valid is False or \
560                             verify.fingerprint != result.pubkey_fingerprint:
561                         raise errors.InvalidSignature(
562                             'Failed to verify signature with key %s: %s' %
563                             (verify.key_id, result.stderr))
564
565                 return result.data
566             except errors.GPGError as e:
567                 logger.error('Failed to decrypt: %s.' % str(e))
568                 raise errors.DecryptError(str(e))
569
570     def is_encrypted(self, data):
571         """
572         Return whether C{data} was asymmetrically encrypted using OpenPGP.
573
574         :param data: The data we want to know about.
575         :type data: str
576
577         :return: Whether C{data} was encrypted using this wrapper.
578         :rtype: bool
579         """
580         with self._temporary_gpgwrapper() as gpg:
581             gpgutil = GPGUtilities(gpg)
582             return gpgutil.is_encrypted_asym(data)
583
584     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
585              detach=True, binary=False):
586         """
587         Sign C{data} with C{privkey}.
588
589         :param data: The data to be signed.
590         :type data: str
591
592         :param privkey: The private key to be used to sign.
593         :type privkey: OpenPGPKey
594         :param digest_algo: The hash digest to use.
595         :type digest_algo: str
596         :param clearsign: If True, create a cleartext signature.
597         :type clearsign: bool
598         :param detach: If True, create a detached signature.
599         :type detach: bool
600         :param binary: If True, do not ascii armour the output.
601         :type binary: bool
602
603         :return: The ascii-armored signed data.
604         :rtype: str
605         """
606         leap_assert_type(privkey, OpenPGPKey)
607         leap_assert(privkey.private is True)
608
609         # result.fingerprint - contains the fingerprint of the key used to
610         #                      sign.
611         with self._temporary_gpgwrapper(privkey) as gpg:
612             result = gpg.sign(data, default_key=privkey.key_id,
613                               digest_algo=digest_algo, clearsign=clearsign,
614                               detach=detach, binary=binary)
615             rfprint = privkey.fingerprint
616             privkey = gpg.list_keys(secret=True).pop()
617             kfprint = privkey['fingerprint']
618             if result.fingerprint is None:
619                 raise errors.SignFailed(
620                     'Failed to sign with key %s: %s' %
621                     (privkey['keyid'], result.stderr))
622             leap_assert(
623                 result.fingerprint == kfprint,
624                 'Signature and private key fingerprints mismatch: '
625                 '%s != %s' % (rfprint, kfprint))
626         return result.data
627
628     def verify(self, data, pubkey, detached_sig=None):
629         """
630         Verify signed C{data} with C{pubkey}, eventually using
631         C{detached_sig}.
632
633         :param data: The data to be verified.
634         :type data: str
635         :param pubkey: The public key to be used on verification.
636         :type pubkey: OpenPGPKey
637         :param detached_sig: A detached signature. If given, C{data} is
638                              verified against this detached signature.
639         :type detached_sig: str
640
641         :return: The ascii-armored signed data.
642         :rtype: str
643         """
644         leap_assert_type(pubkey, OpenPGPKey)
645         leap_assert(pubkey.private is False)
646         with self._temporary_gpgwrapper(pubkey) as gpg:
647             result = None
648             if detached_sig is None:
649                 result = gpg.verify(data)
650             else:
651                 # to verify using a detached sig we have to use
652                 # gpg.verify_file(), which receives the name of
653                 # files containing the date and the signature.
654                 sf, sfname = tempfile.mkstemp()
655                 with os.fdopen(sf, 'w') as sfd:
656                     sfd.write(detached_sig)
657                 df, dfname = tempfile.mkstemp()
658                 with os.fdopen(df, 'w') as sdd:
659                     sdd.write(data)
660                 result = gpg.verify_file(dfname, sig_file=sfname)
661                 os.unlink(sfname)
662                 os.unlink(dfname)
663             gpgpubkey = gpg.list_keys().pop()
664             valid = result.valid
665             rfprint = result.fingerprint
666             kfprint = gpgpubkey['fingerprint']
667             # raise in case sig is invalid
668             if valid is False or rfprint != kfprint:
669                 raise errors.InvalidSignature(
670                     'Failed to verify signature '
671                     'with key %s.' % gpgpubkey['keyid'])
672             return True