Return unicode in order to solve encoding issues.
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Infrastructure for using OpenPGP keys in Key Manager.
21 """
22
23
24 import logging
25 import os
26 import re
27 import shutil
28 import tempfile
29 import locale
30
31 from gnupg import GPG
32 from gnupg.gnupg import GPGUtilities
33
34 from leap.common.check import leap_assert, leap_assert_type
35 from leap.keymanager import errors
36 from leap.keymanager.keys import (
37     EncryptionKey,
38     EncryptionScheme,
39     is_address,
40     build_key_from_dict,
41     KEYMANAGER_KEY_TAG,
42     TAGS_ADDRESS_PRIVATE_INDEX,
43 )
44
45
46 logger = logging.getLogger(__name__)
47
48
49 class TempGPGWrapper(object):
50     """
51     A context manager that wraps a temporary GPG keyring which only contains
52     the keys given at object creation.
53     """
54
55     def __init__(self, keys=None, gpgbinary=None):
56         """
57         Create an empty temporary keyring and import any given C{keys} into
58         it.
59
60         :param keys: OpenPGP key, or list of.
61         :type keys: OpenPGPKey or list of OpenPGPKeys
62         :param gpgbinary: Name for GnuPG binary executable.
63         :type gpgbinary: C{str}
64         """
65         self._gpg = None
66         self._gpgbinary = gpgbinary
67         if not keys:
68             keys = list()
69         if not isinstance(keys, list):
70             keys = [keys]
71         self._keys = keys
72         for key in keys:
73             leap_assert_type(key, OpenPGPKey)
74
75     def __enter__(self):
76         """
77         Build and return a GPG keyring containing the keys given on
78         object creation.
79
80         :return: A GPG instance containing the keys given on object creation.
81         :rtype: gnupg.GPG
82         """
83         self._build_keyring()
84         return self._gpg
85
86     def __exit__(self, exc_type, exc_value, traceback):
87         """
88         Ensure the gpg is properly destroyed.
89         """
90         # TODO handle exceptions and log here
91         self._destroy_keyring()
92
93     def _build_keyring(self):
94         """
95         Create a GPG keyring containing the keys given on object creation.
96
97         :return: A GPG instance containing the keys given on object creation.
98         :rtype: gnupg.GPG
99         """
100         privkeys = [key for key in self._keys if key and key.private is True]
101         publkeys = [key for key in self._keys if key and key.private is False]
102         # here we filter out public keys that have a correspondent
103         # private key in the list because the private key_data by
104         # itself is enough to also have the public key in the keyring,
105         # and we want to count the keys afterwards.
106
107         privaddrs = map(lambda privkey: privkey.address, privkeys)
108         publkeys = filter(
109             lambda pubkey: pubkey.address not in privaddrs, publkeys)
110
111         listkeys = lambda: self._gpg.list_keys()
112         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
113
114         self._gpg = GPG(binary=self._gpgbinary,
115                         homedir=tempfile.mkdtemp())
116         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
117
118         # import keys into the keyring:
119         # concatenating ascii-armored keys, which is correctly
120         # understood by GPG.
121
122         self._gpg.import_keys("".join(
123             [x.key_data for x in publkeys + privkeys]))
124
125         # assert the number of keys in the keyring
126         leap_assert(
127             len(listkeys()) == len(publkeys) + len(privkeys),
128             'Wrong number of public keys in keyring: %d, should be %d)' %
129             (len(listkeys()), len(publkeys) + len(privkeys)))
130         leap_assert(
131             len(listsecretkeys()) == len(privkeys),
132             'Wrong number of private keys in keyring: %d, should be %d)' %
133             (len(listsecretkeys()), len(privkeys)))
134
135     def _destroy_keyring(self):
136         """
137         Securely erase the keyring.
138         """
139         # TODO: implement some kind of wiping of data or a more
140         # secure way that
141         # does not write to disk.
142
143         try:
144             for secret in [True, False]:
145                 for key in self._gpg.list_keys(secret=secret):
146                     self._gpg.delete_keys(
147                         key['fingerprint'],
148                         secret=secret)
149             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
150
151         except:
152             raise
153
154         finally:
155             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
156                         "watch out! Tried to remove default gnupg home!")
157             shutil.rmtree(self._gpg.homedir)
158
159
160 def _build_key_from_gpg(address, key, key_data):
161     """
162     Build an OpenPGPKey for C{address} based on C{key} from
163     local gpg storage.
164
165     ASCII armored GPG key data has to be queried independently in this
166     wrapper, so we receive it in C{key_data}.
167
168     :param address: The address bound to the key.
169     :type address: str
170     :param key: Key obtained from GPG storage.
171     :type key: dict
172     :param key_data: Key data obtained from GPG storage.
173     :type key_data: str
174     :return: An instance of the key.
175     :rtype: OpenPGPKey
176     """
177     return OpenPGPKey(
178         address,
179         key_id=key['keyid'],
180         fingerprint=key['fingerprint'],
181         key_data=key_data,
182         private=True if key['type'] == 'sec' else False,
183         length=key['length'],
184         expiry_date=key['expires'],
185         validation=None,  # TODO: verify for validation.
186     )
187
188
189 #
190 # The OpenPGP wrapper
191 #
192
193 class OpenPGPKey(EncryptionKey):
194     """
195     Base class for OpenPGP keys.
196     """
197
198
199 class OpenPGPScheme(EncryptionScheme):
200     """
201     A wrapper for OpenPGP keys management and use (encryption, decyption,
202     signing and verification).
203     """
204
205     def __init__(self, soledad, gpgbinary=None):
206         """
207         Initialize the OpenPGP wrapper.
208
209         :param soledad: A Soledad instance for key storage.
210         :type soledad: leap.soledad.Soledad
211         :param gpgbinary: Name for GnuPG binary executable.
212         :type gpgbinary: C{str}
213         """
214         EncryptionScheme.__init__(self, soledad)
215         self._gpgbinary = gpgbinary
216
217     #
218     # Keys management
219     #
220
221     def gen_key(self, address):
222         """
223         Generate an OpenPGP keypair bound to C{address}.
224
225         :param address: The address bound to the key.
226         :type address: str
227         :return: The key bound to C{address}.
228         :rtype: OpenPGPKey
229         @raise KeyAlreadyExists: If key already exists in local database.
230         """
231         # make sure the key does not already exist
232         leap_assert(is_address(address), 'Not an user address: %s' % address)
233         try:
234             self.get_key(address)
235             raise errors.KeyAlreadyExists(address)
236         except errors.KeyNotFound:
237             logger.debug('Key for %s not found' % (address,))
238
239         with self._temporary_gpgwrapper() as gpg:
240             # TODO: inspect result, or use decorator
241             params = gpg.gen_key_input(
242                 key_type='RSA',
243                 key_length=4096,
244                 name_real=address,
245                 name_email=address,
246                 name_comment='Generated by LEAP Key Manager.')
247             logger.info("About to generate keys... This might take SOME time.")
248             gpg.gen_key(params)
249             logger.info("Keys for %s have been successfully "
250                         "generated." % (address,))
251             pubkeys = gpg.list_keys()
252
253             # assert for new key characteristics
254
255             # XXX This exception is not properly catched by the soledad
256             # bootstrapping, so if we do not finish generating the keys
257             # we end with a blocked thread -- kali
258
259             leap_assert(
260                 len(pubkeys) is 1,  # a unitary keyring!
261                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
262             key = gpg.list_keys(secret=True).pop()
263             leap_assert(
264                 len(key['uids']) is 1,  # with just one uid!
265                 'Wrong number of uids for key: %d.' % len(key['uids']))
266             leap_assert(
267                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
268                 'Key not correctly bound to address.')
269             # insert both public and private keys in storage
270             for secret in [True, False]:
271                 key = gpg.list_keys(secret=secret).pop()
272                 openpgp_key = _build_key_from_gpg(
273                     address, key,
274                     gpg.export_keys(key['fingerprint'], secret=secret))
275                 self.put_key(openpgp_key)
276
277         return self.get_key(address, private=True)
278
279     def get_key(self, address, private=False):
280         """
281         Get key bound to C{address} from local storage.
282
283         :param address: The address bound to the key.
284         :type address: str
285         :param private: Look for a private key instead of a public one?
286         :type private: bool
287
288         :return: The key bound to C{address}.
289         :rtype: OpenPGPKey
290         @raise KeyNotFound: If the key was not found on local storage.
291         """
292         leap_assert(is_address(address), 'Not an user address: %s' % address)
293         doc = self._get_key_doc(address, private)
294         if doc is None:
295             raise errors.KeyNotFound(address)
296         return build_key_from_dict(OpenPGPKey, address, doc.content)
297
298     def put_ascii_key(self, key_data):
299         """
300         Put key contained in ascii-armored C{key_data} in local storage.
301
302         :param key_data: The key data to be stored.
303         :type key_data: str or unicode
304         """
305         leap_assert_type(key_data, (str, unicode))
306         # TODO: add more checks for correct key data.
307         leap_assert(key_data is not None, 'Data does not represent a key.')
308
309         with self._temporary_gpgwrapper() as gpg:
310             # TODO: inspect result, or use decorator
311             gpg.import_keys(key_data)
312             privkey = None
313             pubkey = None
314
315             try:
316                 privkey = gpg.list_keys(secret=True).pop()
317             except IndexError:
318                 pass
319             pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
320             # extract adress from first uid on key
321             match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())
322             leap_assert(match is not None, 'No user address in key data.')
323             address = match.group(1)
324             if privkey is not None:
325                 match = re.match(
326                     '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop())
327                 leap_assert(match is not None, 'No user address in key data.')
328                 privaddress = match.group(1)
329                 leap_assert(
330                     address == privaddress,
331                     'Addresses in pub and priv key differ.')
332                 leap_assert(
333                     pubkey['fingerprint'] == privkey['fingerprint'],
334                     'Fingerprints for pub and priv key differ.')
335                 # insert private key in storage
336                 openpgp_privkey = _build_key_from_gpg(
337                     address, privkey,
338                     gpg.export_keys(privkey['fingerprint'], secret=True))
339                 self.put_key(openpgp_privkey)
340             # insert public key in storage
341             openpgp_pubkey = _build_key_from_gpg(
342                 address, pubkey,
343                 gpg.export_keys(pubkey['fingerprint'], secret=False))
344             self.put_key(openpgp_pubkey)
345
346     def put_key(self, key):
347         """
348         Put C{key} in local storage.
349
350         :param key: The key to be stored.
351         :type key: OpenPGPKey
352         """
353         doc = self._get_key_doc(key.address, private=key.private)
354         if doc is None:
355             self._soledad.create_doc_from_json(key.get_json())
356         else:
357             doc.set_json(key.get_json())
358             self._soledad.put_doc(doc)
359
360     def _get_key_doc(self, address, private=False):
361         """
362         Get the document with a key (public, by default) bound to C{address}.
363
364         If C{private} is True, looks for a private key instead of a public.
365
366         :param address: The address bound to the key.
367         :type address: str
368         :param private: Whether to look for a private key.
369         :type private: bool
370         :return: The document with the key or None if it does not exist.
371         :rtype: leap.soledad.document.SoledadDocument
372         """
373         doclist = self._soledad.get_from_index(
374             TAGS_ADDRESS_PRIVATE_INDEX,
375             KEYMANAGER_KEY_TAG,
376             address,
377             '1' if private else '0')
378         if len(doclist) is 0:
379             return None
380         leap_assert(
381             len(doclist) is 1,
382             'Found more than one %s key for address!' %
383             'private' if private else 'public')
384         return doclist.pop()
385
386     def delete_key(self, key):
387         """
388         Remove C{key} from storage.
389
390         :param key: The key to be removed.
391         :type key: EncryptionKey
392         """
393         leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
394         stored_key = self.get_key(key.address, private=key.private)
395         if stored_key is None:
396             raise errors.KeyNotFound(key)
397         if stored_key.__dict__ != key.__dict__:
398             raise errors.KeyAttributesDiffer(key)
399         doc = self._get_key_doc(key.address, key.private)
400         self._soledad.delete_doc(doc)
401
402     #
403     # Data encryption, decryption, signing and verifying
404     #
405
406     def _temporary_gpgwrapper(self, keys=None):
407         """
408         Return a gpg wrapper that implements the context manager protocol and
409         contains C{keys}.
410
411         :param key_data: ASCII armored key data.
412         :type key_data: str
413         :param gpgbinary: Name for GnuPG binary executable.
414         :type gpgbinary: C{str}
415
416         :return: a TempGPGWrapper instance
417         :rtype: TempGPGWrapper
418         """
419         # TODO do here checks on key_data
420         return TempGPGWrapper(
421             keys=keys, gpgbinary=self._gpgbinary)
422
423     @staticmethod
424     def _assert_gpg_result_ok(result):
425         """
426         Check if GPG result is 'ok' and log stderr outputs.
427         :param result: The GPG results
428         :type result:
429         """
430         stderr = getattr(result, 'stderr', None)
431         if stderr:
432             logger.debug("%s" % (stderr,))
433         if getattr(result, 'ok', None) is not True:
434             raise errors.EncryptionDecryptionFailed(
435                 'Failed to encrypt/decrypt: %s' % stderr)
436
437     def encrypt(self, data, pubkey, passphrase=None, sign=None,
438                 cipher_algo='AES256'):
439         """
440         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
441
442         :param data: The data to be encrypted.
443         :type data: str
444         :param pubkey: The key used to encrypt.
445         :type pubkey: OpenPGPKey
446         :param sign: The key used for signing.
447         :type sign: OpenPGPKey
448         :param cipher_algo: The cipher algorithm to use.
449         :type cipher_algo: str
450
451         :return: The encrypted data.
452         :rtype: str
453         """
454         leap_assert_type(pubkey, OpenPGPKey)
455         leap_assert(pubkey.private is False, 'Key is not public.')
456         keys = [pubkey]
457         if sign is not None:
458             leap_assert_type(sign, OpenPGPKey)
459             leap_assert(sign.private is True)
460             keys.append(sign)
461         with self._temporary_gpgwrapper(keys) as gpg:
462             result = gpg.encrypt(
463                 data, pubkey.fingerprint,
464                 default_key=sign.key_id if sign else None,
465                 passphrase=passphrase, symmetric=False,
466                 cipher_algo=cipher_algo)
467             # Here we cannot assert for correctness of sig because the sig is
468             # in the ciphertext.
469             # result.ok    - (bool) indicates if the operation succeeded
470             # result.data  - (bool) contains the result of the operation
471             self._assert_gpg_result_ok(result)
472             return result.data
473
474     def decrypt(self, data, privkey, passphrase=None, verify=None):
475         """
476         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
477
478         :param data: The data to be decrypted.
479         :type data: str
480         :param privkey: The key used to decrypt.
481         :type privkey: OpenPGPKey
482         :param verify: The key used to verify a signature.
483         :type verify: OpenPGPKey
484
485         :return: The decrypted data.
486         :rtype: unicode
487
488         @raise InvalidSignature: Raised if unable to verify the signature with
489             C{verify} key.
490         """
491         leap_assert(privkey.private is True, 'Key is not private.')
492         keys = [privkey]
493         if verify is not None:
494             leap_assert_type(verify, OpenPGPKey)
495             leap_assert(verify.private is False)
496             keys.append(verify)
497         with self._temporary_gpgwrapper(keys) as gpg:
498             result = gpg.decrypt(
499                 data, passphrase=passphrase, always_trust=True)
500             self._assert_gpg_result_ok(result)
501             # verify signature
502             if (verify is not None):
503                 if result.valid is False or \
504                         verify.fingerprint != result.pubkey_fingerprint:
505                     raise errors.InvalidSignature(
506                         'Failed to verify signature with key %s: %s' %
507                         (verify.key_id, stderr))
508
509             # XXX: this is the encoding used by gpg module
510             # https://github.com/isislovecruft/python-gnupg/\
511             #   blob/master/gnupg/_meta.py#L121
512             encoding = locale.getpreferredencoding()
513             return result.data.decode(encoding, 'replace')
514
515     def is_encrypted(self, data):
516         """
517         Return whether C{data} was asymmetrically encrypted using OpenPGP.
518
519         :param data: The data we want to know about.
520         :type data: str
521
522         :return: Whether C{data} was encrypted using this wrapper.
523         :rtype: bool
524         """
525         with self._temporary_gpgwrapper() as gpg:
526             gpgutil = GPGUtilities(gpg)
527             return gpgutil.is_encrypted_asym(data)
528
529     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
530              detach=True, binary=False):
531         """
532         Sign C{data} with C{privkey}.
533
534         :param data: The data to be signed.
535         :type data: str
536
537         :param privkey: The private key to be used to sign.
538         :type privkey: OpenPGPKey
539         :param digest_algo: The hash digest to use.
540         :type digest_algo: str
541         :param clearsign: If True, create a cleartext signature.
542         :type clearsign: bool
543         :param detach: If True, create a detached signature.
544         :type detach: bool
545         :param binary: If True, do not ascii armour the output.
546         :type binary: bool
547
548         :return: The ascii-armored signed data.
549         :rtype: str
550         """
551         leap_assert_type(privkey, OpenPGPKey)
552         leap_assert(privkey.private is True)
553
554         # result.fingerprint - contains the fingerprint of the key used to
555         #                      sign.
556         with self._temporary_gpgwrapper(privkey) as gpg:
557             result = gpg.sign(data, default_key=privkey.key_id,
558                               digest_algo=digest_algo, clearsign=clearsign,
559                               detach=detach, binary=binary)
560             rfprint = privkey.fingerprint
561             privkey = gpg.list_keys(secret=True).pop()
562             kfprint = privkey['fingerprint']
563             if result.fingerprint is None:
564                 raise errors.SignFailed(
565                     'Failed to sign with key %s: %s' %
566                     (privkey['keyid'], stderr))
567             leap_assert(
568                 result.fingerprint == kfprint,
569                 'Signature and private key fingerprints mismatch: '
570                 '%s != %s' % (rfprint, kfprint))
571         return result.data
572
573     def verify(self, data, pubkey):
574         """
575         Verify signed C{data} with C{pubkey}.
576
577         :param data: The data to be verified.
578         :type data: str
579
580         :param pubkey: The public key to be used on verification.
581         :type pubkey: OpenPGPKey
582
583         :return: The ascii-armored signed data.
584         :rtype: str
585         """
586         leap_assert_type(pubkey, OpenPGPKey)
587         leap_assert(pubkey.private is False)
588         with self._temporary_gpgwrapper(pubkey) as gpg:
589             result = gpg.verify(data)
590             gpgpubkey = gpg.list_keys().pop()
591             valid = result.valid
592             rfprint = result.fingerprint
593             kfprint = gpgpubkey['fingerprint']
594             # raise in case sig is invalid
595             if valid is False or rfprint != kfprint:
596                 raise errors.InvalidSignature(
597                     'Failed to verify signature '
598                     'with key %s.' % gpgpubkey['keyid'])
599             return True