use temporary openpgpwrapper as a context manager
[leap_pycommon.git] / src / leap / common / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 Infrastructure for using OpenPGP keys in Key Manager.
20 """
21 import logging
22 import os
23 import re
24 import shutil
25 import tempfile
26
27 from leap.common.check import leap_assert, leap_assert_type
28 from leap.common.keymanager import errors
29
30 from leap.common.keymanager.keys import (
31     EncryptionKey,
32     EncryptionScheme,
33     is_address,
34     keymanager_doc_id,
35     build_key_from_dict,
36 )
37 from leap.common.keymanager.gpg import GPGWrapper
38
39 logger = logging.getLogger(__name__)
40
41
42 #
43 # gpg wrapper and decorator
44 #
45
46 def temporary_gpgwrapper(keys=None):
47     """
48     Returns a unitary gpg wrapper that implements context manager
49     protocol.
50
51     @param key_data: ASCII armored key data.
52     @type key_data: str
53
54     @return: a GPGWrapper instance
55     @rtype: GPGWrapper
56     """
57     # TODO do here checks on key_data
58     return TempGPGWrapper(keys=keys)
59
60
61 def with_temporary_gpg(fun):
62     """
63     Decorator to add a temporary gpg wrapper as context
64     to gpg related functions.
65
66     Decorated functions are expected to return a function whose only
67     argument is a gpgwrapper instance.
68     """
69     def wrapped(*args, **kwargs):
70         """
71         We extract the arguments passed to the wrapped function,
72         run the function and do validations.
73         We expect that the positional arguments are `data`,
74         and an optional `key`.
75         All the rest of arguments should be passed as named arguments
76         to allow for a correct unpacking.
77         """
78         if len(args) == 2:
79             keys = args[1] if isinstance(args[1], OpenPGPKey) else None
80         else:
81             keys = None
82
83         # sign/verify keys passed as arguments
84         sign = kwargs.get('sign', None)
85         if sign:
86             keys = [keys, sign]
87
88         verify = kwargs.get('verify', None)
89         if verify:
90             keys = [keys, verify]
91
92         # is the wrapped function sign or verify?
93         fun_name = fun.__name__
94         is_sign_function = True if fun_name == "sign" else False
95         is_verify_function = True if fun_name == "verify" else False
96
97         result = None
98
99         with temporary_gpgwrapper(keys) as gpg:
100             result = fun(*args, **kwargs)(gpg)
101
102             # TODO: cleanup a little bit the
103             # validation. maybe delegate to other
104             # auxiliary functions for clarity.
105
106             ok = getattr(result, 'ok', None)
107
108             stderr = getattr(result, 'stderr', None)
109             if stderr:
110                 logger.debug("%s" % (stderr,))
111
112             if ok is False:
113                 raise errors.EncryptionDecryptionFailed(
114                     'Failed to encrypt/decrypt in %s: %s' % (
115                         fun.__name__,
116                         stderr))
117
118             if verify is not None:
119                 # A verify key has been passed
120                 if result.valid is False or \
121                         verify.fingerprint != result.pubkey_fingerprint:
122                     raise errors.InvalidSignature(
123                         'Failed to verify signature with key %s: %s' %
124                         (verify.key_id, stderr))
125
126             if is_sign_function:
127                 # Specific validation for sign function
128                 privkey = gpg.list_keys(secret=True).pop()
129                 rfprint = result.fingerprint
130                 kfprint = privkey['fingerprint']
131                 if result.fingerprint is None:
132                     raise errors.SignFailed(
133                         'Failed to sign with key %s: %s' %
134                         (privkey['keyid'], stderr))
135                 leap_assert(
136                     result.fingerprint == kfprint,
137                     'Signature and private key fingerprints mismatch: '
138                     '%s != %s' %
139                     (rfprint, kfprint))
140
141             if is_verify_function:
142                 # Specific validation for verify function
143                 pubkey = gpg.list_keys().pop()
144                 valid = result.valid
145                 rfprint = result.fingerprint
146                 kfprint = pubkey['fingerprint']
147                 if valid is False or rfprint != kfprint:
148                     raise errors.InvalidSignature(
149                         'Failed to verify signature '
150                         'with key %s.' % pubkey['keyid'])
151                 result = result.valid
152
153             # ok, enough checks. let's return data if available
154             if hasattr(result, 'data'):
155                 result = result.data
156         return result
157     return wrapped
158
159
160 class TempGPGWrapper(object):
161     """
162     A context manager returning a temporary GPG wrapper keyring, which
163     contains exactly zero or one pubkeys, and zero or one privkeys.
164
165     Temporary unitary keyrings allow the to use GPG's facilities for exactly
166     one key. This function creates an empty temporary keyring and imports
167     C{keys} if it is not None.
168     """
169     def __init__(self, keys=None):
170         """
171         @param keys: OpenPGP key, or list of.
172         @type keys: OpenPGPKey or list of OpenPGPKeys
173         """
174         self._gpg = None
175         if not keys:
176             keys = list()
177         if not isinstance(keys, list):
178             keys = [keys]
179         self._keys = keys
180         for key in filter(None, keys):
181             leap_assert_type(key, OpenPGPKey)
182
183     def __enter__(self):
184         """
185         Calls the unitary gpgwrapper initializer
186
187         @return: A GPG wrapper with a unitary keyring.
188         @rtype: gnupg.GPG
189         """
190         self._build_keyring()
191         return self._gpg
192
193     def __exit__(self, exc_type, exc_value, traceback):
194         """
195         Ensures the gpgwrapper is properly destroyed.
196         """
197         # TODO handle exceptions and log here
198         self._destroy_keyring()
199
200     def _build_keyring(self):
201         """
202         Create an empty GPG keyring and import C{keys} into it.
203
204         @param keys: List of keys to add to the keyring.
205         @type keys: list of OpenPGPKey
206
207         @return: A GPG wrapper with a unitary keyring.
208         @rtype: gnupg.GPG
209         """
210         privkeys = [key for key in self._keys if key and key.private is True]
211         publkeys = [key for key in self._keys if key and key.private is False]
212         # here we filter out public keys that have a correspondent
213         # private key in the list because the private key_data by
214         # itself is enough to also have the public key in the keyring,
215         # and we want to count the keys afterwards.
216
217         privaddrs = map(lambda privkey: privkey.address, privkeys)
218         publkeys = filter(
219             lambda pubkey: pubkey.address not in privaddrs, publkeys)
220
221         listkeys = lambda: self._gpg.list_keys()
222         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
223
224         self._gpg = GPGWrapper(gnupghome=tempfile.mkdtemp())
225         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
226
227         # import keys into the keyring:
228         # concatenating ascii-armored keys, which is correctly
229         # understood by the GPGWrapper.
230
231         self._gpg.import_keys("".join(
232             [x.key_data for x in publkeys + privkeys]))
233
234         # assert the number of keys in the keyring
235         leap_assert(
236             len(listkeys()) == len(publkeys) + len(privkeys),
237             'Wrong number of public keys in keyring: %d, should be %d)' %
238             (len(listkeys()), len(publkeys) + len(privkeys)))
239         leap_assert(
240             len(listsecretkeys()) == len(privkeys),
241             'Wrong number of private keys in keyring: %d, should be %d)' %
242             (len(listsecretkeys()), len(privkeys)))
243
244     def _destroy_keyring(self):
245         """
246         Securely erase a unitary keyring.
247         """
248         # TODO: implement some kind of wiping of data or a more
249         # secure way that
250         # does not write to disk.
251
252         try:
253             for secret in [True, False]:
254                 for key in self._gpg.list_keys(secret=secret):
255                     self._gpg.delete_keys(
256                         key['fingerprint'],
257                         secret=secret)
258             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
259
260         except:
261             raise
262
263         finally:
264             leap_assert(self._gpg.gnupghome != os.path.expanduser('~/.gnupg'),
265                         "watch out! Tried to remove default gnupg home!")
266             shutil.rmtree(self._gpg.gnupghome)
267
268
269 #
270 # API functions
271 #
272
273 @with_temporary_gpg
274 def encrypt_sym(data, passphrase=None, sign=None):
275     """
276     Encrypt C{data} with C{passphrase} and sign with C{sign} private key.
277
278     @param data: The data to be encrypted.
279     @type data: str
280     @param passphrase: The passphrase used to encrypt C{data}.
281     @type passphrase: str
282     @param sign: The private key used for signing.
283     @type sign: OpenPGPKey
284
285     @return: The encrypted data.
286     @rtype: str
287     """
288     leap_assert_type(passphrase, str)
289     if sign is not None:
290         leap_assert_type(sign, OpenPGPKey)
291         leap_assert(sign.private is True)
292
293     # Here we cannot assert for correctness of sig because the sig is in
294     # the ciphertext.
295     # result.ok    - (bool) indicates if the operation succeeded
296     # result.data  - (bool) contains the result of the operation
297
298     return lambda gpg: gpg.encrypt(
299         data, None,
300         sign=sign.key_id if sign else None,
301         passphrase=passphrase, symmetric=True)
302
303
304 @with_temporary_gpg
305 def decrypt_sym(data, passphrase=None, verify=None):
306     """
307     Decrypt C{data} with C{passphrase} and verify with C{verify} public
308     key.
309
310     @param data: The data to be decrypted.
311     @type data: str
312     @param passphrase: The passphrase used to decrypt C{data}.
313     @type passphrase: str
314     @param verify: The key used to verify a signature.
315     @type verify: OpenPGPKey
316
317     @return: The decrypted data.
318     @rtype: str
319
320     @raise InvalidSignature: Raised if unable to verify the signature with
321         C{verify} key.
322     """
323     leap_assert_type(passphrase, str)
324     if verify is not None:
325         leap_assert_type(verify, OpenPGPKey)
326         leap_assert(verify.private is False)
327
328     # result.ok    - (bool) indicates if the operation succeeded
329     # result.valid - (bool) indicates if the signature was verified
330     # result.data  - (bool) contains the result of the operation
331     # result.pubkey_fingerpring  - (str) contains the fingerprint of the
332     #                              public key that signed this data.
333     return lambda gpg: gpg.decrypt(
334         data, passphrase=passphrase)
335
336
337 @with_temporary_gpg
338 def encrypt_asym(data, key, passphrase=None, sign=None):
339     """
340     Encrypt C{data} using public @{key} and sign with C{sign} key.
341
342     @param data: The data to be encrypted.
343     @type data: str
344     @param pubkey: The key used to encrypt.
345     @type pubkey: OpenPGPKey
346     @param sign: The key used for signing.
347     @type sign: OpenPGPKey
348
349     @return: The encrypted data.
350     @rtype: str
351     """
352     leap_assert_type(key, OpenPGPKey)
353     leap_assert(key.private is False, 'Key is not public.')
354     if sign is not None:
355         leap_assert_type(sign, OpenPGPKey)
356         leap_assert(sign.private is True)
357
358     # Here we cannot assert for correctness of sig because the sig is in
359     # the ciphertext.
360     # result.ok    - (bool) indicates if the operation succeeded
361     # result.data  - (bool) contains the result of the operation
362
363     return lambda gpg: gpg.encrypt(
364         data, key.fingerprint,
365         sign=sign.key_id if sign else None,
366         passphrase=passphrase, symmetric=False)
367
368
369 @with_temporary_gpg
370 def decrypt_asym(data, key, passphrase=None, verify=None):
371     """
372     Decrypt C{data} using private @{key} and verify with C{verify} key.
373
374     @param data: The data to be decrypted.
375     @type data: str
376     @param privkey: The key used to decrypt.
377     @type privkey: OpenPGPKey
378     @param verify: The key used to verify a signature.
379     @type verify: OpenPGPKey
380
381     @return: The decrypted data.
382     @rtype: str
383
384     @raise InvalidSignature: Raised if unable to verify the signature with
385         C{verify} key.
386     """
387     leap_assert(key.private is True, 'Key is not private.')
388     if verify is not None:
389         leap_assert_type(verify, OpenPGPKey)
390         leap_assert(verify.private is False)
391
392     return lambda gpg: gpg.decrypt(
393         data, passphrase=passphrase)
394
395
396 @with_temporary_gpg
397 def is_encrypted(data):
398     """
399     Return whether C{data} was encrypted using OpenPGP.
400
401     @param data: The data we want to know about.
402     @type data: str
403
404     @return: Whether C{data} was encrypted using this wrapper.
405     @rtype: bool
406     """
407     return lambda gpg: gpg.is_encrypted(data)
408
409
410 @with_temporary_gpg
411 def is_encrypted_sym(data):
412     """
413     Return whether C{data} was encrypted using a public OpenPGP key.
414
415     @param data: The data we want to know about.
416     @type data: str
417
418     @return: Whether C{data} was encrypted using this wrapper.
419     @rtype: bool
420     """
421     return lambda gpg: gpg.is_encrypted_sym(data)
422
423
424 @with_temporary_gpg
425 def is_encrypted_asym(data):
426     """
427     Return whether C{data} was asymmetrically encrypted using OpenPGP.
428
429     @param data: The data we want to know about.
430     @type data: str
431
432     @return: Whether C{data} was encrypted using this wrapper.
433     @rtype: bool
434     """
435     return lambda gpg: gpg.is_encrypted_asym(data)
436
437
438 @with_temporary_gpg
439 def sign(data, privkey):
440     """
441     Sign C{data} with C{privkey}.
442
443     @param data: The data to be signed.
444     @type data: str
445
446     @param privkey: The private key to be used to sign.
447     @type privkey: OpenPGPKey
448
449     @return: The ascii-armored signed data.
450     @rtype: str
451     """
452     leap_assert_type(privkey, OpenPGPKey)
453     leap_assert(privkey.private is True)
454
455     # result.fingerprint - contains the fingerprint of the key used to
456     #                      sign.
457     return lambda gpg: gpg.sign(data, keyid=privkey.key_id)
458
459
460 @with_temporary_gpg
461 def verify(data, key):
462     """
463     Verify signed C{data} with C{pubkey}.
464
465     @param data: The data to be verified.
466     @type data: str
467
468     @param pubkey: The public key to be used on verification.
469     @type pubkey: OpenPGPKey
470
471     @return: The ascii-armored signed data.
472     @rtype: str
473     """
474     leap_assert_type(key, OpenPGPKey)
475     leap_assert(key.private is False)
476
477     return lambda gpg: gpg.verify(data)
478
479
480 #
481 # Helper functions
482 #
483
484
485 def _build_key_from_gpg(address, key, key_data):
486     """
487     Build an OpenPGPKey for C{address} based on C{key} from
488     local gpg storage.
489
490     ASCII armored GPG key data has to be queried independently in this
491     wrapper, so we receive it in C{key_data}.
492
493     @param address: The address bound to the key.
494     @type address: str
495     @param key: Key obtained from GPG storage.
496     @type key: dict
497     @param key_data: Key data obtained from GPG storage.
498     @type key_data: str
499     @return: An instance of the key.
500     @rtype: OpenPGPKey
501     """
502     return OpenPGPKey(
503         address,
504         key_id=key['keyid'],
505         fingerprint=key['fingerprint'],
506         key_data=key_data,
507         private=True if key['type'] == 'sec' else False,
508         length=key['length'],
509         expiry_date=key['expires'],
510         validation=None,  # TODO: verify for validation.
511     )
512
513
514 #
515 # The OpenPGP wrapper
516 #
517
518 class OpenPGPKey(EncryptionKey):
519     """
520     Base class for OpenPGP keys.
521     """
522
523
524 class OpenPGPScheme(EncryptionScheme):
525     """
526     A wrapper for OpenPGP keys.
527     """
528
529     def __init__(self, soledad):
530         """
531         Initialize the OpenPGP wrapper.
532
533         @param soledad: A Soledad instance for key storage.
534         @type soledad: leap.soledad.Soledad
535         """
536         EncryptionScheme.__init__(self, soledad)
537
538     def gen_key(self, address):
539         """
540         Generate an OpenPGP keypair bound to C{address}.
541
542         @param address: The address bound to the key.
543         @type address: str
544         @return: The key bound to C{address}.
545         @rtype: OpenPGPKey
546         @raise KeyAlreadyExists: If key already exists in local database.
547         """
548         # make sure the key does not already exist
549         leap_assert(is_address(address), 'Not an user address: %s' % address)
550         try:
551             self.get_key(address)
552             raise errors.KeyAlreadyExists(address)
553         except errors.KeyNotFound:
554             pass
555
556         def _gen_key(gpg):
557             params = gpg.gen_key_input(
558                 key_type='RSA',
559                 key_length=4096,
560                 name_real=address,
561                 name_email=address,
562                 name_comment='Generated by LEAP Key Manager.')
563             gpg.gen_key(params)
564             pubkeys = gpg.list_keys()
565             # assert for new key characteristics
566             leap_assert(
567                 len(pubkeys) is 1,  # a unitary keyring!
568                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
569             key = gpg.list_keys(secret=True).pop()
570             leap_assert(
571                 len(key['uids']) is 1,  # with just one uid!
572                 'Wrong number of uids for key: %d.' % len(key['uids']))
573             leap_assert(
574                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
575                 'Key not correctly bound to address.')
576             # insert both public and private keys in storage
577             for secret in [True, False]:
578                 key = gpg.list_keys(secret=secret).pop()
579                 openpgp_key = _build_key_from_gpg(
580                     address, key,
581                     gpg.export_keys(key['fingerprint'], secret=secret))
582                 self.put_key(openpgp_key)
583
584         with temporary_gpgwrapper() as gpg:
585             # TODO: inspect result, or use decorator
586             _gen_key(gpg)
587
588         return self.get_key(address, private=True)
589
590     def get_key(self, address, private=False):
591         """
592         Get key bound to C{address} from local storage.
593
594         @param address: The address bound to the key.
595         @type address: str
596         @param private: Look for a private key instead of a public one?
597         @type private: bool
598
599         @return: The key bound to C{address}.
600         @rtype: OpenPGPKey
601         @raise KeyNotFound: If the key was not found on local storage.
602         """
603         leap_assert(is_address(address), 'Not an user address: %s' % address)
604         doc = self._get_key_doc(address, private)
605         if doc is None:
606             raise errors.KeyNotFound(address)
607         return build_key_from_dict(OpenPGPKey, address, doc.content)
608
609     def put_ascii_key(self, key_data):
610         """
611         Put key contained in ascii-armored C{key_data} in local storage.
612
613         @param key_data: The key data to be stored.
614         @type key_data: str
615         """
616         leap_assert_type(key_data, str)
617         # TODO: add more checks for correct key data.
618         leap_assert(key_data is not None, 'Data does not represent a key.')
619
620         def _put_ascii_key(gpg):
621             gpg.import_keys(key_data)
622             privkey = None
623             pubkey = None
624
625             try:
626                 privkey = gpg.list_keys(secret=True).pop()
627             except IndexError:
628                 pass
629             pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
630             # extract adress from first uid on key
631             match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())
632             leap_assert(match is not None, 'No user address in key data.')
633             address = match.group(1)
634             if privkey is not None:
635                 match = re.match(
636                     '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop())
637                 leap_assert(match is not None, 'No user address in key data.')
638                 privaddress = match.group(1)
639                 leap_assert(
640                     address == privaddress,
641                     'Addresses in pub and priv key differ.')
642                 leap_assert(
643                     pubkey['fingerprint'] == privkey['fingerprint'],
644                     'Fingerprints for pub and priv key differ.')
645                 # insert private key in storage
646                 openpgp_privkey = _build_key_from_gpg(
647                     address, privkey,
648                     gpg.export_keys(privkey['fingerprint'], secret=True))
649                 self.put_key(openpgp_privkey)
650             # insert public key in storage
651             openpgp_pubkey = _build_key_from_gpg(
652                 address, pubkey,
653                 gpg.export_keys(pubkey['fingerprint'], secret=False))
654             self.put_key(openpgp_pubkey)
655
656         with temporary_gpgwrapper() as gpg:
657             # TODO: inspect result, or use decorator
658             _put_ascii_key(gpg)
659
660     def put_key(self, key):
661         """
662         Put C{key} in local storage.
663
664         @param key: The key to be stored.
665         @type key: OpenPGPKey
666         """
667         doc = self._get_key_doc(key.address, private=key.private)
668         if doc is None:
669             self._soledad.create_doc_from_json(
670                 key.get_json(),
671                 doc_id=keymanager_doc_id(
672                     OpenPGPKey, key.address, key.private))
673         else:
674             doc.set_json(key.get_json())
675             self._soledad.put_doc(doc)
676
677     def _get_key_doc(self, address, private=False):
678         """
679         Get the document with a key (public, by default) bound to C{address}.
680
681         If C{private} is True, looks for a private key instead of a public.
682
683         @param address: The address bound to the key.
684         @type address: str
685         @param private: Whether to look for a private key.
686         @type private: bool
687         @return: The document with the key or None if it does not exist.
688         @rtype: leap.soledad.backends.leap_backend.LeapDocument
689         """
690         return self._soledad.get_doc(
691             keymanager_doc_id(OpenPGPKey, address, private))
692
693     def delete_key(self, key):
694         """
695         Remove C{key} from storage.
696
697         @param key: The key to be removed.
698         @type key: EncryptionKey
699         """
700         leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
701         stored_key = self.get_key(key.address, private=key.private)
702         if stored_key is None:
703             raise errors.KeyNotFound(key)
704         if stored_key.__dict__ != key.__dict__:
705             raise errors.KeyAttributesDiffer(key)
706         doc = self._soledad.get_doc(
707             keymanager_doc_id(OpenPGPKey, key.address, key.private))
708         self._soledad.delete_doc(doc)