Use datetime for key expiration
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 Infrastructure for using OpenPGP keys in Key Manager.
19 """
20 import logging
21 import os
22 import re
23 import shutil
24 import tempfile
25 import io
26
27
28 from datetime import datetime
29 from gnupg import GPG
30 from gnupg.gnupg import GPGUtilities
31
32 from leap.common.check import leap_assert, leap_assert_type, leap_check
33 from leap.keymanager import errors
34 from leap.keymanager.keys import (
35     EncryptionKey,
36     EncryptionScheme,
37     is_address,
38     build_key_from_dict,
39     KEYMANAGER_KEY_TAG,
40     TAGS_ADDRESS_PRIVATE_INDEX,
41     KEY_FINGERPRINT_KEY,
42     KEY_DATA_KEY,
43 )
44 from leap.keymanager.validation import ValidationLevel
45
46
47 logger = logging.getLogger(__name__)
48
49
50 #
51 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
52 #
53
54 class TempGPGWrapper(object):
55     """
56     A context manager that wraps a temporary GPG keyring which only contains
57     the keys given at object creation.
58     """
59
60     def __init__(self, keys=None, gpgbinary=None):
61         """
62         Create an empty temporary keyring and import any given C{keys} into
63         it.
64
65         :param keys: OpenPGP key, or list of.
66         :type keys: OpenPGPKey or list of OpenPGPKeys
67         :param gpgbinary: Name for GnuPG binary executable.
68         :type gpgbinary: C{str}
69         """
70         self._gpg = None
71         self._gpgbinary = gpgbinary
72         if not keys:
73             keys = list()
74         if not isinstance(keys, list):
75             keys = [keys]
76         self._keys = keys
77         for key in keys:
78             leap_assert_type(key, OpenPGPKey)
79
80     def __enter__(self):
81         """
82         Build and return a GPG keyring containing the keys given on
83         object creation.
84
85         :return: A GPG instance containing the keys given on object creation.
86         :rtype: gnupg.GPG
87         """
88         self._build_keyring()
89         return self._gpg
90
91     def __exit__(self, exc_type, exc_value, traceback):
92         """
93         Ensure the gpg is properly destroyed.
94         """
95         # TODO handle exceptions and log here
96         self._destroy_keyring()
97
98     def _build_keyring(self):
99         """
100         Create a GPG keyring containing the keys given on object creation.
101
102         :return: A GPG instance containing the keys given on object creation.
103         :rtype: gnupg.GPG
104         """
105         privkeys = [key for key in self._keys if key and key.private is True]
106         publkeys = [key for key in self._keys if key and key.private is False]
107         # here we filter out public keys that have a correspondent
108         # private key in the list because the private key_data by
109         # itself is enough to also have the public key in the keyring,
110         # and we want to count the keys afterwards.
111
112         privaddrs = map(lambda privkey: privkey.address, privkeys)
113         publkeys = filter(
114             lambda pubkey: pubkey.address not in privaddrs, publkeys)
115
116         listkeys = lambda: self._gpg.list_keys()
117         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
118
119         self._gpg = GPG(binary=self._gpgbinary,
120                         homedir=tempfile.mkdtemp())
121         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
122
123         # import keys into the keyring:
124         # concatenating ascii-armored keys, which is correctly
125         # understood by GPG.
126
127         self._gpg.import_keys("".join(
128             [x.key_data for x in publkeys + privkeys]))
129
130         # assert the number of keys in the keyring
131         leap_assert(
132             len(listkeys()) == len(publkeys) + len(privkeys),
133             'Wrong number of public keys in keyring: %d, should be %d)' %
134             (len(listkeys()), len(publkeys) + len(privkeys)))
135         leap_assert(
136             len(listsecretkeys()) == len(privkeys),
137             'Wrong number of private keys in keyring: %d, should be %d)' %
138             (len(listsecretkeys()), len(privkeys)))
139
140     def _destroy_keyring(self):
141         """
142         Securely erase the keyring.
143         """
144         # TODO: implement some kind of wiping of data or a more
145         # secure way that
146         # does not write to disk.
147
148         try:
149             for secret in [True, False]:
150                 for key in self._gpg.list_keys(secret=secret):
151                     self._gpg.delete_keys(
152                         key['fingerprint'],
153                         secret=secret)
154             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
155
156         except:
157             raise
158
159         finally:
160             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
161                         "watch out! Tried to remove default gnupg home!")
162             shutil.rmtree(self._gpg.homedir)
163
164
165 def _build_key_from_gpg(address, key, key_data):
166     """
167     Build an OpenPGPKey for C{address} based on C{key} from
168     local gpg storage.
169
170     ASCII armored GPG key data has to be queried independently in this
171     wrapper, so we receive it in C{key_data}.
172
173     :param address: The address bound to the key.
174     :type address: str
175     :param key: Key obtained from GPG storage.
176     :type key: dict
177     :param key_data: Key data obtained from GPG storage.
178     :type key_data: str
179     :return: An instance of the key.
180     :rtype: OpenPGPKey
181     """
182     expiry_date = None
183     if key['expires']:
184         expiry_date = datetime.fromtimestamp(int(key['expires']))
185
186     return OpenPGPKey(
187         address,
188         key_id=key['keyid'],
189         fingerprint=key['fingerprint'],
190         key_data=key_data,
191         private=True if key['type'] == 'sec' else False,
192         length=key['length'],
193         expiry_date=expiry_date,
194         validation=ValidationLevel.Weak_Chain,
195     )
196
197
198 #
199 # The OpenPGP wrapper
200 #
201
202 class OpenPGPKey(EncryptionKey):
203     """
204     Base class for OpenPGP keys.
205     """
206
207
208 class OpenPGPScheme(EncryptionScheme):
209     """
210     A wrapper for OpenPGP keys management and use (encryption, decyption,
211     signing and verification).
212     """
213
214     def __init__(self, soledad, gpgbinary=None):
215         """
216         Initialize the OpenPGP wrapper.
217
218         :param soledad: A Soledad instance for key storage.
219         :type soledad: leap.soledad.Soledad
220         :param gpgbinary: Name for GnuPG binary executable.
221         :type gpgbinary: C{str}
222         """
223         EncryptionScheme.__init__(self, soledad)
224         self._gpgbinary = gpgbinary
225
226     #
227     # Keys management
228     #
229
230     def gen_key(self, address):
231         """
232         Generate an OpenPGP keypair bound to C{address}.
233
234         :param address: The address bound to the key.
235         :type address: str
236         :return: The key bound to C{address}.
237         :rtype: OpenPGPKey
238         @raise KeyAlreadyExists: If key already exists in local database.
239         """
240         # make sure the key does not already exist
241         leap_assert(is_address(address), 'Not an user address: %s' % address)
242         try:
243             self.get_key(address)
244             raise errors.KeyAlreadyExists(address)
245         except errors.KeyNotFound:
246             logger.debug('Key for %s not found' % (address,))
247
248         with self._temporary_gpgwrapper() as gpg:
249             # TODO: inspect result, or use decorator
250             params = gpg.gen_key_input(
251                 key_type='RSA',
252                 key_length=4096,
253                 name_real=address,
254                 name_email=address,
255                 name_comment='')
256             logger.info("About to generate keys... This might take SOME time.")
257             gpg.gen_key(params)
258             logger.info("Keys for %s have been successfully "
259                         "generated." % (address,))
260             pubkeys = gpg.list_keys()
261
262             # assert for new key characteristics
263
264             # XXX This exception is not properly catched by the soledad
265             # bootstrapping, so if we do not finish generating the keys
266             # we end with a blocked thread -- kali
267
268             leap_assert(
269                 len(pubkeys) is 1,  # a unitary keyring!
270                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
271             key = gpg.list_keys(secret=True).pop()
272             leap_assert(
273                 len(key['uids']) is 1,  # with just one uid!
274                 'Wrong number of uids for key: %d.' % len(key['uids']))
275             leap_assert(
276                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
277                 'Key not correctly bound to address.')
278             # insert both public and private keys in storage
279             for secret in [True, False]:
280                 key = gpg.list_keys(secret=secret).pop()
281                 openpgp_key = _build_key_from_gpg(
282                     address, key,
283                     gpg.export_keys(key['fingerprint'], secret=secret))
284                 self.put_key(openpgp_key)
285
286         return self.get_key(address, private=True)
287
288     def get_key(self, address, private=False):
289         """
290         Get key bound to C{address} from local storage.
291
292         :param address: The address bound to the key.
293         :type address: str
294         :param private: Look for a private key instead of a public one?
295         :type private: bool
296
297         :return: The key bound to C{address}.
298         :rtype: OpenPGPKey
299         @raise KeyNotFound: If the key was not found on local storage.
300         """
301         # Remove the identity suffix after the '+' until the '@'
302         # e.g.: test_user+something@provider.com becomes test_user@provider.com
303         # since the key belongs to the identity without the '+' suffix.
304         address = re.sub(r'\+.*\@', '@', address)
305
306         doc = self._get_key_doc(address, private)
307         if doc is None:
308             raise errors.KeyNotFound(address)
309         return build_key_from_dict(OpenPGPKey, address, doc.content)
310
311     def parse_ascii_key(self, key_data):
312         """
313         Parses an ascii armored key (or key pair) data and returns
314         the OpenPGPKey keys.
315
316         :param key_data: the key data to be parsed.
317         :type key_data: str or unicode
318
319         :returns: the public key and private key (if applies) for that data.
320         :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
321                 the tuple may have one or both components None
322         """
323         leap_assert_type(key_data, (str, unicode))
324         # TODO: add more checks for correct key data.
325         leap_assert(key_data is not None, 'Data does not represent a key.')
326         mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
327
328         with self._temporary_gpgwrapper() as gpg:
329             # TODO: inspect result, or use decorator
330             gpg.import_keys(key_data)
331             privkey = None
332             pubkey = None
333
334             try:
335                 privkey = gpg.list_keys(secret=True).pop()
336             except IndexError:
337                 pass
338             try:
339                 pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
340             except IndexError:
341                 return (None, None)
342
343             # extract adress from first uid on key
344             match = re.match(mail_regex, pubkey['uids'].pop())
345             leap_assert(match is not None, 'No user address in key data.')
346             address = match.group(1)
347
348             openpgp_privkey = None
349             if privkey is not None:
350                 match = re.match(mail_regex, privkey['uids'].pop())
351                 leap_assert(match is not None, 'No user address in key data.')
352                 privaddress = match.group(1)
353
354                 # build private key
355                 openpgp_privkey = _build_key_from_gpg(
356                     privaddress, privkey,
357                     gpg.export_keys(privkey['fingerprint'], secret=True))
358
359                 leap_check(address == privaddress,
360                            'Addresses in public and private key differ.',
361                            errors.KeyAddressMismatch)
362                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
363                            'Fingerprints for public and private key differ.',
364                            errors.KeyFingerprintMismatch)
365
366             # build public key
367             openpgp_pubkey = _build_key_from_gpg(
368                 address, pubkey,
369                 gpg.export_keys(pubkey['fingerprint'], secret=False))
370
371             return (openpgp_pubkey, openpgp_privkey)
372
373     def put_ascii_key(self, key_data):
374         """
375         Put key contained in ascii-armored C{key_data} in local storage.
376
377         :param key_data: The key data to be stored.
378         :type key_data: str or unicode
379         """
380         leap_assert_type(key_data, (str, unicode))
381
382         openpgp_privkey = None
383         try:
384             openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
385         except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
386             leap_assert(False, repr(e))
387
388         if openpgp_pubkey is not None:
389             self.put_key(openpgp_pubkey)
390         if openpgp_privkey is not None:
391             self.put_key(openpgp_privkey)
392
393     def put_key(self, key):
394         """
395         Put C{key} in local storage.
396
397         :param key: The key to be stored.
398         :type key: OpenPGPKey
399         """
400         doc = self._get_key_doc(key.address, private=key.private)
401         if doc is None:
402             self._soledad.create_doc_from_json(key.get_json())
403         else:
404             if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]:
405                 # in case of an update of the key merge them with gnupg
406                 with self._temporary_gpgwrapper() as gpg:
407                     gpg.import_keys(doc.content[KEY_DATA_KEY])
408                     gpg.import_keys(key.key_data)
409                     gpgkey = gpg.list_keys(secret=key.private).pop()
410                     key = _build_key_from_gpg(
411                         key.address, gpgkey,
412                         gpg.export_keys(gpgkey['fingerprint'],
413                                         secret=key.private))
414             doc.set_json(key.get_json())
415             self._soledad.put_doc(doc)
416
417     def _get_key_doc(self, address, private=False):
418         """
419         Get the document with a key (public, by default) bound to C{address}.
420
421         If C{private} is True, looks for a private key instead of a public.
422
423         :param address: The address bound to the key.
424         :type address: str
425         :param private: Whether to look for a private key.
426         :type private: bool
427         :return: The document with the key or None if it does not exist.
428         :rtype: leap.soledad.document.SoledadDocument
429         """
430         doclist = self._soledad.get_from_index(
431             TAGS_ADDRESS_PRIVATE_INDEX,
432             KEYMANAGER_KEY_TAG,
433             address,
434             '1' if private else '0')
435         if len(doclist) is 0:
436             return None
437         leap_assert(
438             len(doclist) is 1,
439             'Found more than one %s key for address!' %
440             'private' if private else 'public')
441         return doclist.pop()
442
443     def delete_key(self, key):
444         """
445         Remove C{key} from storage.
446
447         May raise:
448             errors.KeyNotFound
449             errors.KeyAttributesDiffer
450
451         :param key: The key to be removed.
452         :type key: EncryptionKey
453         """
454         leap_assert_type(key, OpenPGPKey)
455         stored_key = self.get_key(key.address, private=key.private)
456         if stored_key is None:
457             raise errors.KeyNotFound(key)
458         if stored_key.__dict__ != key.__dict__:
459             raise errors.KeyAttributesDiffer(key)
460         doc = self._get_key_doc(key.address, key.private)
461         self._soledad.delete_doc(doc)
462
463     #
464     # Data encryption, decryption, signing and verifying
465     #
466
467     def _temporary_gpgwrapper(self, keys=None):
468         """
469         Return a gpg wrapper that implements the context manager protocol and
470         contains C{keys}.
471
472         :param keys: keys to conform the keyring.
473         :type key: list(OpenPGPKey)
474
475         :return: a TempGPGWrapper instance
476         :rtype: TempGPGWrapper
477         """
478         # TODO do here checks on key_data
479         return TempGPGWrapper(
480             keys=keys, gpgbinary=self._gpgbinary)
481
482     @staticmethod
483     def _assert_gpg_result_ok(result):
484         """
485         Check if GPG result is 'ok' and log stderr outputs.
486
487         :param result: GPG results, which have a field calld 'ok' that states
488                        whether the gpg operation was successful or not.
489         :type result: object
490
491         :raise GPGError: Raised when the gpg operation was not successful.
492         """
493         stderr = getattr(result, 'stderr', None)
494         if stderr:
495             logger.debug("%s" % (stderr,))
496         if getattr(result, 'ok', None) is not True:
497             raise errors.GPGError(
498                 'Failed to encrypt/decrypt: %s' % stderr)
499
500     def encrypt(self, data, pubkey, passphrase=None, sign=None,
501                 cipher_algo='AES256'):
502         """
503         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
504
505         :param data: The data to be encrypted.
506         :type data: str
507         :param pubkey: The key used to encrypt.
508         :type pubkey: OpenPGPKey
509         :param sign: The key used for signing.
510         :type sign: OpenPGPKey
511         :param cipher_algo: The cipher algorithm to use.
512         :type cipher_algo: str
513
514         :return: The encrypted data.
515         :rtype: str
516
517         :raise EncryptError: Raised if failed encrypting for some reason.
518         """
519         leap_assert_type(pubkey, OpenPGPKey)
520         leap_assert(pubkey.private is False, 'Key is not public.')
521         keys = [pubkey]
522         if sign is not None:
523             leap_assert_type(sign, OpenPGPKey)
524             leap_assert(sign.private is True)
525             keys.append(sign)
526         with self._temporary_gpgwrapper(keys) as gpg:
527             result = gpg.encrypt(
528                 data, pubkey.fingerprint,
529                 default_key=sign.key_id if sign else None,
530                 passphrase=passphrase, symmetric=False,
531                 cipher_algo=cipher_algo)
532             # Here we cannot assert for correctness of sig because the sig is
533             # in the ciphertext.
534             # result.ok    - (bool) indicates if the operation succeeded
535             # result.data  - (bool) contains the result of the operation
536             try:
537                 self._assert_gpg_result_ok(result)
538                 return result.data
539             except errors.GPGError as e:
540                 logger.error('Failed to decrypt: %s.' % str(e))
541                 raise errors.EncryptError()
542
543     def decrypt(self, data, privkey, passphrase=None, verify=None):
544         """
545         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
546
547         :param data: The data to be decrypted.
548         :type data: str
549         :param privkey: The key used to decrypt.
550         :type privkey: OpenPGPKey
551         :param passphrase: The passphrase for the secret key used for
552                            decryption.
553         :type passphrase: str
554         :param verify: The key used to verify a signature.
555         :type verify: OpenPGPKey
556
557         :return: The decrypted data.
558         :rtype: unicode
559
560         :raise DecryptError: Raised if failed decrypting for some reason.
561         :raise InvalidSignature: Raised if unable to verify the signature with
562                                  C{verify} key.
563         """
564         leap_assert(privkey.private is True, 'Key is not private.')
565         keys = [privkey]
566         if verify is not None:
567             leap_assert_type(verify, OpenPGPKey)
568             leap_assert(verify.private is False)
569             keys.append(verify)
570         with self._temporary_gpgwrapper(keys) as gpg:
571             try:
572                 result = gpg.decrypt(
573                     data, passphrase=passphrase, always_trust=True)
574                 self._assert_gpg_result_ok(result)
575                 # verify signature
576                 if (verify is not None):
577                     if result.valid is False or \
578                             verify.fingerprint != result.pubkey_fingerprint:
579                         raise errors.InvalidSignature(
580                             'Failed to verify signature with key %s: %s' %
581                             (verify.key_id, result.stderr))
582
583                 return result.data
584             except errors.GPGError as e:
585                 logger.error('Failed to decrypt: %s.' % str(e))
586                 raise errors.DecryptError(str(e))
587
588     def is_encrypted(self, data):
589         """
590         Return whether C{data} was asymmetrically encrypted using OpenPGP.
591
592         :param data: The data we want to know about.
593         :type data: str
594
595         :return: Whether C{data} was encrypted using this wrapper.
596         :rtype: bool
597         """
598         with self._temporary_gpgwrapper() as gpg:
599             gpgutil = GPGUtilities(gpg)
600             return gpgutil.is_encrypted_asym(data)
601
602     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
603              detach=True, binary=False):
604         """
605         Sign C{data} with C{privkey}.
606
607         :param data: The data to be signed.
608         :type data: str
609
610         :param privkey: The private key to be used to sign.
611         :type privkey: OpenPGPKey
612         :param digest_algo: The hash digest to use.
613         :type digest_algo: str
614         :param clearsign: If True, create a cleartext signature.
615         :type clearsign: bool
616         :param detach: If True, create a detached signature.
617         :type detach: bool
618         :param binary: If True, do not ascii armour the output.
619         :type binary: bool
620
621         :return: The ascii-armored signed data.
622         :rtype: str
623         """
624         leap_assert_type(privkey, OpenPGPKey)
625         leap_assert(privkey.private is True)
626
627         # result.fingerprint - contains the fingerprint of the key used to
628         #                      sign.
629         with self._temporary_gpgwrapper(privkey) as gpg:
630             result = gpg.sign(data, default_key=privkey.key_id,
631                               digest_algo=digest_algo, clearsign=clearsign,
632                               detach=detach, binary=binary)
633             rfprint = privkey.fingerprint
634             privkey = gpg.list_keys(secret=True).pop()
635             kfprint = privkey['fingerprint']
636             if result.fingerprint is None:
637                 raise errors.SignFailed(
638                     'Failed to sign with key %s: %s' %
639                     (privkey['keyid'], result.stderr))
640             leap_assert(
641                 result.fingerprint == kfprint,
642                 'Signature and private key fingerprints mismatch: '
643                 '%s != %s' % (rfprint, kfprint))
644         return result.data
645
646     def verify(self, data, pubkey, detached_sig=None):
647         """
648         Verify signed C{data} with C{pubkey}, eventually using
649         C{detached_sig}.
650
651         :param data: The data to be verified.
652         :type data: str
653         :param pubkey: The public key to be used on verification.
654         :type pubkey: OpenPGPKey
655         :param detached_sig: A detached signature. If given, C{data} is
656                              verified against this detached signature.
657         :type detached_sig: str
658
659         :return: The ascii-armored signed data.
660         :rtype: str
661         """
662         leap_assert_type(pubkey, OpenPGPKey)
663         leap_assert(pubkey.private is False)
664         with self._temporary_gpgwrapper(pubkey) as gpg:
665             result = None
666             if detached_sig is None:
667                 result = gpg.verify(data)
668             else:
669                 # to verify using a detached sig we have to use
670                 # gpg.verify_file(), which receives the data as a binary
671                 # stream and the name of a file containing the signature.
672                 sf, sfname = tempfile.mkstemp()
673                 with os.fdopen(sf, 'w') as sfd:
674                     sfd.write(detached_sig)
675                 result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
676                 os.unlink(sfname)
677             gpgpubkey = gpg.list_keys().pop()
678             valid = result.valid
679             rfprint = result.fingerprint
680             kfprint = gpgpubkey['fingerprint']
681             # raise in case sig is invalid
682             if valid is False or rfprint != kfprint:
683                 raise errors.InvalidSignature(
684                     'Failed to verify signature '
685                     'with key %s.' % gpgpubkey['keyid'])
686             return True