Merge keys when updating an exisiting key
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 Infrastructure for using OpenPGP keys in Key Manager.
19 """
20 import logging
21 import os
22 import re
23 import shutil
24 import tempfile
25 import io
26
27
28 from gnupg import GPG
29 from gnupg.gnupg import GPGUtilities
30
31 from leap.common.check import leap_assert, leap_assert_type, leap_check
32 from leap.keymanager import errors
33 from leap.keymanager.keys import (
34     EncryptionKey,
35     EncryptionScheme,
36     is_address,
37     build_key_from_dict,
38     KEYMANAGER_KEY_TAG,
39     TAGS_ADDRESS_PRIVATE_INDEX,
40     KEY_FINGERPRINT_KEY,
41     KEY_DATA_KEY,
42 )
43 from leap.keymanager.validation import ValidationLevel
44
45
46 logger = logging.getLogger(__name__)
47
48
49 #
50 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
51 #
52
53 class TempGPGWrapper(object):
54     """
55     A context manager that wraps a temporary GPG keyring which only contains
56     the keys given at object creation.
57     """
58
59     def __init__(self, keys=None, gpgbinary=None):
60         """
61         Create an empty temporary keyring and import any given C{keys} into
62         it.
63
64         :param keys: OpenPGP key, or list of.
65         :type keys: OpenPGPKey or list of OpenPGPKeys
66         :param gpgbinary: Name for GnuPG binary executable.
67         :type gpgbinary: C{str}
68         """
69         self._gpg = None
70         self._gpgbinary = gpgbinary
71         if not keys:
72             keys = list()
73         if not isinstance(keys, list):
74             keys = [keys]
75         self._keys = keys
76         for key in keys:
77             leap_assert_type(key, OpenPGPKey)
78
79     def __enter__(self):
80         """
81         Build and return a GPG keyring containing the keys given on
82         object creation.
83
84         :return: A GPG instance containing the keys given on object creation.
85         :rtype: gnupg.GPG
86         """
87         self._build_keyring()
88         return self._gpg
89
90     def __exit__(self, exc_type, exc_value, traceback):
91         """
92         Ensure the gpg is properly destroyed.
93         """
94         # TODO handle exceptions and log here
95         self._destroy_keyring()
96
97     def _build_keyring(self):
98         """
99         Create a GPG keyring containing the keys given on object creation.
100
101         :return: A GPG instance containing the keys given on object creation.
102         :rtype: gnupg.GPG
103         """
104         privkeys = [key for key in self._keys if key and key.private is True]
105         publkeys = [key for key in self._keys if key and key.private is False]
106         # here we filter out public keys that have a correspondent
107         # private key in the list because the private key_data by
108         # itself is enough to also have the public key in the keyring,
109         # and we want to count the keys afterwards.
110
111         privaddrs = map(lambda privkey: privkey.address, privkeys)
112         publkeys = filter(
113             lambda pubkey: pubkey.address not in privaddrs, publkeys)
114
115         listkeys = lambda: self._gpg.list_keys()
116         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
117
118         self._gpg = GPG(binary=self._gpgbinary,
119                         homedir=tempfile.mkdtemp())
120         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
121
122         # import keys into the keyring:
123         # concatenating ascii-armored keys, which is correctly
124         # understood by GPG.
125
126         self._gpg.import_keys("".join(
127             [x.key_data for x in publkeys + privkeys]))
128
129         # assert the number of keys in the keyring
130         leap_assert(
131             len(listkeys()) == len(publkeys) + len(privkeys),
132             'Wrong number of public keys in keyring: %d, should be %d)' %
133             (len(listkeys()), len(publkeys) + len(privkeys)))
134         leap_assert(
135             len(listsecretkeys()) == len(privkeys),
136             'Wrong number of private keys in keyring: %d, should be %d)' %
137             (len(listsecretkeys()), len(privkeys)))
138
139     def _destroy_keyring(self):
140         """
141         Securely erase the keyring.
142         """
143         # TODO: implement some kind of wiping of data or a more
144         # secure way that
145         # does not write to disk.
146
147         try:
148             for secret in [True, False]:
149                 for key in self._gpg.list_keys(secret=secret):
150                     self._gpg.delete_keys(
151                         key['fingerprint'],
152                         secret=secret)
153             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
154
155         except:
156             raise
157
158         finally:
159             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
160                         "watch out! Tried to remove default gnupg home!")
161             shutil.rmtree(self._gpg.homedir)
162
163
164 def _build_key_from_gpg(address, key, key_data):
165     """
166     Build an OpenPGPKey for C{address} based on C{key} from
167     local gpg storage.
168
169     ASCII armored GPG key data has to be queried independently in this
170     wrapper, so we receive it in C{key_data}.
171
172     :param address: The address bound to the key.
173     :type address: str
174     :param key: Key obtained from GPG storage.
175     :type key: dict
176     :param key_data: Key data obtained from GPG storage.
177     :type key_data: str
178     :return: An instance of the key.
179     :rtype: OpenPGPKey
180     """
181     return OpenPGPKey(
182         address,
183         key_id=key['keyid'],
184         fingerprint=key['fingerprint'],
185         key_data=key_data,
186         private=True if key['type'] == 'sec' else False,
187         length=key['length'],
188         expiry_date=key['expires'],
189         validation=ValidationLevel.Weak_Chain,
190     )
191
192
193 #
194 # The OpenPGP wrapper
195 #
196
197 class OpenPGPKey(EncryptionKey):
198     """
199     Base class for OpenPGP keys.
200     """
201
202
203 class OpenPGPScheme(EncryptionScheme):
204     """
205     A wrapper for OpenPGP keys management and use (encryption, decyption,
206     signing and verification).
207     """
208
209     def __init__(self, soledad, gpgbinary=None):
210         """
211         Initialize the OpenPGP wrapper.
212
213         :param soledad: A Soledad instance for key storage.
214         :type soledad: leap.soledad.Soledad
215         :param gpgbinary: Name for GnuPG binary executable.
216         :type gpgbinary: C{str}
217         """
218         EncryptionScheme.__init__(self, soledad)
219         self._gpgbinary = gpgbinary
220
221     #
222     # Keys management
223     #
224
225     def gen_key(self, address):
226         """
227         Generate an OpenPGP keypair bound to C{address}.
228
229         :param address: The address bound to the key.
230         :type address: str
231         :return: The key bound to C{address}.
232         :rtype: OpenPGPKey
233         @raise KeyAlreadyExists: If key already exists in local database.
234         """
235         # make sure the key does not already exist
236         leap_assert(is_address(address), 'Not an user address: %s' % address)
237         try:
238             self.get_key(address)
239             raise errors.KeyAlreadyExists(address)
240         except errors.KeyNotFound:
241             logger.debug('Key for %s not found' % (address,))
242
243         with self._temporary_gpgwrapper() as gpg:
244             # TODO: inspect result, or use decorator
245             params = gpg.gen_key_input(
246                 key_type='RSA',
247                 key_length=4096,
248                 name_real=address,
249                 name_email=address,
250                 name_comment='')
251             logger.info("About to generate keys... This might take SOME time.")
252             gpg.gen_key(params)
253             logger.info("Keys for %s have been successfully "
254                         "generated." % (address,))
255             pubkeys = gpg.list_keys()
256
257             # assert for new key characteristics
258
259             # XXX This exception is not properly catched by the soledad
260             # bootstrapping, so if we do not finish generating the keys
261             # we end with a blocked thread -- kali
262
263             leap_assert(
264                 len(pubkeys) is 1,  # a unitary keyring!
265                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
266             key = gpg.list_keys(secret=True).pop()
267             leap_assert(
268                 len(key['uids']) is 1,  # with just one uid!
269                 'Wrong number of uids for key: %d.' % len(key['uids']))
270             leap_assert(
271                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
272                 'Key not correctly bound to address.')
273             # insert both public and private keys in storage
274             for secret in [True, False]:
275                 key = gpg.list_keys(secret=secret).pop()
276                 openpgp_key = _build_key_from_gpg(
277                     address, key,
278                     gpg.export_keys(key['fingerprint'], secret=secret))
279                 self.put_key(openpgp_key)
280
281         return self.get_key(address, private=True)
282
283     def get_key(self, address, private=False):
284         """
285         Get key bound to C{address} from local storage.
286
287         :param address: The address bound to the key.
288         :type address: str
289         :param private: Look for a private key instead of a public one?
290         :type private: bool
291
292         :return: The key bound to C{address}.
293         :rtype: OpenPGPKey
294         @raise KeyNotFound: If the key was not found on local storage.
295         """
296         # Remove the identity suffix after the '+' until the '@'
297         # e.g.: test_user+something@provider.com becomes test_user@provider.com
298         # since the key belongs to the identity without the '+' suffix.
299         address = re.sub(r'\+.*\@', '@', address)
300
301         doc = self._get_key_doc(address, private)
302         if doc is None:
303             raise errors.KeyNotFound(address)
304         return build_key_from_dict(OpenPGPKey, address, doc.content)
305
306     def parse_ascii_key(self, key_data):
307         """
308         Parses an ascii armored key (or key pair) data and returns
309         the OpenPGPKey keys.
310
311         :param key_data: the key data to be parsed.
312         :type key_data: str or unicode
313
314         :returns: the public key and private key (if applies) for that data.
315         :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
316                 the tuple may have one or both components None
317         """
318         leap_assert_type(key_data, (str, unicode))
319         # TODO: add more checks for correct key data.
320         leap_assert(key_data is not None, 'Data does not represent a key.')
321         mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
322
323         with self._temporary_gpgwrapper() as gpg:
324             # TODO: inspect result, or use decorator
325             gpg.import_keys(key_data)
326             privkey = None
327             pubkey = None
328
329             try:
330                 privkey = gpg.list_keys(secret=True).pop()
331             except IndexError:
332                 pass
333             try:
334                 pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
335             except IndexError:
336                 return (None, None)
337
338             # extract adress from first uid on key
339             match = re.match(mail_regex, pubkey['uids'].pop())
340             leap_assert(match is not None, 'No user address in key data.')
341             address = match.group(1)
342
343             openpgp_privkey = None
344             if privkey is not None:
345                 match = re.match(mail_regex, privkey['uids'].pop())
346                 leap_assert(match is not None, 'No user address in key data.')
347                 privaddress = match.group(1)
348
349                 # build private key
350                 openpgp_privkey = _build_key_from_gpg(
351                     privaddress, privkey,
352                     gpg.export_keys(privkey['fingerprint'], secret=True))
353
354                 leap_check(address == privaddress,
355                            'Addresses in public and private key differ.',
356                            errors.KeyAddressMismatch)
357                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
358                            'Fingerprints for public and private key differ.',
359                            errors.KeyFingerprintMismatch)
360
361             # build public key
362             openpgp_pubkey = _build_key_from_gpg(
363                 address, pubkey,
364                 gpg.export_keys(pubkey['fingerprint'], secret=False))
365
366             return (openpgp_pubkey, openpgp_privkey)
367
368     def put_ascii_key(self, key_data):
369         """
370         Put key contained in ascii-armored C{key_data} in local storage.
371
372         :param key_data: The key data to be stored.
373         :type key_data: str or unicode
374         """
375         leap_assert_type(key_data, (str, unicode))
376
377         openpgp_privkey = None
378         try:
379             openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
380         except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
381             leap_assert(False, repr(e))
382
383         if openpgp_pubkey is not None:
384             self.put_key(openpgp_pubkey)
385         if openpgp_privkey is not None:
386             self.put_key(openpgp_privkey)
387
388     def put_key(self, key):
389         """
390         Put C{key} in local storage.
391
392         :param key: The key to be stored.
393         :type key: OpenPGPKey
394         """
395         doc = self._get_key_doc(key.address, private=key.private)
396         if doc is None:
397             self._soledad.create_doc_from_json(key.get_json())
398         else:
399             if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]:
400                 # in case of an update of the key merge them with gnupg
401                 with self._temporary_gpgwrapper() as gpg:
402                     gpg.import_keys(doc.content[KEY_DATA_KEY])
403                     gpg.import_keys(key.key_data)
404                     gpgkey = gpg.list_keys(secret=key.private).pop()
405                     key = _build_key_from_gpg(
406                         key.address, gpgkey,
407                         gpg.export_keys(gpgkey['fingerprint'],
408                                         secret=key.private))
409             doc.set_json(key.get_json())
410             self._soledad.put_doc(doc)
411
412     def _get_key_doc(self, address, private=False):
413         """
414         Get the document with a key (public, by default) bound to C{address}.
415
416         If C{private} is True, looks for a private key instead of a public.
417
418         :param address: The address bound to the key.
419         :type address: str
420         :param private: Whether to look for a private key.
421         :type private: bool
422         :return: The document with the key or None if it does not exist.
423         :rtype: leap.soledad.document.SoledadDocument
424         """
425         doclist = self._soledad.get_from_index(
426             TAGS_ADDRESS_PRIVATE_INDEX,
427             KEYMANAGER_KEY_TAG,
428             address,
429             '1' if private else '0')
430         if len(doclist) is 0:
431             return None
432         leap_assert(
433             len(doclist) is 1,
434             'Found more than one %s key for address!' %
435             'private' if private else 'public')
436         return doclist.pop()
437
438     def delete_key(self, key):
439         """
440         Remove C{key} from storage.
441
442         May raise:
443             errors.KeyNotFound
444             errors.KeyAttributesDiffer
445
446         :param key: The key to be removed.
447         :type key: EncryptionKey
448         """
449         leap_assert_type(key, OpenPGPKey)
450         stored_key = self.get_key(key.address, private=key.private)
451         if stored_key is None:
452             raise errors.KeyNotFound(key)
453         if stored_key.__dict__ != key.__dict__:
454             raise errors.KeyAttributesDiffer(key)
455         doc = self._get_key_doc(key.address, key.private)
456         self._soledad.delete_doc(doc)
457
458     #
459     # Data encryption, decryption, signing and verifying
460     #
461
462     def _temporary_gpgwrapper(self, keys=None):
463         """
464         Return a gpg wrapper that implements the context manager protocol and
465         contains C{keys}.
466
467         :param keys: keys to conform the keyring.
468         :type key: list(OpenPGPKey)
469
470         :return: a TempGPGWrapper instance
471         :rtype: TempGPGWrapper
472         """
473         # TODO do here checks on key_data
474         return TempGPGWrapper(
475             keys=keys, gpgbinary=self._gpgbinary)
476
477     @staticmethod
478     def _assert_gpg_result_ok(result):
479         """
480         Check if GPG result is 'ok' and log stderr outputs.
481
482         :param result: GPG results, which have a field calld 'ok' that states
483                        whether the gpg operation was successful or not.
484         :type result: object
485
486         :raise GPGError: Raised when the gpg operation was not successful.
487         """
488         stderr = getattr(result, 'stderr', None)
489         if stderr:
490             logger.debug("%s" % (stderr,))
491         if getattr(result, 'ok', None) is not True:
492             raise errors.GPGError(
493                 'Failed to encrypt/decrypt: %s' % stderr)
494
495     def encrypt(self, data, pubkey, passphrase=None, sign=None,
496                 cipher_algo='AES256'):
497         """
498         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
499
500         :param data: The data to be encrypted.
501         :type data: str
502         :param pubkey: The key used to encrypt.
503         :type pubkey: OpenPGPKey
504         :param sign: The key used for signing.
505         :type sign: OpenPGPKey
506         :param cipher_algo: The cipher algorithm to use.
507         :type cipher_algo: str
508
509         :return: The encrypted data.
510         :rtype: str
511
512         :raise EncryptError: Raised if failed encrypting for some reason.
513         """
514         leap_assert_type(pubkey, OpenPGPKey)
515         leap_assert(pubkey.private is False, 'Key is not public.')
516         keys = [pubkey]
517         if sign is not None:
518             leap_assert_type(sign, OpenPGPKey)
519             leap_assert(sign.private is True)
520             keys.append(sign)
521         with self._temporary_gpgwrapper(keys) as gpg:
522             result = gpg.encrypt(
523                 data, pubkey.fingerprint,
524                 default_key=sign.key_id if sign else None,
525                 passphrase=passphrase, symmetric=False,
526                 cipher_algo=cipher_algo)
527             # Here we cannot assert for correctness of sig because the sig is
528             # in the ciphertext.
529             # result.ok    - (bool) indicates if the operation succeeded
530             # result.data  - (bool) contains the result of the operation
531             try:
532                 self._assert_gpg_result_ok(result)
533                 return result.data
534             except errors.GPGError as e:
535                 logger.error('Failed to decrypt: %s.' % str(e))
536                 raise errors.EncryptError()
537
538     def decrypt(self, data, privkey, passphrase=None, verify=None):
539         """
540         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
541
542         :param data: The data to be decrypted.
543         :type data: str
544         :param privkey: The key used to decrypt.
545         :type privkey: OpenPGPKey
546         :param passphrase: The passphrase for the secret key used for
547                            decryption.
548         :type passphrase: str
549         :param verify: The key used to verify a signature.
550         :type verify: OpenPGPKey
551
552         :return: The decrypted data.
553         :rtype: unicode
554
555         :raise DecryptError: Raised if failed decrypting for some reason.
556         :raise InvalidSignature: Raised if unable to verify the signature with
557                                  C{verify} key.
558         """
559         leap_assert(privkey.private is True, 'Key is not private.')
560         keys = [privkey]
561         if verify is not None:
562             leap_assert_type(verify, OpenPGPKey)
563             leap_assert(verify.private is False)
564             keys.append(verify)
565         with self._temporary_gpgwrapper(keys) as gpg:
566             try:
567                 result = gpg.decrypt(
568                     data, passphrase=passphrase, always_trust=True)
569                 self._assert_gpg_result_ok(result)
570                 # verify signature
571                 if (verify is not None):
572                     if result.valid is False or \
573                             verify.fingerprint != result.pubkey_fingerprint:
574                         raise errors.InvalidSignature(
575                             'Failed to verify signature with key %s: %s' %
576                             (verify.key_id, result.stderr))
577
578                 return result.data
579             except errors.GPGError as e:
580                 logger.error('Failed to decrypt: %s.' % str(e))
581                 raise errors.DecryptError(str(e))
582
583     def is_encrypted(self, data):
584         """
585         Return whether C{data} was asymmetrically encrypted using OpenPGP.
586
587         :param data: The data we want to know about.
588         :type data: str
589
590         :return: Whether C{data} was encrypted using this wrapper.
591         :rtype: bool
592         """
593         with self._temporary_gpgwrapper() as gpg:
594             gpgutil = GPGUtilities(gpg)
595             return gpgutil.is_encrypted_asym(data)
596
597     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
598              detach=True, binary=False):
599         """
600         Sign C{data} with C{privkey}.
601
602         :param data: The data to be signed.
603         :type data: str
604
605         :param privkey: The private key to be used to sign.
606         :type privkey: OpenPGPKey
607         :param digest_algo: The hash digest to use.
608         :type digest_algo: str
609         :param clearsign: If True, create a cleartext signature.
610         :type clearsign: bool
611         :param detach: If True, create a detached signature.
612         :type detach: bool
613         :param binary: If True, do not ascii armour the output.
614         :type binary: bool
615
616         :return: The ascii-armored signed data.
617         :rtype: str
618         """
619         leap_assert_type(privkey, OpenPGPKey)
620         leap_assert(privkey.private is True)
621
622         # result.fingerprint - contains the fingerprint of the key used to
623         #                      sign.
624         with self._temporary_gpgwrapper(privkey) as gpg:
625             result = gpg.sign(data, default_key=privkey.key_id,
626                               digest_algo=digest_algo, clearsign=clearsign,
627                               detach=detach, binary=binary)
628             rfprint = privkey.fingerprint
629             privkey = gpg.list_keys(secret=True).pop()
630             kfprint = privkey['fingerprint']
631             if result.fingerprint is None:
632                 raise errors.SignFailed(
633                     'Failed to sign with key %s: %s' %
634                     (privkey['keyid'], result.stderr))
635             leap_assert(
636                 result.fingerprint == kfprint,
637                 'Signature and private key fingerprints mismatch: '
638                 '%s != %s' % (rfprint, kfprint))
639         return result.data
640
641     def verify(self, data, pubkey, detached_sig=None):
642         """
643         Verify signed C{data} with C{pubkey}, eventually using
644         C{detached_sig}.
645
646         :param data: The data to be verified.
647         :type data: str
648         :param pubkey: The public key to be used on verification.
649         :type pubkey: OpenPGPKey
650         :param detached_sig: A detached signature. If given, C{data} is
651                              verified against this detached signature.
652         :type detached_sig: str
653
654         :return: The ascii-armored signed data.
655         :rtype: str
656         """
657         leap_assert_type(pubkey, OpenPGPKey)
658         leap_assert(pubkey.private is False)
659         with self._temporary_gpgwrapper(pubkey) as gpg:
660             result = None
661             if detached_sig is None:
662                 result = gpg.verify(data)
663             else:
664                 # to verify using a detached sig we have to use
665                 # gpg.verify_file(), which receives the data as a binary
666                 # stream and the name of a file containing the signature.
667                 sf, sfname = tempfile.mkstemp()
668                 with os.fdopen(sf, 'w') as sfd:
669                     sfd.write(detached_sig)
670                 result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
671                 os.unlink(sfname)
672             gpgpubkey = gpg.list_keys().pop()
673             valid = result.valid
674             rfprint = result.fingerprint
675             kfprint = gpgpubkey['fingerprint']
676             # raise in case sig is invalid
677             if valid is False or rfprint != kfprint:
678                 raise errors.InvalidSignature(
679                     'Failed to verify signature '
680                     'with key %s.' % gpgpubkey['keyid'])
681             return True