64123318ae072efe3a93897c88f4cc48c6307076
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Infrastructure for using OpenPGP keys in Key Manager.
21 """
22
23
24 import logging
25 import os
26 import re
27 import shutil
28 import tempfile
29
30 from gnupg import GPG
31 from gnupg.gnupg import GPGUtilities
32
33 from leap.common.check import leap_assert, leap_assert_type
34 from leap.keymanager import errors
35 from leap.keymanager.keys import (
36     EncryptionKey,
37     EncryptionScheme,
38     is_address,
39     build_key_from_dict,
40     KEYMANAGER_KEY_TAG,
41     TAGS_ADDRESS_PRIVATE_INDEX,
42 )
43
44
45 logger = logging.getLogger(__name__)
46
47
48 class TempGPGWrapper(object):
49     """
50     A context manager that wraps a temporary GPG keyring which only contains
51     the keys given at object creation.
52     """
53
54     def __init__(self, keys=None, gpgbinary=None):
55         """
56         Create an empty temporary keyring and import any given C{keys} into
57         it.
58
59         :param keys: OpenPGP key, or list of.
60         :type keys: OpenPGPKey or list of OpenPGPKeys
61         :param gpgbinary: Name for GnuPG binary executable.
62         :type gpgbinary: C{str}
63         """
64         self._gpg = None
65         self._gpgbinary = gpgbinary
66         if not keys:
67             keys = list()
68         if not isinstance(keys, list):
69             keys = [keys]
70         self._keys = keys
71         for key in keys:
72             leap_assert_type(key, OpenPGPKey)
73
74     def __enter__(self):
75         """
76         Build and return a GPG keyring containing the keys given on
77         object creation.
78
79         :return: A GPG instance containing the keys given on object creation.
80         :rtype: gnupg.GPG
81         """
82         self._build_keyring()
83         return self._gpg
84
85     def __exit__(self, exc_type, exc_value, traceback):
86         """
87         Ensure the gpg is properly destroyed.
88         """
89         # TODO handle exceptions and log here
90         self._destroy_keyring()
91
92     def _build_keyring(self):
93         """
94         Create a GPG keyring containing the keys given on object creation.
95
96         :return: A GPG instance containing the keys given on object creation.
97         :rtype: gnupg.GPG
98         """
99         privkeys = [key for key in self._keys if key and key.private is True]
100         publkeys = [key for key in self._keys if key and key.private is False]
101         # here we filter out public keys that have a correspondent
102         # private key in the list because the private key_data by
103         # itself is enough to also have the public key in the keyring,
104         # and we want to count the keys afterwards.
105
106         privaddrs = map(lambda privkey: privkey.address, privkeys)
107         publkeys = filter(
108             lambda pubkey: pubkey.address not in privaddrs, publkeys)
109
110         listkeys = lambda: self._gpg.list_keys()
111         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
112
113         self._gpg = GPG(binary=self._gpgbinary,
114                         homedir=tempfile.mkdtemp())
115         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
116
117         # import keys into the keyring:
118         # concatenating ascii-armored keys, which is correctly
119         # understood by GPG.
120
121         self._gpg.import_keys("".join(
122             [x.key_data for x in publkeys + privkeys]))
123
124         # assert the number of keys in the keyring
125         leap_assert(
126             len(listkeys()) == len(publkeys) + len(privkeys),
127             'Wrong number of public keys in keyring: %d, should be %d)' %
128             (len(listkeys()), len(publkeys) + len(privkeys)))
129         leap_assert(
130             len(listsecretkeys()) == len(privkeys),
131             'Wrong number of private keys in keyring: %d, should be %d)' %
132             (len(listsecretkeys()), len(privkeys)))
133
134     def _destroy_keyring(self):
135         """
136         Securely erase the keyring.
137         """
138         # TODO: implement some kind of wiping of data or a more
139         # secure way that
140         # does not write to disk.
141
142         try:
143             for secret in [True, False]:
144                 for key in self._gpg.list_keys(secret=secret):
145                     self._gpg.delete_keys(
146                         key['fingerprint'],
147                         secret=secret)
148             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
149
150         except:
151             raise
152
153         finally:
154             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
155                         "watch out! Tried to remove default gnupg home!")
156             shutil.rmtree(self._gpg.homedir)
157
158
159 def _build_key_from_gpg(address, key, key_data):
160     """
161     Build an OpenPGPKey for C{address} based on C{key} from
162     local gpg storage.
163
164     ASCII armored GPG key data has to be queried independently in this
165     wrapper, so we receive it in C{key_data}.
166
167     :param address: The address bound to the key.
168     :type address: str
169     :param key: Key obtained from GPG storage.
170     :type key: dict
171     :param key_data: Key data obtained from GPG storage.
172     :type key_data: str
173     :return: An instance of the key.
174     :rtype: OpenPGPKey
175     """
176     return OpenPGPKey(
177         address,
178         key_id=key['keyid'],
179         fingerprint=key['fingerprint'],
180         key_data=key_data,
181         private=True if key['type'] == 'sec' else False,
182         length=key['length'],
183         expiry_date=key['expires'],
184         validation=None,  # TODO: verify for validation.
185     )
186
187
188 #
189 # The OpenPGP wrapper
190 #
191
192 class OpenPGPKey(EncryptionKey):
193     """
194     Base class for OpenPGP keys.
195     """
196
197
198 class OpenPGPScheme(EncryptionScheme):
199     """
200     A wrapper for OpenPGP keys management and use (encryption, decyption,
201     signing and verification).
202     """
203
204     def __init__(self, soledad, gpgbinary=None):
205         """
206         Initialize the OpenPGP wrapper.
207
208         :param soledad: A Soledad instance for key storage.
209         :type soledad: leap.soledad.Soledad
210         :param gpgbinary: Name for GnuPG binary executable.
211         :type gpgbinary: C{str}
212         """
213         EncryptionScheme.__init__(self, soledad)
214         self._gpgbinary = gpgbinary
215
216     #
217     # Keys management
218     #
219
220     def gen_key(self, address):
221         """
222         Generate an OpenPGP keypair bound to C{address}.
223
224         :param address: The address bound to the key.
225         :type address: str
226         :return: The key bound to C{address}.
227         :rtype: OpenPGPKey
228         @raise KeyAlreadyExists: If key already exists in local database.
229         """
230         # make sure the key does not already exist
231         leap_assert(is_address(address), 'Not an user address: %s' % address)
232         try:
233             self.get_key(address)
234             raise errors.KeyAlreadyExists(address)
235         except errors.KeyNotFound:
236             logger.debug('Key for %s not found' % (address,))
237
238         with self._temporary_gpgwrapper() as gpg:
239             # TODO: inspect result, or use decorator
240             params = gpg.gen_key_input(
241                 key_type='RSA',
242                 key_length=4096,
243                 name_real=address,
244                 name_email=address,
245                 name_comment='Generated by LEAP Key Manager.')
246             logger.info("About to generate keys... This might take SOME time.")
247             gpg.gen_key(params)
248             logger.info("Keys for %s have been successfully "
249                         "generated." % (address,))
250             pubkeys = gpg.list_keys()
251
252             # assert for new key characteristics
253
254             # XXX This exception is not properly catched by the soledad
255             # bootstrapping, so if we do not finish generating the keys
256             # we end with a blocked thread -- kali
257
258             leap_assert(
259                 len(pubkeys) is 1,  # a unitary keyring!
260                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
261             key = gpg.list_keys(secret=True).pop()
262             leap_assert(
263                 len(key['uids']) is 1,  # with just one uid!
264                 'Wrong number of uids for key: %d.' % len(key['uids']))
265             leap_assert(
266                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
267                 'Key not correctly bound to address.')
268             # insert both public and private keys in storage
269             for secret in [True, False]:
270                 key = gpg.list_keys(secret=secret).pop()
271                 openpgp_key = _build_key_from_gpg(
272                     address, key,
273                     gpg.export_keys(key['fingerprint'], secret=secret))
274                 self.put_key(openpgp_key)
275
276         return self.get_key(address, private=True)
277
278     def get_key(self, address, private=False):
279         """
280         Get key bound to C{address} from local storage.
281
282         :param address: The address bound to the key.
283         :type address: str
284         :param private: Look for a private key instead of a public one?
285         :type private: bool
286
287         :return: The key bound to C{address}.
288         :rtype: OpenPGPKey
289         @raise KeyNotFound: If the key was not found on local storage.
290         """
291         leap_assert(is_address(address), 'Not an user address: %s' % address)
292         doc = self._get_key_doc(address, private)
293         if doc is None:
294             raise errors.KeyNotFound(address)
295         return build_key_from_dict(OpenPGPKey, address, doc.content)
296
297     def put_ascii_key(self, key_data):
298         """
299         Put key contained in ascii-armored C{key_data} in local storage.
300
301         :param key_data: The key data to be stored.
302         :type key_data: str or unicode
303         """
304         leap_assert_type(key_data, (str, unicode))
305         # TODO: add more checks for correct key data.
306         leap_assert(key_data is not None, 'Data does not represent a key.')
307
308         with self._temporary_gpgwrapper() as gpg:
309             # TODO: inspect result, or use decorator
310             gpg.import_keys(key_data)
311             privkey = None
312             pubkey = None
313
314             try:
315                 privkey = gpg.list_keys(secret=True).pop()
316             except IndexError:
317                 pass
318             pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
319             # extract adress from first uid on key
320             match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())
321             leap_assert(match is not None, 'No user address in key data.')
322             address = match.group(1)
323             if privkey is not None:
324                 match = re.match(
325                     '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop())
326                 leap_assert(match is not None, 'No user address in key data.')
327                 privaddress = match.group(1)
328                 leap_assert(
329                     address == privaddress,
330                     'Addresses in pub and priv key differ.')
331                 leap_assert(
332                     pubkey['fingerprint'] == privkey['fingerprint'],
333                     'Fingerprints for pub and priv key differ.')
334                 # insert private key in storage
335                 openpgp_privkey = _build_key_from_gpg(
336                     address, privkey,
337                     gpg.export_keys(privkey['fingerprint'], secret=True))
338                 self.put_key(openpgp_privkey)
339             # insert public key in storage
340             openpgp_pubkey = _build_key_from_gpg(
341                 address, pubkey,
342                 gpg.export_keys(pubkey['fingerprint'], secret=False))
343             self.put_key(openpgp_pubkey)
344
345     def put_key(self, key):
346         """
347         Put C{key} in local storage.
348
349         :param key: The key to be stored.
350         :type key: OpenPGPKey
351         """
352         doc = self._get_key_doc(key.address, private=key.private)
353         if doc is None:
354             self._soledad.create_doc_from_json(key.get_json())
355         else:
356             doc.set_json(key.get_json())
357             self._soledad.put_doc(doc)
358
359     def _get_key_doc(self, address, private=False):
360         """
361         Get the document with a key (public, by default) bound to C{address}.
362
363         If C{private} is True, looks for a private key instead of a public.
364
365         :param address: The address bound to the key.
366         :type address: str
367         :param private: Whether to look for a private key.
368         :type private: bool
369         :return: The document with the key or None if it does not exist.
370         :rtype: leap.soledad.document.SoledadDocument
371         """
372         doclist = self._soledad.get_from_index(
373             TAGS_ADDRESS_PRIVATE_INDEX,
374             KEYMANAGER_KEY_TAG,
375             address,
376             '1' if private else '0')
377         if len(doclist) is 0:
378             return None
379         leap_assert(
380             len(doclist) is 1,
381             'Found more than one %s key for address!' %
382             'private' if private else 'public')
383         return doclist.pop()
384
385     def delete_key(self, key):
386         """
387         Remove C{key} from storage.
388
389         :param key: The key to be removed.
390         :type key: EncryptionKey
391         """
392         leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
393         stored_key = self.get_key(key.address, private=key.private)
394         if stored_key is None:
395             raise errors.KeyNotFound(key)
396         if stored_key.__dict__ != key.__dict__:
397             raise errors.KeyAttributesDiffer(key)
398         doc = self._get_key_doc(key.address, key.private)
399         self._soledad.delete_doc(doc)
400
401     #
402     # Data encryption, decryption, signing and verifying
403     #
404
405     def _temporary_gpgwrapper(self, keys=None):
406         """
407         Return a gpg wrapper that implements the context manager protocol and
408         contains C{keys}.
409
410         :param key_data: ASCII armored key data.
411         :type key_data: str
412         :param gpgbinary: Name for GnuPG binary executable.
413         :type gpgbinary: C{str}
414
415         :return: a TempGPGWrapper instance
416         :rtype: TempGPGWrapper
417         """
418         # TODO do here checks on key_data
419         return TempGPGWrapper(
420             keys=keys, gpgbinary=self._gpgbinary)
421
422     @staticmethod
423     def _assert_gpg_result_ok(result):
424         """
425         Check if GPG result is 'ok' and log stderr outputs.
426         :param result: The GPG results
427         :type result:
428         """
429         stderr = getattr(result, 'stderr', None)
430         if stderr:
431             logger.debug("%s" % (stderr,))
432         if getattr(result, 'ok', None) is not True:
433             raise errors.EncryptionDecryptionFailed(
434                 'Failed to encrypt/decrypt: %s' % stderr)
435
436     def encrypt(self, data, pubkey, passphrase=None, sign=None,
437                 cipher_algo='AES256'):
438         """
439         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
440
441         :param data: The data to be encrypted.
442         :type data: str
443         :param pubkey: The key used to encrypt.
444         :type pubkey: OpenPGPKey
445         :param sign: The key used for signing.
446         :type sign: OpenPGPKey
447         :param cipher_algo: The cipher algorithm to use.
448         :type cipher_algo: str
449
450         :return: The encrypted data.
451         :rtype: str
452         """
453         leap_assert_type(pubkey, OpenPGPKey)
454         leap_assert(pubkey.private is False, 'Key is not public.')
455         keys = [pubkey]
456         if sign is not None:
457             leap_assert_type(sign, OpenPGPKey)
458             leap_assert(sign.private is True)
459             keys.append(sign)
460         with self._temporary_gpgwrapper(keys) as gpg:
461             result = gpg.encrypt(
462                 data, pubkey.fingerprint,
463                 default_key=sign.key_id if sign else None,
464                 passphrase=passphrase, symmetric=False,
465                 cipher_algo=cipher_algo)
466             # Here we cannot assert for correctness of sig because the sig is
467             # in the ciphertext.
468             # result.ok    - (bool) indicates if the operation succeeded
469             # result.data  - (bool) contains the result of the operation
470             self._assert_gpg_result_ok(result)
471             return result.data
472
473     def decrypt(self, data, privkey, passphrase=None, verify=None):
474         """
475         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
476
477         :param data: The data to be decrypted.
478         :type data: str
479         :param privkey: The key used to decrypt.
480         :type privkey: OpenPGPKey
481         :param verify: The key used to verify a signature.
482         :type verify: OpenPGPKey
483
484         :return: The decrypted data.
485         :rtype: str
486
487         @raise InvalidSignature: Raised if unable to verify the signature with
488             C{verify} key.
489         """
490         leap_assert(privkey.private is True, 'Key is not private.')
491         keys = [privkey]
492         if verify is not None:
493             leap_assert_type(verify, OpenPGPKey)
494             leap_assert(verify.private is False)
495             keys.append(verify)
496         with self._temporary_gpgwrapper(keys) as gpg:
497             result = gpg.decrypt(
498                 data, passphrase=passphrase, always_trust=True)
499             self._assert_gpg_result_ok(result)
500             # verify signature
501             if (verify is not None):
502                 if result.valid is False or \
503                         verify.fingerprint != result.pubkey_fingerprint:
504                     raise errors.InvalidSignature(
505                         'Failed to verify signature with key %s: %s' %
506                         (verify.key_id, stderr))
507             return result.data
508
509     def is_encrypted(self, data):
510         """
511         Return whether C{data} was asymmetrically encrypted using OpenPGP.
512
513         :param data: The data we want to know about.
514         :type data: str
515
516         :return: Whether C{data} was encrypted using this wrapper.
517         :rtype: bool
518         """
519         with self._temporary_gpgwrapper() as gpg:
520             gpgutil = GPGUtilities(gpg)
521             return gpgutil.is_encrypted_asym(data)
522
523     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
524              detach=True, binary=False):
525         """
526         Sign C{data} with C{privkey}.
527
528         :param data: The data to be signed.
529         :type data: str
530
531         :param privkey: The private key to be used to sign.
532         :type privkey: OpenPGPKey
533         :param digest_algo: The hash digest to use.
534         :type digest_algo: str
535         :param clearsign: If True, create a cleartext signature.
536         :type clearsign: bool
537         :param detach: If True, create a detached signature.
538         :type detach: bool
539         :param binary: If True, do not ascii armour the output.
540         :type binary: bool
541
542         :return: The ascii-armored signed data.
543         :rtype: str
544         """
545         leap_assert_type(privkey, OpenPGPKey)
546         leap_assert(privkey.private is True)
547
548         # result.fingerprint - contains the fingerprint of the key used to
549         #                      sign.
550         with self._temporary_gpgwrapper(privkey) as gpg:
551             result = gpg.sign(data, default_key=privkey.key_id,
552                               digest_algo=digest_algo, clearsign=clearsign,
553                               detach=detach, binary=binary)
554             rfprint = privkey.fingerprint
555             privkey = gpg.list_keys(secret=True).pop()
556             kfprint = privkey['fingerprint']
557             if result.fingerprint is None:
558                 raise errors.SignFailed(
559                     'Failed to sign with key %s: %s' %
560                     (privkey['keyid'], stderr))
561             leap_assert(
562                 result.fingerprint == kfprint,
563                 'Signature and private key fingerprints mismatch: '
564                 '%s != %s' % (rfprint, kfprint))
565         return result.data
566
567     def verify(self, data, pubkey):
568         """
569         Verify signed C{data} with C{pubkey}.
570
571         :param data: The data to be verified.
572         :type data: str
573
574         :param pubkey: The public key to be used on verification.
575         :type pubkey: OpenPGPKey
576
577         :return: The ascii-armored signed data.
578         :rtype: str
579         """
580         leap_assert_type(pubkey, OpenPGPKey)
581         leap_assert(pubkey.private is False)
582         with self._temporary_gpgwrapper(pubkey) as gpg:
583             result = gpg.verify(data)
584             gpgpubkey = gpg.list_keys().pop()
585             valid = result.valid
586             rfprint = result.fingerprint
587             kfprint = gpgpubkey['fingerprint']
588             # raise in case sig is invalid
589             if valid is False or rfprint != kfprint:
590                 raise errors.InvalidSignature(
591                     'Failed to verify signature '
592                     'with key %s.' % gpgpubkey['keyid'])
593             return True