Support bundled GPG and change API.
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Infrastructure for using OpenPGP keys in Key Manager.
21 """
22
23
24 import logging
25 import os
26 import re
27 import shutil
28 import tempfile
29
30
31 from leap.common.check import leap_assert, leap_assert_type
32 from leap.keymanager import errors
33 from leap.keymanager.keys import (
34     EncryptionKey,
35     EncryptionScheme,
36     is_address,
37     build_key_from_dict,
38     KEYMANAGER_KEY_TAG,
39     TAGS_ADDRESS_PRIVATE_INDEX,
40 )
41 from leap.keymanager.gpg import GPGWrapper
42
43
44 logger = logging.getLogger(__name__)
45
46
47 class TempGPGWrapper(object):
48     """
49     A context manager returning a temporary GPG wrapper keyring, which
50     contains exactly zero or one pubkeys, and zero or one privkeys.
51
52     Temporary unitary keyrings allow the to use GPG's facilities for exactly
53     one key. This function creates an empty temporary keyring and imports
54     C{keys} if it is not None.
55     """
56     def __init__(self, keys=None, gpgbinary=None):
57         """
58         :param keys: OpenPGP key, or list of.
59         :type keys: OpenPGPKey or list of OpenPGPKeys
60         :param gpgbinary: Name for GnuPG binary executable.
61         :type gpgbinary: C{str}
62         """
63         self._gpg = None
64         self._gpgbinary = gpgbinary
65         if not keys:
66             keys = list()
67         if not isinstance(keys, list):
68             keys = [keys]
69         self._keys = keys
70         for key in filter(None, keys):
71             leap_assert_type(key, OpenPGPKey)
72
73     def __enter__(self):
74         """
75         Calls the unitary gpgwrapper initializer
76
77         :return: A GPG wrapper with a unitary keyring.
78         :rtype: gnupg.GPG
79         """
80         self._build_keyring()
81         return self._gpg
82
83     def __exit__(self, exc_type, exc_value, traceback):
84         """
85         Ensures the gpgwrapper is properly destroyed.
86         """
87         # TODO handle exceptions and log here
88         self._destroy_keyring()
89
90     def _build_keyring(self):
91         """
92         Create an empty GPG keyring and import C{keys} into it.
93
94         :param keys: List of keys to add to the keyring.
95         :type keys: list of OpenPGPKey
96
97         :return: A GPG wrapper with a unitary keyring.
98         :rtype: gnupg.GPG
99         """
100         privkeys = [key for key in self._keys if key and key.private is True]
101         publkeys = [key for key in self._keys if key and key.private is False]
102         # here we filter out public keys that have a correspondent
103         # private key in the list because the private key_data by
104         # itself is enough to also have the public key in the keyring,
105         # and we want to count the keys afterwards.
106
107         privaddrs = map(lambda privkey: privkey.address, privkeys)
108         publkeys = filter(
109             lambda pubkey: pubkey.address not in privaddrs, publkeys)
110
111         listkeys = lambda: self._gpg.list_keys()
112         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
113
114         self._gpg = GPGWrapper(
115             gnupghome=tempfile.mkdtemp(),
116             gpgbinary=self._gpgbinary)
117         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
118
119         # import keys into the keyring:
120         # concatenating ascii-armored keys, which is correctly
121         # understood by the GPGWrapper.
122
123         self._gpg.import_keys("".join(
124             [x.key_data for x in publkeys + privkeys]))
125
126         # assert the number of keys in the keyring
127         leap_assert(
128             len(listkeys()) == len(publkeys) + len(privkeys),
129             'Wrong number of public keys in keyring: %d, should be %d)' %
130             (len(listkeys()), len(publkeys) + len(privkeys)))
131         leap_assert(
132             len(listsecretkeys()) == len(privkeys),
133             'Wrong number of private keys in keyring: %d, should be %d)' %
134             (len(listsecretkeys()), len(privkeys)))
135
136     def _destroy_keyring(self):
137         """
138         Securely erase a unitary keyring.
139         """
140         # TODO: implement some kind of wiping of data or a more
141         # secure way that
142         # does not write to disk.
143
144         try:
145             for secret in [True, False]:
146                 for key in self._gpg.list_keys(secret=secret):
147                     self._gpg.delete_keys(
148                         key['fingerprint'],
149                         secret=secret)
150             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
151
152         except:
153             raise
154
155         finally:
156             leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'),
157                         "watch out! Tried to remove default gnupg home!")
158             shutil.rmtree(self._gpg.gnupghome)
159
160
161 def _build_key_from_gpg(address, key, key_data):
162     """
163     Build an OpenPGPKey for C{address} based on C{key} from
164     local gpg storage.
165
166     ASCII armored GPG key data has to be queried independently in this
167     wrapper, so we receive it in C{key_data}.
168
169     :param address: The address bound to the key.
170     :type address: str
171     :param key: Key obtained from GPG storage.
172     :type key: dict
173     :param key_data: Key data obtained from GPG storage.
174     :type key_data: str
175     :return: An instance of the key.
176     :rtype: OpenPGPKey
177     """
178     return OpenPGPKey(
179         address,
180         key_id=key['keyid'],
181         fingerprint=key['fingerprint'],
182         key_data=key_data,
183         private=True if key['type'] == 'sec' else False,
184         length=key['length'],
185         expiry_date=key['expires'],
186         validation=None,  # TODO: verify for validation.
187     )
188
189
190 #
191 # The OpenPGP wrapper
192 #
193
194 class OpenPGPKey(EncryptionKey):
195     """
196     Base class for OpenPGP keys.
197     """
198
199
200 class OpenPGPScheme(EncryptionScheme):
201     """
202     A wrapper for OpenPGP keys management and use (encryption, decyption,
203     signing and verification).
204     """
205
206     def __init__(self, soledad, gpgbinary=None):
207         """
208         Initialize the OpenPGP wrapper.
209
210         :param soledad: A Soledad instance for key storage.
211         :type soledad: leap.soledad.Soledad
212         :param gpgbinary: Name for GnuPG binary executable.
213         :type gpgbinary: C{str}
214         """
215         EncryptionScheme.__init__(self, soledad)
216         self._gpgbinary = gpgbinary
217
218     #
219     # Keys management
220     #
221
222     def gen_key(self, address):
223         """
224         Generate an OpenPGP keypair bound to C{address}.
225
226         :param address: The address bound to the key.
227         :type address: str
228         :return: The key bound to C{address}.
229         :rtype: OpenPGPKey
230         @raise KeyAlreadyExists: If key already exists in local database.
231         """
232         # make sure the key does not already exist
233         leap_assert(is_address(address), 'Not an user address: %s' % address)
234         try:
235             self.get_key(address)
236             raise errors.KeyAlreadyExists(address)
237         except errors.KeyNotFound:
238             logger.debug('Key for %s not found' % (address,))
239
240         with self._temporary_gpgwrapper() as gpg:
241             # TODO: inspect result, or use decorator
242             params = gpg.gen_key_input(
243                 key_type='RSA',
244                 key_length=4096,
245                 name_real=address,
246                 name_email=address,
247                 name_comment='Generated by LEAP Key Manager.')
248             logger.info("About to generate keys... This might take SOME time.")
249             gpg.gen_key(params)
250             logger.info("Keys for %s have been successfully "
251                         "generated." % (address,))
252             pubkeys = gpg.list_keys()
253
254             # assert for new key characteristics
255
256             # XXX This exception is not properly catched by the soledad
257             # bootstrapping, so if we do not finish generating the keys
258             # we end with a blocked thread -- kali
259
260             leap_assert(
261                 len(pubkeys) is 1,  # a unitary keyring!
262                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
263             key = gpg.list_keys(secret=True).pop()
264             leap_assert(
265                 len(key['uids']) is 1,  # with just one uid!
266                 'Wrong number of uids for key: %d.' % len(key['uids']))
267             leap_assert(
268                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
269                 'Key not correctly bound to address.')
270             # insert both public and private keys in storage
271             for secret in [True, False]:
272                 key = gpg.list_keys(secret=secret).pop()
273                 openpgp_key = _build_key_from_gpg(
274                     address, key,
275                     gpg.export_keys(key['fingerprint'], secret=secret))
276                 self.put_key(openpgp_key)
277
278         return self.get_key(address, private=True)
279
280     def get_key(self, address, private=False):
281         """
282         Get key bound to C{address} from local storage.
283
284         :param address: The address bound to the key.
285         :type address: str
286         :param private: Look for a private key instead of a public one?
287         :type private: bool
288
289         :return: The key bound to C{address}.
290         :rtype: OpenPGPKey
291         @raise KeyNotFound: If the key was not found on local storage.
292         """
293         leap_assert(is_address(address), 'Not an user address: %s' % address)
294         doc = self._get_key_doc(address, private)
295         if doc is None:
296             raise errors.KeyNotFound(address)
297         return build_key_from_dict(OpenPGPKey, address, doc.content)
298
299     def put_ascii_key(self, key_data):
300         """
301         Put key contained in ascii-armored C{key_data} in local storage.
302
303         :param key_data: The key data to be stored.
304         :type key_data: str
305         """
306         leap_assert_type(key_data, str)
307         # TODO: add more checks for correct key data.
308         leap_assert(key_data is not None, 'Data does not represent a key.')
309
310         with self._temporary_gpgwrapper() as gpg:
311             # TODO: inspect result, or use decorator
312             gpg.import_keys(key_data)
313             privkey = None
314             pubkey = None
315
316             try:
317                 privkey = gpg.list_keys(secret=True).pop()
318             except IndexError:
319                 pass
320             pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
321             # extract adress from first uid on key
322             match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())
323             leap_assert(match is not None, 'No user address in key data.')
324             address = match.group(1)
325             if privkey is not None:
326                 match = re.match(
327                     '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop())
328                 leap_assert(match is not None, 'No user address in key data.')
329                 privaddress = match.group(1)
330                 leap_assert(
331                     address == privaddress,
332                     'Addresses in pub and priv key differ.')
333                 leap_assert(
334                     pubkey['fingerprint'] == privkey['fingerprint'],
335                     'Fingerprints for pub and priv key differ.')
336                 # insert private key in storage
337                 openpgp_privkey = _build_key_from_gpg(
338                     address, privkey,
339                     gpg.export_keys(privkey['fingerprint'], secret=True))
340                 self.put_key(openpgp_privkey)
341             # insert public key in storage
342             openpgp_pubkey = _build_key_from_gpg(
343                 address, pubkey,
344                 gpg.export_keys(pubkey['fingerprint'], secret=False))
345             self.put_key(openpgp_pubkey)
346
347     def put_key(self, key):
348         """
349         Put C{key} in local storage.
350
351         :param key: The key to be stored.
352         :type key: OpenPGPKey
353         """
354         doc = self._get_key_doc(key.address, private=key.private)
355         if doc is None:
356             self._soledad.create_doc_from_json(key.get_json())
357         else:
358             doc.set_json(key.get_json())
359             self._soledad.put_doc(doc)
360
361     def _get_key_doc(self, address, private=False):
362         """
363         Get the document with a key (public, by default) bound to C{address}.
364
365         If C{private} is True, looks for a private key instead of a public.
366
367         :param address: The address bound to the key.
368         :type address: str
369         :param private: Whether to look for a private key.
370         :type private: bool
371         :return: The document with the key or None if it does not exist.
372         :rtype: leap.soledad.document.SoledadDocument
373         """
374         doclist = self._soledad.get_from_index(
375             TAGS_ADDRESS_PRIVATE_INDEX,
376             KEYMANAGER_KEY_TAG,
377             address,
378             '1' if private else '0')
379         if len(doclist) is 0:
380             return None
381         leap_assert(
382             len(doclist) is 1,
383             'Found more than one %s key for address!' %
384             'private' if private else 'public')
385         return doclist.pop()
386
387     def delete_key(self, key):
388         """
389         Remove C{key} from storage.
390
391         :param key: The key to be removed.
392         :type key: EncryptionKey
393         """
394         leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
395         stored_key = self.get_key(key.address, private=key.private)
396         if stored_key is None:
397             raise errors.KeyNotFound(key)
398         if stored_key.__dict__ != key.__dict__:
399             raise errors.KeyAttributesDiffer(key)
400         doc = self._get_key_doc(key.address, key.private)
401         self._soledad.delete_doc(doc)
402
403     #
404     # Data encryption, decryption, signing and verifying
405     #
406
407     def _temporary_gpgwrapper(self, keys=None):
408         """
409         Returns a unitary gpg wrapper that implements context manager
410         protocol.
411
412         :param key_data: ASCII armored key data.
413         :type key_data: str
414         :param gpgbinary: Name for GnuPG binary executable.
415         :type gpgbinary: C{str}
416
417         :return: a GPGWrapper instance
418         :rtype: GPGWrapper
419         """
420         # TODO do here checks on key_data
421         return TempGPGWrapper(
422             keys=keys, gpgbinary=self._gpgbinary)
423
424     @staticmethod
425     def _assert_gpg_result_ok(result):
426         """
427         Check if GPG result is 'ok' and log stderr outputs.
428         :param result: The GPG results
429         :type result:
430         """
431         stderr = getattr(result, 'stderr', None)
432         if stderr:
433             logger.debug("%s" % (stderr,))
434         if getattr(result, 'ok', None) is not True:
435             raise errors.EncryptionDecryptionFailed(
436                 'Failed to encrypt/decrypt in %s: %s' % (
437                     this.encrypt,
438                     stderr))
439
440     def encrypt(self, data, pubkey, passphrase=None, sign=None):
441         """
442         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
443
444         :param data: The data to be encrypted.
445         :type data: str
446         :param pubkey: The key used to encrypt.
447         :type pubkey: OpenPGPKey
448         :param sign: The key used for signing.
449         :type sign: OpenPGPKey
450
451         :return: The encrypted data.
452         :rtype: str
453         """
454         leap_assert_type(pubkey, OpenPGPKey)
455         leap_assert(pubkey.private is False, 'Key is not public.')
456         keys = [pubkey]
457         if sign is not None:
458             leap_assert_type(sign, OpenPGPKey)
459             leap_assert(sign.private is True)
460             keys.append(sign)
461         with self._temporary_gpgwrapper(keys) as gpg:
462             result = gpg.encrypt(
463                 data, pubkey.fingerprint,
464                 sign=sign.key_id if sign else None,
465                 passphrase=passphrase, symmetric=False)
466             # Here we cannot assert for correctness of sig because the sig is
467             # in the ciphertext.
468             # result.ok    - (bool) indicates if the operation succeeded
469             # result.data  - (bool) contains the result of the operation
470             self._assert_gpg_result_ok(result)
471             return result.data
472
473     def decrypt(self, data, privkey, passphrase=None, verify=None):
474         """
475         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
476
477         :param data: The data to be decrypted.
478         :type data: str
479         :param privkey: The key used to decrypt.
480         :type privkey: OpenPGPKey
481         :param verify: The key used to verify a signature.
482         :type verify: OpenPGPKey
483
484         :return: The decrypted data.
485         :rtype: str
486
487         @raise InvalidSignature: Raised if unable to verify the signature with
488             C{verify} key.
489         """
490         leap_assert(privkey.private is True, 'Key is not private.')
491         keys = [privkey]
492         if verify is not None:
493             leap_assert_type(verify, OpenPGPKey)
494             leap_assert(verify.private is False)
495             keys.append(verify)
496         with self._temporary_gpgwrapper(keys) as gpg:
497             result = gpg.decrypt(data, passphrase=passphrase)
498             self._assert_gpg_result_ok(result)
499             # verify signature
500             if (verify is not None):
501                 if result.valid is False or \
502                         verify.fingerprint != result.pubkey_fingerprint:
503                     raise errors.InvalidSignature(
504                         'Failed to verify signature with key %s: %s' %
505                         (verify.key_id, stderr))
506             return result.data
507
508     def is_encrypted(self, data):
509         """
510         Return whether C{data} was asymmetrically encrypted using OpenPGP.
511
512         :param data: The data we want to know about.
513         :type data: str
514
515         :return: Whether C{data} was encrypted using this wrapper.
516         :rtype: bool
517         """
518         with self._temporary_gpgwrapper() as gpg:
519             return gpg.is_encrypted_asym(data)
520
521     def sign(self, data, privkey):
522         """
523         Sign C{data} with C{privkey}.
524
525         :param data: The data to be signed.
526         :type data: str
527
528         :param privkey: The private key to be used to sign.
529         :type privkey: OpenPGPKey
530
531         :return: The ascii-armored signed data.
532         :rtype: str
533         """
534         leap_assert_type(privkey, OpenPGPKey)
535         leap_assert(privkey.private is True)
536
537         # result.fingerprint - contains the fingerprint of the key used to
538         #                      sign.
539         with self._temporary_gpgwrapper(privkey) as gpg:
540             result = gpg.sign(data, keyid=privkey.key_id)
541             rfprint = privkey.fingerprint
542             privkey = gpg.list_keys(secret=True).pop()
543             kfprint = privkey['fingerprint']
544             if result.fingerprint is None:
545                 raise errors.SignFailed(
546                     'Failed to sign with key %s: %s' %
547                     (privkey['keyid'], stderr))
548             leap_assert(
549                 result.fingerprint == kfprint,
550                 'Signature and private key fingerprints mismatch: '
551                 '%s != %s' % (rfprint, kfprint))
552         return result.data
553
554     def verify(self, data, pubkey):
555         """
556         Verify signed C{data} with C{pubkey}.
557
558         :param data: The data to be verified.
559         :type data: str
560
561         :param pubkey: The public key to be used on verification.
562         :type pubkey: OpenPGPKey
563
564         :return: The ascii-armored signed data.
565         :rtype: str
566         """
567         leap_assert_type(pubkey, OpenPGPKey)
568         leap_assert(pubkey.private is False)
569         with self._temporary_gpgwrapper(pubkey) as gpg:
570             result = gpg.verify(data)
571             gpgpubkey = gpg.list_keys().pop()
572             valid = result.valid
573             rfprint = result.fingerprint
574             kfprint = gpgpubkey['fingerprint']
575             # raise in case sig is invalid
576             if valid is False or rfprint != kfprint:
577                 raise errors.InvalidSignature(
578                     'Failed to verify signature '
579                     'with key %s.' % gpgpubkey['keyid'])
580             return True