Use type instead of tags to get docs in openpgp
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 Infrastructure for using OpenPGP keys in Key Manager.
19 """
20 import logging
21 import os
22 import re
23 import shutil
24 import tempfile
25 import io
26
27
28 from datetime import datetime
29 from gnupg import GPG
30 from gnupg.gnupg import GPGUtilities
31
32 from leap.common.check import leap_assert, leap_assert_type, leap_check
33 from leap.keymanager import errors
34 from leap.keymanager.keys import (
35     EncryptionKey,
36     EncryptionScheme,
37     is_address,
38     build_key_from_dict,
39     TYPE_ADDRESS_PRIVATE_INDEX,
40     KEY_FINGERPRINT_KEY,
41     KEY_DATA_KEY,
42 )
43
44
45 logger = logging.getLogger(__name__)
46
47
48 #
49 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
50 #
51
52 class TempGPGWrapper(object):
53     """
54     A context manager that wraps a temporary GPG keyring which only contains
55     the keys given at object creation.
56     """
57
58     def __init__(self, keys=None, gpgbinary=None):
59         """
60         Create an empty temporary keyring and import any given C{keys} into
61         it.
62
63         :param keys: OpenPGP key, or list of.
64         :type keys: OpenPGPKey or list of OpenPGPKeys
65         :param gpgbinary: Name for GnuPG binary executable.
66         :type gpgbinary: C{str}
67         """
68         self._gpg = None
69         self._gpgbinary = gpgbinary
70         if not keys:
71             keys = list()
72         if not isinstance(keys, list):
73             keys = [keys]
74         self._keys = keys
75         for key in keys:
76             leap_assert_type(key, OpenPGPKey)
77
78     def __enter__(self):
79         """
80         Build and return a GPG keyring containing the keys given on
81         object creation.
82
83         :return: A GPG instance containing the keys given on object creation.
84         :rtype: gnupg.GPG
85         """
86         self._build_keyring()
87         return self._gpg
88
89     def __exit__(self, exc_type, exc_value, traceback):
90         """
91         Ensure the gpg is properly destroyed.
92         """
93         # TODO handle exceptions and log here
94         self._destroy_keyring()
95
96     def _build_keyring(self):
97         """
98         Create a GPG keyring containing the keys given on object creation.
99
100         :return: A GPG instance containing the keys given on object creation.
101         :rtype: gnupg.GPG
102         """
103         privkeys = [key for key in self._keys if key and key.private is True]
104         publkeys = [key for key in self._keys if key and key.private is False]
105         # here we filter out public keys that have a correspondent
106         # private key in the list because the private key_data by
107         # itself is enough to also have the public key in the keyring,
108         # and we want to count the keys afterwards.
109
110         privaddrs = map(lambda privkey: privkey.address[0], privkeys)
111         publkeys = filter(
112             lambda pubkey: pubkey.address[0] not in privaddrs, publkeys)
113
114         listkeys = lambda: self._gpg.list_keys()
115         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
116
117         self._gpg = GPG(binary=self._gpgbinary,
118                         homedir=tempfile.mkdtemp())
119         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
120
121         # import keys into the keyring:
122         # concatenating ascii-armored keys, which is correctly
123         # understood by GPG.
124
125         self._gpg.import_keys("".join(
126             [x.key_data for x in publkeys + privkeys]))
127
128         # assert the number of keys in the keyring
129         leap_assert(
130             len(listkeys()) == len(publkeys) + len(privkeys),
131             'Wrong number of public keys in keyring: %d, should be %d)' %
132             (len(listkeys()), len(publkeys) + len(privkeys)))
133         leap_assert(
134             len(listsecretkeys()) == len(privkeys),
135             'Wrong number of private keys in keyring: %d, should be %d)' %
136             (len(listsecretkeys()), len(privkeys)))
137
138     def _destroy_keyring(self):
139         """
140         Securely erase the keyring.
141         """
142         # TODO: implement some kind of wiping of data or a more
143         # secure way that
144         # does not write to disk.
145
146         try:
147             for secret in [True, False]:
148                 for key in self._gpg.list_keys(secret=secret):
149                     self._gpg.delete_keys(
150                         key['fingerprint'],
151                         secret=secret)
152             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
153
154         except:
155             raise
156
157         finally:
158             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
159                         "watch out! Tried to remove default gnupg home!")
160             shutil.rmtree(self._gpg.homedir)
161
162
163 def _build_key_from_gpg(address, key, key_data):
164     """
165     Build an OpenPGPKey for C{address} based on C{key} from
166     local gpg storage.
167
168     ASCII armored GPG key data has to be queried independently in this
169     wrapper, so we receive it in C{key_data}.
170
171     :param address: The address bound to the key.
172     :type address: str
173     :param key: Key obtained from GPG storage.
174     :type key: dict
175     :param key_data: Key data obtained from GPG storage.
176     :type key_data: str
177     :return: An instance of the key.
178     :rtype: OpenPGPKey
179     """
180     expiry_date = None
181     if key['expires']:
182         expiry_date = datetime.fromtimestamp(int(key['expires']))
183
184     return OpenPGPKey(
185         [address],
186         key_id=key['keyid'],
187         fingerprint=key['fingerprint'],
188         key_data=key_data,
189         private=True if key['type'] == 'sec' else False,
190         length=int(key['length']),
191         expiry_date=expiry_date,
192         refreshed_at=datetime.now(),
193     )
194
195
196 #
197 # The OpenPGP wrapper
198 #
199
200 class OpenPGPKey(EncryptionKey):
201     """
202     Base class for OpenPGP keys.
203     """
204
205
206 class OpenPGPScheme(EncryptionScheme):
207     """
208     A wrapper for OpenPGP keys management and use (encryption, decyption,
209     signing and verification).
210     """
211
212     # type used on the soledad documents
213     OPENPGP_KEY_TYPE = OpenPGPKey.__name__
214
215     def __init__(self, soledad, gpgbinary=None):
216         """
217         Initialize the OpenPGP wrapper.
218
219         :param soledad: A Soledad instance for key storage.
220         :type soledad: leap.soledad.Soledad
221         :param gpgbinary: Name for GnuPG binary executable.
222         :type gpgbinary: C{str}
223         """
224         EncryptionScheme.__init__(self, soledad)
225         self._gpgbinary = gpgbinary
226
227     #
228     # Keys management
229     #
230
231     def gen_key(self, address):
232         """
233         Generate an OpenPGP keypair bound to C{address}.
234
235         :param address: The address bound to the key.
236         :type address: str
237         :return: The key bound to C{address}.
238         :rtype: OpenPGPKey
239         @raise KeyAlreadyExists: If key already exists in local database.
240         """
241         # make sure the key does not already exist
242         leap_assert(is_address(address), 'Not an user address: %s' % address)
243         try:
244             self.get_key(address)
245             raise errors.KeyAlreadyExists(address)
246         except errors.KeyNotFound:
247             logger.debug('Key for %s not found' % (address,))
248
249         with self._temporary_gpgwrapper() as gpg:
250             # TODO: inspect result, or use decorator
251             params = gpg.gen_key_input(
252                 key_type='RSA',
253                 key_length=4096,
254                 name_real=address,
255                 name_email=address,
256                 name_comment='')
257             logger.info("About to generate keys... This might take SOME time.")
258             gpg.gen_key(params)
259             logger.info("Keys for %s have been successfully "
260                         "generated." % (address,))
261             pubkeys = gpg.list_keys()
262
263             # assert for new key characteristics
264
265             # XXX This exception is not properly catched by the soledad
266             # bootstrapping, so if we do not finish generating the keys
267             # we end with a blocked thread -- kali
268
269             leap_assert(
270                 len(pubkeys) is 1,  # a unitary keyring!
271                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
272             key = gpg.list_keys(secret=True).pop()
273             leap_assert(
274                 len(key['uids']) is 1,  # with just one uid!
275                 'Wrong number of uids for key: %d.' % len(key['uids']))
276             leap_assert(
277                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
278                 'Key not correctly bound to address.')
279             # insert both public and private keys in storage
280             for secret in [True, False]:
281                 key = gpg.list_keys(secret=secret).pop()
282                 openpgp_key = _build_key_from_gpg(
283                     address, key,
284                     gpg.export_keys(key['fingerprint'], secret=secret))
285                 self.put_key(openpgp_key)
286
287         return self.get_key(address, private=True)
288
289     def get_key(self, address, private=False):
290         """
291         Get key bound to C{address} from local storage.
292
293         :param address: The address bound to the key.
294         :type address: str
295         :param private: Look for a private key instead of a public one?
296         :type private: bool
297
298         :return: The key bound to C{address}.
299         :rtype: OpenPGPKey
300         @raise KeyNotFound: If the key was not found on local storage.
301         """
302         # Remove the identity suffix after the '+' until the '@'
303         # e.g.: test_user+something@provider.com becomes test_user@provider.com
304         # since the key belongs to the identity without the '+' suffix.
305         address = re.sub(r'\+.*\@', '@', address)
306
307         doc = self._get_key_doc(address, private)
308         if doc is None:
309             raise errors.KeyNotFound(address)
310         return build_key_from_dict(OpenPGPKey, address, doc.content)
311
312     def parse_ascii_key(self, key_data):
313         """
314         Parses an ascii armored key (or key pair) data and returns
315         the OpenPGPKey keys.
316
317         :param key_data: the key data to be parsed.
318         :type key_data: str or unicode
319
320         :returns: the public key and private key (if applies) for that data.
321         :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
322                 the tuple may have one or both components None
323         """
324         leap_assert_type(key_data, (str, unicode))
325         # TODO: add more checks for correct key data.
326         leap_assert(key_data is not None, 'Data does not represent a key.')
327         mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
328
329         with self._temporary_gpgwrapper() as gpg:
330             # TODO: inspect result, or use decorator
331             gpg.import_keys(key_data)
332             privkey = None
333             pubkey = None
334
335             try:
336                 privkey = gpg.list_keys(secret=True).pop()
337             except IndexError:
338                 pass
339             try:
340                 pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
341             except IndexError:
342                 return (None, None)
343
344             # extract adress from first uid on key
345             match = re.match(mail_regex, pubkey['uids'].pop())
346             leap_assert(match is not None, 'No user address in key data.')
347             address = match.group(1)
348
349             openpgp_privkey = None
350             if privkey is not None:
351                 match = re.match(mail_regex, privkey['uids'].pop())
352                 leap_assert(match is not None, 'No user address in key data.')
353                 privaddress = match.group(1)
354
355                 # build private key
356                 openpgp_privkey = _build_key_from_gpg(
357                     privaddress, privkey,
358                     gpg.export_keys(privkey['fingerprint'], secret=True))
359
360                 leap_check(address == privaddress,
361                            'Addresses in public and private key differ.',
362                            errors.KeyAddressMismatch)
363                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
364                            'Fingerprints for public and private key differ.',
365                            errors.KeyFingerprintMismatch)
366
367             # build public key
368             openpgp_pubkey = _build_key_from_gpg(
369                 address, pubkey,
370                 gpg.export_keys(pubkey['fingerprint'], secret=False))
371
372             return (openpgp_pubkey, openpgp_privkey)
373
374     def put_ascii_key(self, key_data):
375         """
376         Put key contained in ascii-armored C{key_data} in local storage.
377
378         :param key_data: The key data to be stored.
379         :type key_data: str or unicode
380         """
381         leap_assert_type(key_data, (str, unicode))
382
383         openpgp_privkey = None
384         try:
385             openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
386         except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
387             leap_assert(False, repr(e))
388
389         if openpgp_pubkey is not None:
390             self.put_key(openpgp_pubkey)
391         if openpgp_privkey is not None:
392             self.put_key(openpgp_privkey)
393
394     def put_key(self, key):
395         """
396         Put C{key} in local storage.
397
398         :param key: The key to be stored.
399         :type key: OpenPGPKey
400         """
401         doc = self._get_key_doc(key.address[0], private=key.private)
402         if doc is None:
403             self._soledad.create_doc_from_json(key.get_json())
404         else:
405             if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]:
406                 # in case of an update of the key merge them with gnupg
407                 with self._temporary_gpgwrapper() as gpg:
408                     gpg.import_keys(doc.content[KEY_DATA_KEY])
409                     gpg.import_keys(key.key_data)
410                     gpgkey = gpg.list_keys(secret=key.private).pop()
411                     key = _build_key_from_gpg(
412                         key.address[0], gpgkey,
413                         gpg.export_keys(gpgkey['fingerprint'],
414                                         secret=key.private))
415             doc.set_json(key.get_json())
416             self._soledad.put_doc(doc)
417
418     def _get_key_doc(self, address, private=False):
419         """
420         Get the document with a key (public, by default) bound to C{address}.
421
422         If C{private} is True, looks for a private key instead of a public.
423
424         :param address: The address bound to the key.
425         :type address: str
426         :param private: Whether to look for a private key.
427         :type private: bool
428         :return: The document with the key or None if it does not exist.
429         :rtype: leap.soledad.document.SoledadDocument
430         """
431         doclist = self._soledad.get_from_index(
432             TYPE_ADDRESS_PRIVATE_INDEX,
433             self.OPENPGP_KEY_TYPE,
434             address,
435             '1' if private else '0')
436         if len(doclist) is 0:
437             return None
438         leap_assert(
439             len(doclist) is 1,
440             'Found more than one %s key for address!' %
441             'private' if private else 'public')
442         return doclist.pop()
443
444     def delete_key(self, key):
445         """
446         Remove C{key} from storage.
447
448         May raise:
449             errors.KeyNotFound
450             errors.KeyAttributesDiffer
451
452         :param key: The key to be removed.
453         :type key: EncryptionKey
454         """
455         leap_assert_type(key, OpenPGPKey)
456         doc = self._get_key_doc(key.address[0], key.private)
457         if doc is None:
458             raise errors.KeyNotFound(key)
459         if doc.content[KEY_FINGERPRINT_KEY] != key.fingerprint:
460             raise errors.KeyAttributesDiffer(key)
461         self._soledad.delete_doc(doc)
462
463     #
464     # Data encryption, decryption, signing and verifying
465     #
466
467     def _temporary_gpgwrapper(self, keys=None):
468         """
469         Return a gpg wrapper that implements the context manager protocol and
470         contains C{keys}.
471
472         :param keys: keys to conform the keyring.
473         :type key: list(OpenPGPKey)
474
475         :return: a TempGPGWrapper instance
476         :rtype: TempGPGWrapper
477         """
478         # TODO do here checks on key_data
479         return TempGPGWrapper(
480             keys=keys, gpgbinary=self._gpgbinary)
481
482     @staticmethod
483     def _assert_gpg_result_ok(result):
484         """
485         Check if GPG result is 'ok' and log stderr outputs.
486
487         :param result: GPG results, which have a field calld 'ok' that states
488                        whether the gpg operation was successful or not.
489         :type result: object
490
491         :raise GPGError: Raised when the gpg operation was not successful.
492         """
493         stderr = getattr(result, 'stderr', None)
494         if stderr:
495             logger.debug("%s" % (stderr,))
496         if getattr(result, 'ok', None) is not True:
497             raise errors.GPGError(
498                 'Failed to encrypt/decrypt: %s' % stderr)
499
500     def encrypt(self, data, pubkey, passphrase=None, sign=None,
501                 cipher_algo='AES256'):
502         """
503         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
504
505         :param data: The data to be encrypted.
506         :type data: str
507         :param pubkey: The key used to encrypt.
508         :type pubkey: OpenPGPKey
509         :param sign: The key used for signing.
510         :type sign: OpenPGPKey
511         :param cipher_algo: The cipher algorithm to use.
512         :type cipher_algo: str
513
514         :return: The encrypted data.
515         :rtype: str
516
517         :raise EncryptError: Raised if failed encrypting for some reason.
518         """
519         leap_assert_type(pubkey, OpenPGPKey)
520         leap_assert(pubkey.private is False, 'Key is not public.')
521         keys = [pubkey]
522         if sign is not None:
523             leap_assert_type(sign, OpenPGPKey)
524             leap_assert(sign.private is True)
525             keys.append(sign)
526         with self._temporary_gpgwrapper(keys) as gpg:
527             result = gpg.encrypt(
528                 data, pubkey.fingerprint,
529                 default_key=sign.key_id if sign else None,
530                 passphrase=passphrase, symmetric=False,
531                 cipher_algo=cipher_algo)
532             # Here we cannot assert for correctness of sig because the sig is
533             # in the ciphertext.
534             # result.ok    - (bool) indicates if the operation succeeded
535             # result.data  - (bool) contains the result of the operation
536             try:
537                 self._assert_gpg_result_ok(result)
538                 return result.data
539             except errors.GPGError as e:
540                 logger.error('Failed to decrypt: %s.' % str(e))
541                 raise errors.EncryptError()
542
543     def decrypt(self, data, privkey, passphrase=None, verify=None):
544         """
545         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
546
547         :param data: The data to be decrypted.
548         :type data: str
549         :param privkey: The key used to decrypt.
550         :type privkey: OpenPGPKey
551         :param passphrase: The passphrase for the secret key used for
552                            decryption.
553         :type passphrase: str
554         :param verify: The key used to verify a signature.
555         :type verify: OpenPGPKey
556
557         :return: The decrypted data.
558         :rtype: unicode
559
560         :raise DecryptError: Raised if failed decrypting for some reason.
561         :raise InvalidSignature: Raised if unable to verify the signature with
562                                  C{verify} key.
563         """
564         leap_assert(privkey.private is True, 'Key is not private.')
565         keys = [privkey]
566         if verify is not None:
567             leap_assert_type(verify, OpenPGPKey)
568             leap_assert(verify.private is False)
569             keys.append(verify)
570         with self._temporary_gpgwrapper(keys) as gpg:
571             try:
572                 result = gpg.decrypt(
573                     data, passphrase=passphrase, always_trust=True)
574                 self._assert_gpg_result_ok(result)
575                 # verify signature
576                 if (verify is not None):
577                     if result.valid is False or \
578                             verify.fingerprint != result.pubkey_fingerprint:
579                         raise errors.InvalidSignature(
580                             'Failed to verify signature with key %s: %s' %
581                             (verify.key_id, result.stderr))
582
583                 return result.data
584             except errors.GPGError as e:
585                 logger.error('Failed to decrypt: %s.' % str(e))
586                 raise errors.DecryptError(str(e))
587
588     def is_encrypted(self, data):
589         """
590         Return whether C{data} was asymmetrically encrypted using OpenPGP.
591
592         :param data: The data we want to know about.
593         :type data: str
594
595         :return: Whether C{data} was encrypted using this wrapper.
596         :rtype: bool
597         """
598         with self._temporary_gpgwrapper() as gpg:
599             gpgutil = GPGUtilities(gpg)
600             return gpgutil.is_encrypted_asym(data)
601
602     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
603              detach=True, binary=False):
604         """
605         Sign C{data} with C{privkey}.
606
607         :param data: The data to be signed.
608         :type data: str
609
610         :param privkey: The private key to be used to sign.
611         :type privkey: OpenPGPKey
612         :param digest_algo: The hash digest to use.
613         :type digest_algo: str
614         :param clearsign: If True, create a cleartext signature.
615         :type clearsign: bool
616         :param detach: If True, create a detached signature.
617         :type detach: bool
618         :param binary: If True, do not ascii armour the output.
619         :type binary: bool
620
621         :return: The ascii-armored signed data.
622         :rtype: str
623         """
624         leap_assert_type(privkey, OpenPGPKey)
625         leap_assert(privkey.private is True)
626
627         # result.fingerprint - contains the fingerprint of the key used to
628         #                      sign.
629         with self._temporary_gpgwrapper(privkey) as gpg:
630             result = gpg.sign(data, default_key=privkey.key_id,
631                               digest_algo=digest_algo, clearsign=clearsign,
632                               detach=detach, binary=binary)
633             rfprint = privkey.fingerprint
634             privkey = gpg.list_keys(secret=True).pop()
635             kfprint = privkey['fingerprint']
636             if result.fingerprint is None:
637                 raise errors.SignFailed(
638                     'Failed to sign with key %s: %s' %
639                     (privkey['keyid'], result.stderr))
640             leap_assert(
641                 result.fingerprint == kfprint,
642                 'Signature and private key fingerprints mismatch: '
643                 '%s != %s' % (rfprint, kfprint))
644         return result.data
645
646     def verify(self, data, pubkey, detached_sig=None):
647         """
648         Verify signed C{data} with C{pubkey}, eventually using
649         C{detached_sig}.
650
651         :param data: The data to be verified.
652         :type data: str
653         :param pubkey: The public key to be used on verification.
654         :type pubkey: OpenPGPKey
655         :param detached_sig: A detached signature. If given, C{data} is
656                              verified against this detached signature.
657         :type detached_sig: str
658
659         :return: The ascii-armored signed data.
660         :rtype: str
661         """
662         leap_assert_type(pubkey, OpenPGPKey)
663         leap_assert(pubkey.private is False)
664         with self._temporary_gpgwrapper(pubkey) as gpg:
665             result = None
666             if detached_sig is None:
667                 result = gpg.verify(data)
668             else:
669                 # to verify using a detached sig we have to use
670                 # gpg.verify_file(), which receives the data as a binary
671                 # stream and the name of a file containing the signature.
672                 sf, sfname = tempfile.mkstemp()
673                 with os.fdopen(sf, 'w') as sfd:
674                     sfd.write(detached_sig)
675                 result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
676                 os.unlink(sfname)
677             gpgpubkey = gpg.list_keys().pop()
678             valid = result.valid
679             rfprint = result.fingerprint
680             kfprint = gpgpubkey['fingerprint']
681             # raise in case sig is invalid
682             if valid is False or rfprint != kfprint:
683                 raise errors.InvalidSignature(
684                     'Failed to verify signature '
685                     'with key %s.' % gpgpubkey['keyid'])
686             return True