Implement the new encryption-key soledad document
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 Infrastructure for using OpenPGP keys in Key Manager.
19 """
20 import logging
21 import os
22 import re
23 import shutil
24 import tempfile
25 import io
26
27
28 from datetime import datetime
29 from gnupg import GPG
30 from gnupg.gnupg import GPGUtilities
31
32 from leap.common.check import leap_assert, leap_assert_type, leap_check
33 from leap.keymanager import errors
34 from leap.keymanager.keys import (
35     EncryptionKey,
36     EncryptionScheme,
37     is_address,
38     build_key_from_dict,
39     KEYMANAGER_KEY_TAG,
40     TAGS_ADDRESS_PRIVATE_INDEX,
41     KEY_FINGERPRINT_KEY,
42     KEY_DATA_KEY,
43 )
44
45
46 logger = logging.getLogger(__name__)
47
48
49 #
50 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
51 #
52
53 class TempGPGWrapper(object):
54     """
55     A context manager that wraps a temporary GPG keyring which only contains
56     the keys given at object creation.
57     """
58
59     def __init__(self, keys=None, gpgbinary=None):
60         """
61         Create an empty temporary keyring and import any given C{keys} into
62         it.
63
64         :param keys: OpenPGP key, or list of.
65         :type keys: OpenPGPKey or list of OpenPGPKeys
66         :param gpgbinary: Name for GnuPG binary executable.
67         :type gpgbinary: C{str}
68         """
69         self._gpg = None
70         self._gpgbinary = gpgbinary
71         if not keys:
72             keys = list()
73         if not isinstance(keys, list):
74             keys = [keys]
75         self._keys = keys
76         for key in keys:
77             leap_assert_type(key, OpenPGPKey)
78
79     def __enter__(self):
80         """
81         Build and return a GPG keyring containing the keys given on
82         object creation.
83
84         :return: A GPG instance containing the keys given on object creation.
85         :rtype: gnupg.GPG
86         """
87         self._build_keyring()
88         return self._gpg
89
90     def __exit__(self, exc_type, exc_value, traceback):
91         """
92         Ensure the gpg is properly destroyed.
93         """
94         # TODO handle exceptions and log here
95         self._destroy_keyring()
96
97     def _build_keyring(self):
98         """
99         Create a GPG keyring containing the keys given on object creation.
100
101         :return: A GPG instance containing the keys given on object creation.
102         :rtype: gnupg.GPG
103         """
104         privkeys = [key for key in self._keys if key and key.private is True]
105         publkeys = [key for key in self._keys if key and key.private is False]
106         # here we filter out public keys that have a correspondent
107         # private key in the list because the private key_data by
108         # itself is enough to also have the public key in the keyring,
109         # and we want to count the keys afterwards.
110
111         privaddrs = map(lambda privkey: privkey.address[0], privkeys)
112         publkeys = filter(
113             lambda pubkey: pubkey.address[0] not in privaddrs, publkeys)
114
115         listkeys = lambda: self._gpg.list_keys()
116         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
117
118         self._gpg = GPG(binary=self._gpgbinary,
119                         homedir=tempfile.mkdtemp())
120         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
121
122         # import keys into the keyring:
123         # concatenating ascii-armored keys, which is correctly
124         # understood by GPG.
125
126         self._gpg.import_keys("".join(
127             [x.key_data for x in publkeys + privkeys]))
128
129         # assert the number of keys in the keyring
130         leap_assert(
131             len(listkeys()) == len(publkeys) + len(privkeys),
132             'Wrong number of public keys in keyring: %d, should be %d)' %
133             (len(listkeys()), len(publkeys) + len(privkeys)))
134         leap_assert(
135             len(listsecretkeys()) == len(privkeys),
136             'Wrong number of private keys in keyring: %d, should be %d)' %
137             (len(listsecretkeys()), len(privkeys)))
138
139     def _destroy_keyring(self):
140         """
141         Securely erase the keyring.
142         """
143         # TODO: implement some kind of wiping of data or a more
144         # secure way that
145         # does not write to disk.
146
147         try:
148             for secret in [True, False]:
149                 for key in self._gpg.list_keys(secret=secret):
150                     self._gpg.delete_keys(
151                         key['fingerprint'],
152                         secret=secret)
153             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
154
155         except:
156             raise
157
158         finally:
159             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
160                         "watch out! Tried to remove default gnupg home!")
161             shutil.rmtree(self._gpg.homedir)
162
163
164 def _build_key_from_gpg(address, key, key_data):
165     """
166     Build an OpenPGPKey for C{address} based on C{key} from
167     local gpg storage.
168
169     ASCII armored GPG key data has to be queried independently in this
170     wrapper, so we receive it in C{key_data}.
171
172     :param address: The address bound to the key.
173     :type address: str
174     :param key: Key obtained from GPG storage.
175     :type key: dict
176     :param key_data: Key data obtained from GPG storage.
177     :type key_data: str
178     :return: An instance of the key.
179     :rtype: OpenPGPKey
180     """
181     expiry_date = None
182     if key['expires']:
183         expiry_date = datetime.fromtimestamp(int(key['expires']))
184
185     return OpenPGPKey(
186         [address],
187         key_id=key['keyid'],
188         fingerprint=key['fingerprint'],
189         key_data=key_data,
190         private=True if key['type'] == 'sec' else False,
191         length=int(key['length']),
192         expiry_date=expiry_date,
193         refreshed_at=datetime.now(),
194     )
195
196
197 #
198 # The OpenPGP wrapper
199 #
200
201 class OpenPGPKey(EncryptionKey):
202     """
203     Base class for OpenPGP keys.
204     """
205
206
207 class OpenPGPScheme(EncryptionScheme):
208     """
209     A wrapper for OpenPGP keys management and use (encryption, decyption,
210     signing and verification).
211     """
212
213     def __init__(self, soledad, gpgbinary=None):
214         """
215         Initialize the OpenPGP wrapper.
216
217         :param soledad: A Soledad instance for key storage.
218         :type soledad: leap.soledad.Soledad
219         :param gpgbinary: Name for GnuPG binary executable.
220         :type gpgbinary: C{str}
221         """
222         EncryptionScheme.__init__(self, soledad)
223         self._gpgbinary = gpgbinary
224
225     #
226     # Keys management
227     #
228
229     def gen_key(self, address):
230         """
231         Generate an OpenPGP keypair bound to C{address}.
232
233         :param address: The address bound to the key.
234         :type address: str
235         :return: The key bound to C{address}.
236         :rtype: OpenPGPKey
237         @raise KeyAlreadyExists: If key already exists in local database.
238         """
239         # make sure the key does not already exist
240         leap_assert(is_address(address), 'Not an user address: %s' % address)
241         try:
242             self.get_key(address)
243             raise errors.KeyAlreadyExists(address)
244         except errors.KeyNotFound:
245             logger.debug('Key for %s not found' % (address,))
246
247         with self._temporary_gpgwrapper() as gpg:
248             # TODO: inspect result, or use decorator
249             params = gpg.gen_key_input(
250                 key_type='RSA',
251                 key_length=4096,
252                 name_real=address,
253                 name_email=address,
254                 name_comment='')
255             logger.info("About to generate keys... This might take SOME time.")
256             gpg.gen_key(params)
257             logger.info("Keys for %s have been successfully "
258                         "generated." % (address,))
259             pubkeys = gpg.list_keys()
260
261             # assert for new key characteristics
262
263             # XXX This exception is not properly catched by the soledad
264             # bootstrapping, so if we do not finish generating the keys
265             # we end with a blocked thread -- kali
266
267             leap_assert(
268                 len(pubkeys) is 1,  # a unitary keyring!
269                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
270             key = gpg.list_keys(secret=True).pop()
271             leap_assert(
272                 len(key['uids']) is 1,  # with just one uid!
273                 'Wrong number of uids for key: %d.' % len(key['uids']))
274             leap_assert(
275                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
276                 'Key not correctly bound to address.')
277             # insert both public and private keys in storage
278             for secret in [True, False]:
279                 key = gpg.list_keys(secret=secret).pop()
280                 openpgp_key = _build_key_from_gpg(
281                     address, key,
282                     gpg.export_keys(key['fingerprint'], secret=secret))
283                 self.put_key(openpgp_key)
284
285         return self.get_key(address, private=True)
286
287     def get_key(self, address, private=False):
288         """
289         Get key bound to C{address} from local storage.
290
291         :param address: The address bound to the key.
292         :type address: str
293         :param private: Look for a private key instead of a public one?
294         :type private: bool
295
296         :return: The key bound to C{address}.
297         :rtype: OpenPGPKey
298         @raise KeyNotFound: If the key was not found on local storage.
299         """
300         # Remove the identity suffix after the '+' until the '@'
301         # e.g.: test_user+something@provider.com becomes test_user@provider.com
302         # since the key belongs to the identity without the '+' suffix.
303         address = re.sub(r'\+.*\@', '@', address)
304
305         doc = self._get_key_doc(address, private)
306         if doc is None:
307             raise errors.KeyNotFound(address)
308         return build_key_from_dict(OpenPGPKey, address, doc.content)
309
310     def parse_ascii_key(self, key_data):
311         """
312         Parses an ascii armored key (or key pair) data and returns
313         the OpenPGPKey keys.
314
315         :param key_data: the key data to be parsed.
316         :type key_data: str or unicode
317
318         :returns: the public key and private key (if applies) for that data.
319         :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
320                 the tuple may have one or both components None
321         """
322         leap_assert_type(key_data, (str, unicode))
323         # TODO: add more checks for correct key data.
324         leap_assert(key_data is not None, 'Data does not represent a key.')
325         mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
326
327         with self._temporary_gpgwrapper() as gpg:
328             # TODO: inspect result, or use decorator
329             gpg.import_keys(key_data)
330             privkey = None
331             pubkey = None
332
333             try:
334                 privkey = gpg.list_keys(secret=True).pop()
335             except IndexError:
336                 pass
337             try:
338                 pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
339             except IndexError:
340                 return (None, None)
341
342             # extract adress from first uid on key
343             match = re.match(mail_regex, pubkey['uids'].pop())
344             leap_assert(match is not None, 'No user address in key data.')
345             address = match.group(1)
346
347             openpgp_privkey = None
348             if privkey is not None:
349                 match = re.match(mail_regex, privkey['uids'].pop())
350                 leap_assert(match is not None, 'No user address in key data.')
351                 privaddress = match.group(1)
352
353                 # build private key
354                 openpgp_privkey = _build_key_from_gpg(
355                     privaddress, privkey,
356                     gpg.export_keys(privkey['fingerprint'], secret=True))
357
358                 leap_check(address == privaddress,
359                            'Addresses in public and private key differ.',
360                            errors.KeyAddressMismatch)
361                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
362                            'Fingerprints for public and private key differ.',
363                            errors.KeyFingerprintMismatch)
364
365             # build public key
366             openpgp_pubkey = _build_key_from_gpg(
367                 address, pubkey,
368                 gpg.export_keys(pubkey['fingerprint'], secret=False))
369
370             return (openpgp_pubkey, openpgp_privkey)
371
372     def put_ascii_key(self, key_data):
373         """
374         Put key contained in ascii-armored C{key_data} in local storage.
375
376         :param key_data: The key data to be stored.
377         :type key_data: str or unicode
378         """
379         leap_assert_type(key_data, (str, unicode))
380
381         openpgp_privkey = None
382         try:
383             openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
384         except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
385             leap_assert(False, repr(e))
386
387         if openpgp_pubkey is not None:
388             self.put_key(openpgp_pubkey)
389         if openpgp_privkey is not None:
390             self.put_key(openpgp_privkey)
391
392     def put_key(self, key):
393         """
394         Put C{key} in local storage.
395
396         :param key: The key to be stored.
397         :type key: OpenPGPKey
398         """
399         doc = self._get_key_doc(key.address[0], private=key.private)
400         if doc is None:
401             self._soledad.create_doc_from_json(key.get_json())
402         else:
403             if key.fingerprint == doc.content[KEY_FINGERPRINT_KEY]:
404                 # in case of an update of the key merge them with gnupg
405                 with self._temporary_gpgwrapper() as gpg:
406                     gpg.import_keys(doc.content[KEY_DATA_KEY])
407                     gpg.import_keys(key.key_data)
408                     gpgkey = gpg.list_keys(secret=key.private).pop()
409                     key = _build_key_from_gpg(
410                         key.address[0], gpgkey,
411                         gpg.export_keys(gpgkey['fingerprint'],
412                                         secret=key.private))
413             doc.set_json(key.get_json())
414             self._soledad.put_doc(doc)
415
416     def _get_key_doc(self, address, private=False):
417         """
418         Get the document with a key (public, by default) bound to C{address}.
419
420         If C{private} is True, looks for a private key instead of a public.
421
422         :param address: The address bound to the key.
423         :type address: str
424         :param private: Whether to look for a private key.
425         :type private: bool
426         :return: The document with the key or None if it does not exist.
427         :rtype: leap.soledad.document.SoledadDocument
428         """
429         doclist = self._soledad.get_from_index(
430             TAGS_ADDRESS_PRIVATE_INDEX,
431             KEYMANAGER_KEY_TAG,
432             address,
433             '1' if private else '0')
434         if len(doclist) is 0:
435             return None
436         leap_assert(
437             len(doclist) is 1,
438             'Found more than one %s key for address!' %
439             'private' if private else 'public')
440         return doclist.pop()
441
442     def delete_key(self, key):
443         """
444         Remove C{key} from storage.
445
446         May raise:
447             errors.KeyNotFound
448             errors.KeyAttributesDiffer
449
450         :param key: The key to be removed.
451         :type key: EncryptionKey
452         """
453         leap_assert_type(key, OpenPGPKey)
454         doc = self._get_key_doc(key.address[0], key.private)
455         if doc is None:
456             raise errors.KeyNotFound(key)
457         if doc.content[KEY_FINGERPRINT_KEY] != key.fingerprint:
458             raise errors.KeyAttributesDiffer(key)
459         self._soledad.delete_doc(doc)
460
461     #
462     # Data encryption, decryption, signing and verifying
463     #
464
465     def _temporary_gpgwrapper(self, keys=None):
466         """
467         Return a gpg wrapper that implements the context manager protocol and
468         contains C{keys}.
469
470         :param keys: keys to conform the keyring.
471         :type key: list(OpenPGPKey)
472
473         :return: a TempGPGWrapper instance
474         :rtype: TempGPGWrapper
475         """
476         # TODO do here checks on key_data
477         return TempGPGWrapper(
478             keys=keys, gpgbinary=self._gpgbinary)
479
480     @staticmethod
481     def _assert_gpg_result_ok(result):
482         """
483         Check if GPG result is 'ok' and log stderr outputs.
484
485         :param result: GPG results, which have a field calld 'ok' that states
486                        whether the gpg operation was successful or not.
487         :type result: object
488
489         :raise GPGError: Raised when the gpg operation was not successful.
490         """
491         stderr = getattr(result, 'stderr', None)
492         if stderr:
493             logger.debug("%s" % (stderr,))
494         if getattr(result, 'ok', None) is not True:
495             raise errors.GPGError(
496                 'Failed to encrypt/decrypt: %s' % stderr)
497
498     def encrypt(self, data, pubkey, passphrase=None, sign=None,
499                 cipher_algo='AES256'):
500         """
501         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
502
503         :param data: The data to be encrypted.
504         :type data: str
505         :param pubkey: The key used to encrypt.
506         :type pubkey: OpenPGPKey
507         :param sign: The key used for signing.
508         :type sign: OpenPGPKey
509         :param cipher_algo: The cipher algorithm to use.
510         :type cipher_algo: str
511
512         :return: The encrypted data.
513         :rtype: str
514
515         :raise EncryptError: Raised if failed encrypting for some reason.
516         """
517         leap_assert_type(pubkey, OpenPGPKey)
518         leap_assert(pubkey.private is False, 'Key is not public.')
519         keys = [pubkey]
520         if sign is not None:
521             leap_assert_type(sign, OpenPGPKey)
522             leap_assert(sign.private is True)
523             keys.append(sign)
524         with self._temporary_gpgwrapper(keys) as gpg:
525             result = gpg.encrypt(
526                 data, pubkey.fingerprint,
527                 default_key=sign.key_id if sign else None,
528                 passphrase=passphrase, symmetric=False,
529                 cipher_algo=cipher_algo)
530             # Here we cannot assert for correctness of sig because the sig is
531             # in the ciphertext.
532             # result.ok    - (bool) indicates if the operation succeeded
533             # result.data  - (bool) contains the result of the operation
534             try:
535                 self._assert_gpg_result_ok(result)
536                 return result.data
537             except errors.GPGError as e:
538                 logger.error('Failed to decrypt: %s.' % str(e))
539                 raise errors.EncryptError()
540
541     def decrypt(self, data, privkey, passphrase=None, verify=None):
542         """
543         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
544
545         :param data: The data to be decrypted.
546         :type data: str
547         :param privkey: The key used to decrypt.
548         :type privkey: OpenPGPKey
549         :param passphrase: The passphrase for the secret key used for
550                            decryption.
551         :type passphrase: str
552         :param verify: The key used to verify a signature.
553         :type verify: OpenPGPKey
554
555         :return: The decrypted data.
556         :rtype: unicode
557
558         :raise DecryptError: Raised if failed decrypting for some reason.
559         :raise InvalidSignature: Raised if unable to verify the signature with
560                                  C{verify} key.
561         """
562         leap_assert(privkey.private is True, 'Key is not private.')
563         keys = [privkey]
564         if verify is not None:
565             leap_assert_type(verify, OpenPGPKey)
566             leap_assert(verify.private is False)
567             keys.append(verify)
568         with self._temporary_gpgwrapper(keys) as gpg:
569             try:
570                 result = gpg.decrypt(
571                     data, passphrase=passphrase, always_trust=True)
572                 self._assert_gpg_result_ok(result)
573                 # verify signature
574                 if (verify is not None):
575                     if result.valid is False or \
576                             verify.fingerprint != result.pubkey_fingerprint:
577                         raise errors.InvalidSignature(
578                             'Failed to verify signature with key %s: %s' %
579                             (verify.key_id, result.stderr))
580
581                 return result.data
582             except errors.GPGError as e:
583                 logger.error('Failed to decrypt: %s.' % str(e))
584                 raise errors.DecryptError(str(e))
585
586     def is_encrypted(self, data):
587         """
588         Return whether C{data} was asymmetrically encrypted using OpenPGP.
589
590         :param data: The data we want to know about.
591         :type data: str
592
593         :return: Whether C{data} was encrypted using this wrapper.
594         :rtype: bool
595         """
596         with self._temporary_gpgwrapper() as gpg:
597             gpgutil = GPGUtilities(gpg)
598             return gpgutil.is_encrypted_asym(data)
599
600     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
601              detach=True, binary=False):
602         """
603         Sign C{data} with C{privkey}.
604
605         :param data: The data to be signed.
606         :type data: str
607
608         :param privkey: The private key to be used to sign.
609         :type privkey: OpenPGPKey
610         :param digest_algo: The hash digest to use.
611         :type digest_algo: str
612         :param clearsign: If True, create a cleartext signature.
613         :type clearsign: bool
614         :param detach: If True, create a detached signature.
615         :type detach: bool
616         :param binary: If True, do not ascii armour the output.
617         :type binary: bool
618
619         :return: The ascii-armored signed data.
620         :rtype: str
621         """
622         leap_assert_type(privkey, OpenPGPKey)
623         leap_assert(privkey.private is True)
624
625         # result.fingerprint - contains the fingerprint of the key used to
626         #                      sign.
627         with self._temporary_gpgwrapper(privkey) as gpg:
628             result = gpg.sign(data, default_key=privkey.key_id,
629                               digest_algo=digest_algo, clearsign=clearsign,
630                               detach=detach, binary=binary)
631             rfprint = privkey.fingerprint
632             privkey = gpg.list_keys(secret=True).pop()
633             kfprint = privkey['fingerprint']
634             if result.fingerprint is None:
635                 raise errors.SignFailed(
636                     'Failed to sign with key %s: %s' %
637                     (privkey['keyid'], result.stderr))
638             leap_assert(
639                 result.fingerprint == kfprint,
640                 'Signature and private key fingerprints mismatch: '
641                 '%s != %s' % (rfprint, kfprint))
642         return result.data
643
644     def verify(self, data, pubkey, detached_sig=None):
645         """
646         Verify signed C{data} with C{pubkey}, eventually using
647         C{detached_sig}.
648
649         :param data: The data to be verified.
650         :type data: str
651         :param pubkey: The public key to be used on verification.
652         :type pubkey: OpenPGPKey
653         :param detached_sig: A detached signature. If given, C{data} is
654                              verified against this detached signature.
655         :type detached_sig: str
656
657         :return: The ascii-armored signed data.
658         :rtype: str
659         """
660         leap_assert_type(pubkey, OpenPGPKey)
661         leap_assert(pubkey.private is False)
662         with self._temporary_gpgwrapper(pubkey) as gpg:
663             result = None
664             if detached_sig is None:
665                 result = gpg.verify(data)
666             else:
667                 # to verify using a detached sig we have to use
668                 # gpg.verify_file(), which receives the data as a binary
669                 # stream and the name of a file containing the signature.
670                 sf, sfname = tempfile.mkstemp()
671                 with os.fdopen(sf, 'w') as sfd:
672                     sfd.write(detached_sig)
673                 result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
674                 os.unlink(sfname)
675             gpgpubkey = gpg.list_keys().pop()
676             valid = result.valid
677             rfprint = result.fingerprint
678             kfprint = gpgpubkey['fingerprint']
679             # raise in case sig is invalid
680             if valid is False or rfprint != kfprint:
681                 raise errors.InvalidSignature(
682                     'Failed to verify signature '
683                     'with key %s.' % gpgpubkey['keyid'])
684             return True