Refactor code to support parsing ascii keys.
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Infrastructure for using OpenPGP keys in Key Manager.
21 """
22
23
24 import locale
25 import logging
26 import os
27 import re
28 import shutil
29 import sys
30 import tempfile
31
32 from contextlib import closing
33
34 from gnupg import GPG
35 from gnupg.gnupg import GPGUtilities
36 from gnupg._util import _make_binary_stream
37
38 from leap.common.check import leap_assert, leap_assert_type, leap_check
39 from leap.keymanager import errors
40 from leap.keymanager.keys import (
41     EncryptionKey,
42     EncryptionScheme,
43     is_address,
44     build_key_from_dict,
45     KEYMANAGER_KEY_TAG,
46     TAGS_ADDRESS_PRIVATE_INDEX,
47 )
48
49
50 logger = logging.getLogger(__name__)
51
52
53 #
54 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
55 #
56
57 class TempGPGWrapper(object):
58     """
59     A context manager that wraps a temporary GPG keyring which only contains
60     the keys given at object creation.
61     """
62
63     def __init__(self, keys=None, gpgbinary=None):
64         """
65         Create an empty temporary keyring and import any given C{keys} into
66         it.
67
68         :param keys: OpenPGP key, or list of.
69         :type keys: OpenPGPKey or list of OpenPGPKeys
70         :param gpgbinary: Name for GnuPG binary executable.
71         :type gpgbinary: C{str}
72         """
73         self._gpg = None
74         self._gpgbinary = gpgbinary
75         if not keys:
76             keys = list()
77         if not isinstance(keys, list):
78             keys = [keys]
79         self._keys = keys
80         for key in keys:
81             leap_assert_type(key, OpenPGPKey)
82
83     def __enter__(self):
84         """
85         Build and return a GPG keyring containing the keys given on
86         object creation.
87
88         :return: A GPG instance containing the keys given on object creation.
89         :rtype: gnupg.GPG
90         """
91         self._build_keyring()
92         return self._gpg
93
94     def __exit__(self, exc_type, exc_value, traceback):
95         """
96         Ensure the gpg is properly destroyed.
97         """
98         # TODO handle exceptions and log here
99         self._destroy_keyring()
100
101     def _build_keyring(self):
102         """
103         Create a GPG keyring containing the keys given on object creation.
104
105         :return: A GPG instance containing the keys given on object creation.
106         :rtype: gnupg.GPG
107         """
108         privkeys = [key for key in self._keys if key and key.private is True]
109         publkeys = [key for key in self._keys if key and key.private is False]
110         # here we filter out public keys that have a correspondent
111         # private key in the list because the private key_data by
112         # itself is enough to also have the public key in the keyring,
113         # and we want to count the keys afterwards.
114
115         privaddrs = map(lambda privkey: privkey.address, privkeys)
116         publkeys = filter(
117             lambda pubkey: pubkey.address not in privaddrs, publkeys)
118
119         listkeys = lambda: self._gpg.list_keys()
120         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
121
122         self._gpg = GPG(binary=self._gpgbinary,
123                         homedir=tempfile.mkdtemp())
124         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
125
126         # import keys into the keyring:
127         # concatenating ascii-armored keys, which is correctly
128         # understood by GPG.
129
130         self._gpg.import_keys("".join(
131             [x.key_data for x in publkeys + privkeys]))
132
133         # assert the number of keys in the keyring
134         leap_assert(
135             len(listkeys()) == len(publkeys) + len(privkeys),
136             'Wrong number of public keys in keyring: %d, should be %d)' %
137             (len(listkeys()), len(publkeys) + len(privkeys)))
138         leap_assert(
139             len(listsecretkeys()) == len(privkeys),
140             'Wrong number of private keys in keyring: %d, should be %d)' %
141             (len(listsecretkeys()), len(privkeys)))
142
143     def _destroy_keyring(self):
144         """
145         Securely erase the keyring.
146         """
147         # TODO: implement some kind of wiping of data or a more
148         # secure way that
149         # does not write to disk.
150
151         try:
152             for secret in [True, False]:
153                 for key in self._gpg.list_keys(secret=secret):
154                     self._gpg.delete_keys(
155                         key['fingerprint'],
156                         secret=secret)
157             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
158
159         except:
160             raise
161
162         finally:
163             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
164                         "watch out! Tried to remove default gnupg home!")
165             shutil.rmtree(self._gpg.homedir)
166
167
168 def _build_key_from_gpg(address, key, key_data):
169     """
170     Build an OpenPGPKey for C{address} based on C{key} from
171     local gpg storage.
172
173     ASCII armored GPG key data has to be queried independently in this
174     wrapper, so we receive it in C{key_data}.
175
176     :param address: The address bound to the key.
177     :type address: str
178     :param key: Key obtained from GPG storage.
179     :type key: dict
180     :param key_data: Key data obtained from GPG storage.
181     :type key_data: str
182     :return: An instance of the key.
183     :rtype: OpenPGPKey
184     """
185     return OpenPGPKey(
186         address,
187         key_id=key['keyid'],
188         fingerprint=key['fingerprint'],
189         key_data=key_data,
190         private=True if key['type'] == 'sec' else False,
191         length=key['length'],
192         expiry_date=key['expires'],
193         validation=None,  # TODO: verify for validation.
194     )
195
196
197 #
198 # The OpenPGP wrapper
199 #
200
201 class OpenPGPKey(EncryptionKey):
202     """
203     Base class for OpenPGP keys.
204     """
205
206
207 class OpenPGPScheme(EncryptionScheme):
208     """
209     A wrapper for OpenPGP keys management and use (encryption, decyption,
210     signing and verification).
211     """
212
213     def __init__(self, soledad, gpgbinary=None):
214         """
215         Initialize the OpenPGP wrapper.
216
217         :param soledad: A Soledad instance for key storage.
218         :type soledad: leap.soledad.Soledad
219         :param gpgbinary: Name for GnuPG binary executable.
220         :type gpgbinary: C{str}
221         """
222         EncryptionScheme.__init__(self, soledad)
223         self._gpgbinary = gpgbinary
224
225     #
226     # Keys management
227     #
228
229     def gen_key(self, address):
230         """
231         Generate an OpenPGP keypair bound to C{address}.
232
233         :param address: The address bound to the key.
234         :type address: str
235         :return: The key bound to C{address}.
236         :rtype: OpenPGPKey
237         @raise KeyAlreadyExists: If key already exists in local database.
238         """
239         # make sure the key does not already exist
240         leap_assert(is_address(address), 'Not an user address: %s' % address)
241         try:
242             self.get_key(address)
243             raise errors.KeyAlreadyExists(address)
244         except errors.KeyNotFound:
245             logger.debug('Key for %s not found' % (address,))
246
247         with self._temporary_gpgwrapper() as gpg:
248             # TODO: inspect result, or use decorator
249             params = gpg.gen_key_input(
250                 key_type='RSA',
251                 key_length=4096,
252                 name_real=address,
253                 name_email=address,
254                 name_comment='')
255             logger.info("About to generate keys... This might take SOME time.")
256             gpg.gen_key(params)
257             logger.info("Keys for %s have been successfully "
258                         "generated." % (address,))
259             pubkeys = gpg.list_keys()
260
261             # assert for new key characteristics
262
263             # XXX This exception is not properly catched by the soledad
264             # bootstrapping, so if we do not finish generating the keys
265             # we end with a blocked thread -- kali
266
267             leap_assert(
268                 len(pubkeys) is 1,  # a unitary keyring!
269                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
270             key = gpg.list_keys(secret=True).pop()
271             leap_assert(
272                 len(key['uids']) is 1,  # with just one uid!
273                 'Wrong number of uids for key: %d.' % len(key['uids']))
274             leap_assert(
275                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
276                 'Key not correctly bound to address.')
277             # insert both public and private keys in storage
278             for secret in [True, False]:
279                 key = gpg.list_keys(secret=secret).pop()
280                 openpgp_key = _build_key_from_gpg(
281                     address, key,
282                     gpg.export_keys(key['fingerprint'], secret=secret))
283                 self.put_key(openpgp_key)
284
285         return self.get_key(address, private=True)
286
287     def get_key(self, address, private=False):
288         """
289         Get key bound to C{address} from local storage.
290
291         :param address: The address bound to the key.
292         :type address: str
293         :param private: Look for a private key instead of a public one?
294         :type private: bool
295
296         :return: The key bound to C{address}.
297         :rtype: OpenPGPKey
298         @raise KeyNotFound: If the key was not found on local storage.
299         """
300         leap_assert(is_address(address), 'Not an user address: %s' % address)
301         doc = self._get_key_doc(address, private)
302         if doc is None:
303             raise errors.KeyNotFound(address)
304         return build_key_from_dict(OpenPGPKey, address, doc.content)
305
306     def parse_ascii_key(self, key_data):
307         """
308         Parses an ascii armored key (or key pair) data and returns
309         the OpenPGPKey keys.
310
311         :param key_data: the key data to be parsed.
312         :type key_data: str or unicode
313
314         :returns: the public key and private key (if applies) for that data.
315         :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
316                 the tuple may have one or both components None
317         """
318         leap_assert_type(key_data, (str, unicode))
319         # TODO: add more checks for correct key data.
320         leap_assert(key_data is not None, 'Data does not represent a key.')
321         mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
322
323         with self._temporary_gpgwrapper() as gpg:
324             # TODO: inspect result, or use decorator
325             gpg.import_keys(key_data)
326             privkey = None
327             pubkey = None
328
329             try:
330                 privkey = gpg.list_keys(secret=True).pop()
331             except IndexError:
332                 pass
333             pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
334
335             # extract adress from first uid on key
336             match = re.match(mail_regex, pubkey['uids'].pop())
337             leap_assert(match is not None, 'No user address in key data.')
338             address = match.group(1)
339
340             if privkey is not None:
341                 match = re.match(mail_regex, privkey['uids'].pop())
342                 leap_assert(match is not None, 'No user address in key data.')
343                 privaddress = match.group(1)
344
345                 # build private key
346                 openpgp_privkey = _build_key_from_gpg(
347                     privaddress, privkey,
348                     gpg.export_keys(privkey['fingerprint'], secret=True))
349
350                 leap_check(address == privaddress,
351                            'Addresses in public and private key differ.',
352                            errors.KeyAddressMismatch)
353                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
354                            'Fingerprints for public and private key differ.',
355                            errors.KeyFingerprintMismatch)
356
357             # build public key
358             openpgp_pubkey = _build_key_from_gpg(
359                 address, pubkey,
360                 gpg.export_keys(pubkey['fingerprint'], secret=False))
361
362             return (openpgp_pubkey, openpgp_privkey)
363
364     def put_ascii_key(self, key_data):
365         """
366         Put key contained in ascii-armored C{key_data} in local storage.
367
368         :param key_data: The key data to be stored.
369         :type key_data: str or unicode
370         """
371         leap_assert_type(key_data, (str, unicode))
372
373         try:
374             openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
375         except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
376             leap_assert(False, repr(e))
377
378         if openpgp_pubkey is not None:
379             self.put_key(openpgp_pubkey)
380         if openpgp_privkey is not None:
381             self.put_key(openpgp_privkey)
382
383     def put_key(self, key):
384         """
385         Put C{key} in local storage.
386
387         :param key: The key to be stored.
388         :type key: OpenPGPKey
389         """
390         doc = self._get_key_doc(key.address, private=key.private)
391         if doc is None:
392             self._soledad.create_doc_from_json(key.get_json())
393         else:
394             doc.set_json(key.get_json())
395             self._soledad.put_doc(doc)
396
397     def _get_key_doc(self, address, private=False):
398         """
399         Get the document with a key (public, by default) bound to C{address}.
400
401         If C{private} is True, looks for a private key instead of a public.
402
403         :param address: The address bound to the key.
404         :type address: str
405         :param private: Whether to look for a private key.
406         :type private: bool
407         :return: The document with the key or None if it does not exist.
408         :rtype: leap.soledad.document.SoledadDocument
409         """
410         doclist = self._soledad.get_from_index(
411             TAGS_ADDRESS_PRIVATE_INDEX,
412             KEYMANAGER_KEY_TAG,
413             address,
414             '1' if private else '0')
415         if len(doclist) is 0:
416             return None
417         leap_assert(
418             len(doclist) is 1,
419             'Found more than one %s key for address!' %
420             'private' if private else 'public')
421         return doclist.pop()
422
423     def delete_key(self, key):
424         """
425         Remove C{key} from storage.
426
427         :param key: The key to be removed.
428         :type key: EncryptionKey
429         """
430         leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
431         stored_key = self.get_key(key.address, private=key.private)
432         if stored_key is None:
433             raise errors.KeyNotFound(key)
434         if stored_key.__dict__ != key.__dict__:
435             raise errors.KeyAttributesDiffer(key)
436         doc = self._get_key_doc(key.address, key.private)
437         self._soledad.delete_doc(doc)
438
439     #
440     # Data encryption, decryption, signing and verifying
441     #
442
443     def _temporary_gpgwrapper(self, keys=None):
444         """
445         Return a gpg wrapper that implements the context manager protocol and
446         contains C{keys}.
447
448         :param key_data: ASCII armored key data.
449         :type key_data: str
450         :param gpgbinary: Name for GnuPG binary executable.
451         :type gpgbinary: C{str}
452
453         :return: a TempGPGWrapper instance
454         :rtype: TempGPGWrapper
455         """
456         # TODO do here checks on key_data
457         return TempGPGWrapper(
458             keys=keys, gpgbinary=self._gpgbinary)
459
460     @staticmethod
461     def _assert_gpg_result_ok(result):
462         """
463         Check if GPG result is 'ok' and log stderr outputs.
464         :param result: The GPG results
465         :type result:
466         """
467         stderr = getattr(result, 'stderr', None)
468         if stderr:
469             logger.debug("%s" % (stderr,))
470         if getattr(result, 'ok', None) is not True:
471             raise errors.EncryptionDecryptionFailed(
472                 'Failed to encrypt/decrypt: %s' % stderr)
473
474     def encrypt(self, data, pubkey, passphrase=None, sign=None,
475                 cipher_algo='AES256'):
476         """
477         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
478
479         :param data: The data to be encrypted.
480         :type data: str
481         :param pubkey: The key used to encrypt.
482         :type pubkey: OpenPGPKey
483         :param sign: The key used for signing.
484         :type sign: OpenPGPKey
485         :param cipher_algo: The cipher algorithm to use.
486         :type cipher_algo: str
487
488         :return: The encrypted data.
489         :rtype: str
490         """
491         leap_assert_type(pubkey, OpenPGPKey)
492         leap_assert(pubkey.private is False, 'Key is not public.')
493         keys = [pubkey]
494         if sign is not None:
495             leap_assert_type(sign, OpenPGPKey)
496             leap_assert(sign.private is True)
497             keys.append(sign)
498         with self._temporary_gpgwrapper(keys) as gpg:
499             result = gpg.encrypt(
500                 data, pubkey.fingerprint,
501                 default_key=sign.key_id if sign else None,
502                 passphrase=passphrase, symmetric=False,
503                 cipher_algo=cipher_algo)
504             # Here we cannot assert for correctness of sig because the sig is
505             # in the ciphertext.
506             # result.ok    - (bool) indicates if the operation succeeded
507             # result.data  - (bool) contains the result of the operation
508             self._assert_gpg_result_ok(result)
509             return result.data
510
511     def decrypt(self, data, privkey, passphrase=None, verify=None):
512         """
513         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
514
515         :param data: The data to be decrypted.
516         :type data: str
517         :param privkey: The key used to decrypt.
518         :type privkey: OpenPGPKey
519         :param verify: The key used to verify a signature.
520         :type verify: OpenPGPKey
521
522         :return: The decrypted data.
523         :rtype: unicode
524
525         @raise InvalidSignature: Raised if unable to verify the signature with
526             C{verify} key.
527         """
528         leap_assert(privkey.private is True, 'Key is not private.')
529         keys = [privkey]
530         if verify is not None:
531             leap_assert_type(verify, OpenPGPKey)
532             leap_assert(verify.private is False)
533             keys.append(verify)
534         with self._temporary_gpgwrapper(keys) as gpg:
535             result = gpg.decrypt(
536                 data, passphrase=passphrase, always_trust=True)
537             self._assert_gpg_result_ok(result)
538             # verify signature
539             if (verify is not None):
540                 if result.valid is False or \
541                         verify.fingerprint != result.pubkey_fingerprint:
542                     raise errors.InvalidSignature(
543                         'Failed to verify signature with key %s: %s' %
544                         (verify.key_id, stderr))
545
546             # XXX: this is the encoding used by gpg module
547             # https://github.com/isislovecruft/python-gnupg/\
548             #   blob/master/gnupg/_meta.py#L121
549             encoding = locale.getpreferredencoding()
550             if encoding is None:
551                 encoding = sys.stdin.encoding
552             if encoding is None:
553                 encoding = 'utf-8'
554             return result.data.decode(encoding, 'replace')
555
556     def is_encrypted(self, data):
557         """
558         Return whether C{data} was asymmetrically encrypted using OpenPGP.
559
560         :param data: The data we want to know about.
561         :type data: str
562
563         :return: Whether C{data} was encrypted using this wrapper.
564         :rtype: bool
565         """
566         with self._temporary_gpgwrapper() as gpg:
567             gpgutil = GPGUtilities(gpg)
568             return gpgutil.is_encrypted_asym(data)
569
570     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
571              detach=True, binary=False):
572         """
573         Sign C{data} with C{privkey}.
574
575         :param data: The data to be signed.
576         :type data: str
577
578         :param privkey: The private key to be used to sign.
579         :type privkey: OpenPGPKey
580         :param digest_algo: The hash digest to use.
581         :type digest_algo: str
582         :param clearsign: If True, create a cleartext signature.
583         :type clearsign: bool
584         :param detach: If True, create a detached signature.
585         :type detach: bool
586         :param binary: If True, do not ascii armour the output.
587         :type binary: bool
588
589         :return: The ascii-armored signed data.
590         :rtype: str
591         """
592         leap_assert_type(privkey, OpenPGPKey)
593         leap_assert(privkey.private is True)
594
595         # result.fingerprint - contains the fingerprint of the key used to
596         #                      sign.
597         with self._temporary_gpgwrapper(privkey) as gpg:
598             result = gpg.sign(data, default_key=privkey.key_id,
599                               digest_algo=digest_algo, clearsign=clearsign,
600                               detach=detach, binary=binary)
601             rfprint = privkey.fingerprint
602             privkey = gpg.list_keys(secret=True).pop()
603             kfprint = privkey['fingerprint']
604             if result.fingerprint is None:
605                 raise errors.SignFailed(
606                     'Failed to sign with key %s: %s' %
607                     (privkey['keyid'], stderr))
608             leap_assert(
609                 result.fingerprint == kfprint,
610                 'Signature and private key fingerprints mismatch: '
611                 '%s != %s' % (rfprint, kfprint))
612         return result.data
613
614     def verify(self, data, pubkey, detached_sig=None):
615         """
616         Verify signed C{data} with C{pubkey}, eventually using
617         C{detached_sig}.
618
619         :param data: The data to be verified.
620         :type data: str
621         :param pubkey: The public key to be used on verification.
622         :type pubkey: OpenPGPKey
623         :param detached_sig: A detached signature. If given, C{data} is
624                              verified against this detached signature.
625         :type detached_sig: str
626
627         :return: The ascii-armored signed data.
628         :rtype: str
629         """
630         leap_assert_type(pubkey, OpenPGPKey)
631         leap_assert(pubkey.private is False)
632         with self._temporary_gpgwrapper(pubkey) as gpg:
633             result = None
634             if detached_sig is None:
635                 result = gpg.verify(data)
636             else:
637                 # to verify using a detached sig we have to use
638                 # gpg.verify_file(), which receives the data as a binary
639                 # stream and the name of a file containing the signature.
640                 sf, sfname = tempfile.mkstemp()
641                 with os.fdopen(sf, 'w') as sfd:
642                     sfd.write(detached_sig)
643                 with closing(_make_binary_stream(data, gpg._encoding)) as df:
644                     result = gpg.verify_file(df, sig_file=sfname)
645             gpgpubkey = gpg.list_keys().pop()
646             valid = result.valid
647             rfprint = result.fingerprint
648             kfprint = gpgpubkey['fingerprint']
649             # raise in case sig is invalid
650             if valid is False or rfprint != kfprint:
651                 raise errors.InvalidSignature(
652                     'Failed to verify signature '
653                     'with key %s.' % gpgpubkey['keyid'])
654             return True