9d404cb92b9f1e3cdf84064750fa40b91f595dd3
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 Infrastructure for using OpenPGP keys in Key Manager.
20 """
21 import logging
22 import os
23 import re
24 import shutil
25 import tempfile
26
27 from leap.common.check import leap_assert, leap_assert_type
28 from leap.keymanager import errors
29 from leap.keymanager.keys import (
30     EncryptionKey,
31     EncryptionScheme,
32     is_address,
33     build_key_from_dict,
34     KEYMANAGER_KEY_TAG,
35     TAGS_ADDRESS_PRIVATE_INDEX,
36 )
37 from leap.keymanager.gpg import GPGWrapper
38
39 logger = logging.getLogger(__name__)
40
41
42 #
43 # gpg wrapper and decorator
44 #
45
46 def temporary_gpgwrapper(keys=None):
47     """
48     Returns a unitary gpg wrapper that implements context manager
49     protocol.
50
51     :param key_data: ASCII armored key data.
52     :type key_data: str
53
54     :return: a GPGWrapper instance
55     :rtype: GPGWrapper
56     """
57     # TODO do here checks on key_data
58     return TempGPGWrapper(keys=keys)
59
60
61 def with_temporary_gpg(fun):
62     """
63     Decorator to add a temporary gpg wrapper as context
64     to gpg related functions.
65
66     Decorated functions are expected to return a function whose only
67     argument is a gpgwrapper instance.
68     """
69     def wrapped(*args, **kwargs):
70         """
71         We extract the arguments passed to the wrapped function,
72         run the function and do validations.
73         We expect that the positional arguments are `data`,
74         and an optional `key`.
75         All the rest of arguments should be passed as named arguments
76         to allow for a correct unpacking.
77         """
78         if len(args) == 2:
79             keys = args[1] if isinstance(args[1], OpenPGPKey) else None
80         else:
81             keys = None
82
83         # sign/verify keys passed as arguments
84         sign = kwargs.get('sign', None)
85         if sign:
86             keys = [keys, sign]
87
88         verify = kwargs.get('verify', None)
89         if verify:
90             keys = [keys, verify]
91
92         # is the wrapped function sign or verify?
93         fun_name = fun.__name__
94         is_sign_function = True if fun_name == "sign" else False
95         is_verify_function = True if fun_name == "verify" else False
96
97         result = None
98
99         with temporary_gpgwrapper(keys) as gpg:
100             result = fun(*args, **kwargs)(gpg)
101
102             # TODO: cleanup a little bit the
103             # validation. maybe delegate to other
104             # auxiliary functions for clarity.
105
106             ok = getattr(result, 'ok', None)
107
108             stderr = getattr(result, 'stderr', None)
109             if stderr:
110                 logger.debug("%s" % (stderr,))
111
112             if ok is False:
113                 raise errors.EncryptionDecryptionFailed(
114                     'Failed to encrypt/decrypt in %s: %s' % (
115                         fun.__name__,
116                         stderr))
117
118             if verify is not None:
119                 # A verify key has been passed
120                 if result.valid is False or \
121                         verify.fingerprint != result.pubkey_fingerprint:
122                     raise errors.InvalidSignature(
123                         'Failed to verify signature with key %s: %s' %
124                         (verify.key_id, stderr))
125
126             if is_sign_function:
127                 # Specific validation for sign function
128                 privkey = gpg.list_keys(secret=True).pop()
129                 rfprint = result.fingerprint
130                 kfprint = privkey['fingerprint']
131                 if result.fingerprint is None:
132                     raise errors.SignFailed(
133                         'Failed to sign with key %s: %s' %
134                         (privkey['keyid'], stderr))
135                 leap_assert(
136                     result.fingerprint == kfprint,
137                     'Signature and private key fingerprints mismatch: '
138                     '%s != %s' %
139                     (rfprint, kfprint))
140
141             if is_verify_function:
142                 # Specific validation for verify function
143                 pubkey = gpg.list_keys().pop()
144                 valid = result.valid
145                 rfprint = result.fingerprint
146                 kfprint = pubkey['fingerprint']
147                 if valid is False or rfprint != kfprint:
148                     raise errors.InvalidSignature(
149                         'Failed to verify signature '
150                         'with key %s.' % pubkey['keyid'])
151                 result = result.valid
152
153             # ok, enough checks. let's return data if available
154             if hasattr(result, 'data'):
155                 result = result.data
156         return result
157     return wrapped
158
159
160 class TempGPGWrapper(object):
161     """
162     A context manager returning a temporary GPG wrapper keyring, which
163     contains exactly zero or one pubkeys, and zero or one privkeys.
164
165     Temporary unitary keyrings allow the to use GPG's facilities for exactly
166     one key. This function creates an empty temporary keyring and imports
167     C{keys} if it is not None.
168     """
169     def __init__(self, keys=None):
170         """
171         :param keys: OpenPGP key, or list of.
172         :type keys: OpenPGPKey or list of OpenPGPKeys
173         """
174         self._gpg = None
175         if not keys:
176             keys = list()
177         if not isinstance(keys, list):
178             keys = [keys]
179         self._keys = keys
180         for key in filter(None, keys):
181             leap_assert_type(key, OpenPGPKey)
182
183     def __enter__(self):
184         """
185         Calls the unitary gpgwrapper initializer
186
187         :return: A GPG wrapper with a unitary keyring.
188         :rtype: gnupg.GPG
189         """
190         self._build_keyring()
191         return self._gpg
192
193     def __exit__(self, exc_type, exc_value, traceback):
194         """
195         Ensures the gpgwrapper is properly destroyed.
196         """
197         # TODO handle exceptions and log here
198         self._destroy_keyring()
199
200     def _build_keyring(self):
201         """
202         Create an empty GPG keyring and import C{keys} into it.
203
204         :param keys: List of keys to add to the keyring.
205         :type keys: list of OpenPGPKey
206
207         :return: A GPG wrapper with a unitary keyring.
208         :rtype: gnupg.GPG
209         """
210         privkeys = [key for key in self._keys if key and key.private is True]
211         publkeys = [key for key in self._keys if key and key.private is False]
212         # here we filter out public keys that have a correspondent
213         # private key in the list because the private key_data by
214         # itself is enough to also have the public key in the keyring,
215         # and we want to count the keys afterwards.
216
217         privaddrs = map(lambda privkey: privkey.address, privkeys)
218         publkeys = filter(
219             lambda pubkey: pubkey.address not in privaddrs, publkeys)
220
221         listkeys = lambda: self._gpg.list_keys()
222         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
223
224         self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp())
225         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
226
227         # import keys into the keyring:
228         # concatenating ascii-armored keys, which is correctly
229         # understood by the GPGWrapper.
230
231         self._gpg.import_keys("".join(
232             [x.key_data for x in publkeys + privkeys]))
233
234         # assert the number of keys in the keyring
235         leap_assert(
236             len(listkeys()) == len(publkeys) + len(privkeys),
237             'Wrong number of public keys in keyring: %d, should be %d)' %
238             (len(listkeys()), len(publkeys) + len(privkeys)))
239         leap_assert(
240             len(listsecretkeys()) == len(privkeys),
241             'Wrong number of private keys in keyring: %d, should be %d)' %
242             (len(listsecretkeys()), len(privkeys)))
243
244     def _destroy_keyring(self):
245         """
246         Securely erase a unitary keyring.
247         """
248         # TODO: implement some kind of wiping of data or a more
249         # secure way that
250         # does not write to disk.
251
252         try:
253             for secret in [True, False]:
254                 for key in self._gpg.list_keys(secret=secret):
255                     self._gpg.delete_keys(
256                         key['fingerprint'],
257                         secret=secret)
258             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
259
260         except:
261             raise
262
263         finally:
264             leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'),
265                         "watch out! Tried to remove default gnupg home!")
266             shutil.rmtree(self._gpg.gnupghome)
267
268
269 #
270 # API functions
271 #
272
273 @with_temporary_gpg
274 def encrypt_asym(data, key, passphrase=None, sign=None):
275     """
276     Encrypt C{data} using public @{key} and sign with C{sign} key.
277
278     :param data: The data to be encrypted.
279     :type data: str
280     :param pubkey: The key used to encrypt.
281     :type pubkey: OpenPGPKey
282     :param sign: The key used for signing.
283     :type sign: OpenPGPKey
284
285     :return: The encrypted data.
286     :rtype: str
287     """
288     leap_assert_type(key, OpenPGPKey)
289     leap_assert(key.private is False, 'Key is not public.')
290     if sign is not None:
291         leap_assert_type(sign, OpenPGPKey)
292         leap_assert(sign.private is True)
293
294     # Here we cannot assert for correctness of sig because the sig is in
295     # the ciphertext.
296     # result.ok    - (bool) indicates if the operation succeeded
297     # result.data  - (bool) contains the result of the operation
298
299     return lambda gpg: gpg.encrypt(
300         data, key.fingerprint,
301         sign=sign.key_id if sign else None,
302         passphrase=passphrase, symmetric=False)
303
304
305 @with_temporary_gpg
306 def decrypt_asym(data, key, passphrase=None, verify=None):
307     """
308     Decrypt C{data} using private @{key} and verify with C{verify} key.
309
310     :param data: The data to be decrypted.
311     :type data: str
312     :param privkey: The key used to decrypt.
313     :type privkey: OpenPGPKey
314     :param verify: The key used to verify a signature.
315     :type verify: OpenPGPKey
316
317     :return: The decrypted data.
318     :rtype: str
319
320     @raise InvalidSignature: Raised if unable to verify the signature with
321         C{verify} key.
322     """
323     leap_assert(key.private is True, 'Key is not private.')
324     if verify is not None:
325         leap_assert_type(verify, OpenPGPKey)
326         leap_assert(verify.private is False)
327
328     return lambda gpg: gpg.decrypt(
329         data, passphrase=passphrase)
330
331
332 @with_temporary_gpg
333 def is_encrypted(data):
334     """
335     Return whether C{data} was encrypted using OpenPGP.
336
337     :param data: The data we want to know about.
338     :type data: str
339
340     :return: Whether C{data} was encrypted using this wrapper.
341     :rtype: bool
342     """
343     return lambda gpg: gpg.is_encrypted(data)
344
345
346 @with_temporary_gpg
347 def is_encrypted_asym(data):
348     """
349     Return whether C{data} was asymmetrically encrypted using OpenPGP.
350
351     :param data: The data we want to know about.
352     :type data: str
353
354     :return: Whether C{data} was encrypted using this wrapper.
355     :rtype: bool
356     """
357     return lambda gpg: gpg.is_encrypted_asym(data)
358
359
360 @with_temporary_gpg
361 def sign(data, privkey):
362     """
363     Sign C{data} with C{privkey}.
364
365     :param data: The data to be signed.
366     :type data: str
367
368     :param privkey: The private key to be used to sign.
369     :type privkey: OpenPGPKey
370
371     :return: The ascii-armored signed data.
372     :rtype: str
373     """
374     leap_assert_type(privkey, OpenPGPKey)
375     leap_assert(privkey.private is True)
376
377     # result.fingerprint - contains the fingerprint of the key used to
378     #                      sign.
379     return lambda gpg: gpg.sign(data, keyid=privkey.key_id)
380
381
382 @with_temporary_gpg
383 def verify(data, key):
384     """
385     Verify signed C{data} with C{pubkey}.
386
387     :param data: The data to be verified.
388     :type data: str
389
390     :param pubkey: The public key to be used on verification.
391     :type pubkey: OpenPGPKey
392
393     :return: The ascii-armored signed data.
394     :rtype: str
395     """
396     leap_assert_type(key, OpenPGPKey)
397     leap_assert(key.private is False)
398
399     return lambda gpg: gpg.verify(data)
400
401
402 #
403 # Helper functions
404 #
405
406
407 def _build_key_from_gpg(address, key, key_data):
408     """
409     Build an OpenPGPKey for C{address} based on C{key} from
410     local gpg storage.
411
412     ASCII armored GPG key data has to be queried independently in this
413     wrapper, so we receive it in C{key_data}.
414
415     :param address: The address bound to the key.
416     :type address: str
417     :param key: Key obtained from GPG storage.
418     :type key: dict
419     :param key_data: Key data obtained from GPG storage.
420     :type key_data: str
421     :return: An instance of the key.
422     :rtype: OpenPGPKey
423     """
424     return OpenPGPKey(
425         address,
426         key_id=key['keyid'],
427         fingerprint=key['fingerprint'],
428         key_data=key_data,
429         private=True if key['type'] == 'sec' else False,
430         length=key['length'],
431         expiry_date=key['expires'],
432         validation=None,  # TODO: verify for validation.
433     )
434
435
436 #
437 # The OpenPGP wrapper
438 #
439
440 class OpenPGPKey(EncryptionKey):
441     """
442     Base class for OpenPGP keys.
443     """
444
445
446 class OpenPGPScheme(EncryptionScheme):
447     """
448     A wrapper for OpenPGP keys.
449     """
450
451     def __init__(self, soledad):
452         """
453         Initialize the OpenPGP wrapper.
454
455         :param soledad: A Soledad instance for key storage.
456         :type soledad: leap.soledad.Soledad
457         """
458         EncryptionScheme.__init__(self, soledad)
459
460     def gen_key(self, address):
461         """
462         Generate an OpenPGP keypair bound to C{address}.
463
464         :param address: The address bound to the key.
465         :type address: str
466         :return: The key bound to C{address}.
467         :rtype: OpenPGPKey
468         @raise KeyAlreadyExists: If key already exists in local database.
469         """
470         # make sure the key does not already exist
471         leap_assert(is_address(address), 'Not an user address: %s' % address)
472         try:
473             self.get_key(address)
474             raise errors.KeyAlreadyExists(address)
475         except errors.KeyNotFound:
476             logger.debug('Key for %s not found' % (address,))
477
478         def _gen_key(gpg):
479             params = gpg.gen_key_input(
480                 key_type='RSA',
481                 key_length=4096,
482                 name_real=address,
483                 name_email=address,
484                 name_comment='Generated by LEAP Key Manager.')
485             logger.info("About to generate keys... This might take SOME time.")
486             gpg.gen_key(params)
487             logger.info("Keys for %s have been successfully "
488                         "generated." % (address,))
489             pubkeys = gpg.list_keys()
490
491             # assert for new key characteristics
492
493             # XXX This exception is not properly catched by the soledad
494             # bootstrapping, so if we do not finish generating the keys
495             # we end with a blocked thread -- kali
496
497             leap_assert(
498                 len(pubkeys) is 1,  # a unitary keyring!
499                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
500             key = gpg.list_keys(secret=True).pop()
501             leap_assert(
502                 len(key['uids']) is 1,  # with just one uid!
503                 'Wrong number of uids for key: %d.' % len(key['uids']))
504             leap_assert(
505                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
506                 'Key not correctly bound to address.')
507             # insert both public and private keys in storage
508             for secret in [True, False]:
509                 key = gpg.list_keys(secret=secret).pop()
510                 openpgp_key = _build_key_from_gpg(
511                     address, key,
512                     gpg.export_keys(key['fingerprint'], secret=secret))
513                 self.put_key(openpgp_key)
514
515         with temporary_gpgwrapper() as gpg:
516             # TODO: inspect result, or use decorator
517             _gen_key(gpg)
518
519         return self.get_key(address, private=True)
520
521     def get_key(self, address, private=False):
522         """
523         Get key bound to C{address} from local storage.
524
525         :param address: The address bound to the key.
526         :type address: str
527         :param private: Look for a private key instead of a public one?
528         :type private: bool
529
530         :return: The key bound to C{address}.
531         :rtype: OpenPGPKey
532         @raise KeyNotFound: If the key was not found on local storage.
533         """
534         leap_assert(is_address(address), 'Not an user address: %s' % address)
535         doc = self._get_key_doc(address, private)
536         if doc is None:
537             raise errors.KeyNotFound(address)
538         return build_key_from_dict(OpenPGPKey, address, doc.content)
539
540     def put_ascii_key(self, key_data):
541         """
542         Put key contained in ascii-armored C{key_data} in local storage.
543
544         :param key_data: The key data to be stored.
545         :type key_data: str
546         """
547         leap_assert_type(key_data, str)
548         # TODO: add more checks for correct key data.
549         leap_assert(key_data is not None, 'Data does not represent a key.')
550
551         def _put_ascii_key(gpg):
552             gpg.import_keys(key_data)
553             privkey = None
554             pubkey = None
555
556             try:
557                 privkey = gpg.list_keys(secret=True).pop()
558             except IndexError:
559                 pass
560             pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
561             # extract adress from first uid on key
562             match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())
563             leap_assert(match is not None, 'No user address in key data.')
564             address = match.group(1)
565             if privkey is not None:
566                 match = re.match(
567                     '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop())
568                 leap_assert(match is not None, 'No user address in key data.')
569                 privaddress = match.group(1)
570                 leap_assert(
571                     address == privaddress,
572                     'Addresses in pub and priv key differ.')
573                 leap_assert(
574                     pubkey['fingerprint'] == privkey['fingerprint'],
575                     'Fingerprints for pub and priv key differ.')
576                 # insert private key in storage
577                 openpgp_privkey = _build_key_from_gpg(
578                     address, privkey,
579                     gpg.export_keys(privkey['fingerprint'], secret=True))
580                 self.put_key(openpgp_privkey)
581             # insert public key in storage
582             openpgp_pubkey = _build_key_from_gpg(
583                 address, pubkey,
584                 gpg.export_keys(pubkey['fingerprint'], secret=False))
585             self.put_key(openpgp_pubkey)
586
587         with temporary_gpgwrapper() as gpg:
588             # TODO: inspect result, or use decorator
589             _put_ascii_key(gpg)
590
591     def put_key(self, key):
592         """
593         Put C{key} in local storage.
594
595         :param key: The key to be stored.
596         :type key: OpenPGPKey
597         """
598         doc = self._get_key_doc(key.address, private=key.private)
599         if doc is None:
600             self._soledad.create_doc_from_json(key.get_json())
601         else:
602             doc.set_json(key.get_json())
603             self._soledad.put_doc(doc)
604
605     def _get_key_doc(self, address, private=False):
606         """
607         Get the document with a key (public, by default) bound to C{address}.
608
609         If C{private} is True, looks for a private key instead of a public.
610
611         :param address: The address bound to the key.
612         :type address: str
613         :param private: Whether to look for a private key.
614         :type private: bool
615         :return: The document with the key or None if it does not exist.
616         :rtype: leap.soledad.document.SoledadDocument
617         """
618         doclist = self._soledad.get_from_index(
619             TAGS_ADDRESS_PRIVATE_INDEX,
620             KEYMANAGER_KEY_TAG,
621             address,
622             '1' if private else '0')
623         if len(doclist) is 0:
624             return None
625         leap_assert(
626             len(doclist) is 1,
627             'Found more than one %s key for address!' %
628             'private' if private else 'public')
629         return doclist.pop()
630
631     def delete_key(self, key):
632         """
633         Remove C{key} from storage.
634
635         :param key: The key to be removed.
636         :type key: EncryptionKey
637         """
638         leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
639         stored_key = self.get_key(key.address, private=key.private)
640         if stored_key is None:
641             raise errors.KeyNotFound(key)
642         if stored_key.__dict__ != key.__dict__:
643             raise errors.KeyAttributesDiffer(key)
644         doc = self._get_key_doc(key.address, key.private)
645         self._soledad.delete_doc(doc)