0adfc52d628926c8b3e8c46aab346ec92c3bcad3
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 Infrastructure for using OpenPGP keys in Key Manager.
19 """
20 import logging
21 import os
22 import re
23 import shutil
24 import tempfile
25 import io
26
27
28 from datetime import datetime
29 from gnupg import GPG
30 from gnupg.gnupg import GPGUtilities
31 from twisted.internet import defer
32
33 from leap.common.check import leap_assert, leap_assert_type, leap_check
34 from leap.keymanager import errors
35 from leap.keymanager.keys import (
36     EncryptionKey,
37     EncryptionScheme,
38     is_address,
39     build_key_from_dict,
40     TYPE_ID_PRIVATE_INDEX,
41     TYPE_ADDRESS_PRIVATE_INDEX,
42     KEY_ADDRESS_KEY,
43     KEY_ID_KEY,
44     KEYMANAGER_ACTIVE_TYPE,
45 )
46
47
48 logger = logging.getLogger(__name__)
49
50
51 #
52 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
53 #
54
55 class TempGPGWrapper(object):
56     """
57     A context manager that wraps a temporary GPG keyring which only contains
58     the keys given at object creation.
59     """
60
61     def __init__(self, keys=None, gpgbinary=None):
62         """
63         Create an empty temporary keyring and import any given C{keys} into
64         it.
65
66         :param keys: OpenPGP key, or list of.
67         :type keys: OpenPGPKey or list of OpenPGPKeys
68         :param gpgbinary: Name for GnuPG binary executable.
69         :type gpgbinary: C{str}
70         """
71         self._gpg = None
72         self._gpgbinary = gpgbinary
73         if not keys:
74             keys = list()
75         if not isinstance(keys, list):
76             keys = [keys]
77         self._keys = keys
78         for key in keys:
79             leap_assert_type(key, OpenPGPKey)
80
81     def __enter__(self):
82         """
83         Build and return a GPG keyring containing the keys given on
84         object creation.
85
86         :return: A GPG instance containing the keys given on object creation.
87         :rtype: gnupg.GPG
88         """
89         self._build_keyring()
90         return self._gpg
91
92     def __exit__(self, exc_type, exc_value, traceback):
93         """
94         Ensure the gpg is properly destroyed.
95         """
96         # TODO handle exceptions and log here
97         self._destroy_keyring()
98
99     def _build_keyring(self):
100         """
101         Create a GPG keyring containing the keys given on object creation.
102
103         :return: A GPG instance containing the keys given on object creation.
104         :rtype: gnupg.GPG
105         """
106         privkeys = [key for key in self._keys if key and key.private is True]
107         publkeys = [key for key in self._keys if key and key.private is False]
108         # here we filter out public keys that have a correspondent
109         # private key in the list because the private key_data by
110         # itself is enough to also have the public key in the keyring,
111         # and we want to count the keys afterwards.
112
113         privids = map(lambda privkey: privkey.key_id, privkeys)
114         publkeys = filter(
115             lambda pubkey: pubkey.key_id not in privids, publkeys)
116
117         listkeys = lambda: self._gpg.list_keys()
118         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
119
120         self._gpg = GPG(binary=self._gpgbinary,
121                         homedir=tempfile.mkdtemp())
122         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
123
124         # import keys into the keyring:
125         # concatenating ascii-armored keys, which is correctly
126         # understood by GPG.
127
128         self._gpg.import_keys("".join(
129             [x.key_data for x in publkeys + privkeys]))
130
131         # assert the number of keys in the keyring
132         leap_assert(
133             len(listkeys()) == len(publkeys) + len(privkeys),
134             'Wrong number of public keys in keyring: %d, should be %d)' %
135             (len(listkeys()), len(publkeys) + len(privkeys)))
136         leap_assert(
137             len(listsecretkeys()) == len(privkeys),
138             'Wrong number of private keys in keyring: %d, should be %d)' %
139             (len(listsecretkeys()), len(privkeys)))
140
141     def _destroy_keyring(self):
142         """
143         Securely erase the keyring.
144         """
145         # TODO: implement some kind of wiping of data or a more
146         # secure way that
147         # does not write to disk.
148
149         try:
150             for secret in [True, False]:
151                 for key in self._gpg.list_keys(secret=secret):
152                     self._gpg.delete_keys(
153                         key['fingerprint'],
154                         secret=secret)
155             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
156
157         except:
158             raise
159
160         finally:
161             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
162                         "watch out! Tried to remove default gnupg home!")
163             shutil.rmtree(self._gpg.homedir)
164
165
166 def _build_key_from_gpg(key, key_data):
167     """
168     Build an OpenPGPKey based on C{key} from local gpg storage.
169
170     ASCII armored GPG key data has to be queried independently in this
171     wrapper, so we receive it in C{key_data}.
172
173     :param key: Key obtained from GPG storage.
174     :type key: dict
175     :param key_data: Key data obtained from GPG storage.
176     :type key_data: str
177     :return: An instance of the key.
178     :rtype: OpenPGPKey
179     """
180     expiry_date = None
181     if key['expires']:
182         expiry_date = datetime.fromtimestamp(int(key['expires']))
183     address = []
184     for uid in key['uids']:
185         address.append(_parse_address(uid))
186
187     return OpenPGPKey(
188         address,
189         key_id=key['keyid'],
190         fingerprint=key['fingerprint'],
191         key_data=key_data,
192         private=True if key['type'] == 'sec' else False,
193         length=int(key['length']),
194         expiry_date=expiry_date,
195         refreshed_at=datetime.now(),
196     )
197
198
199 def _parse_address(address):
200     """
201     Remove name, '<', '>' and the identity suffix after the '+' until the '@'
202     e.g.: test_user+something@provider.com becomes test_user@provider.com
203     since the key belongs to the identity without the '+' suffix.
204
205     :type address: str
206     :rtype: str
207     """
208     mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
209     match = re.match(mail_regex, address)
210     if match is None:
211         return None
212     return ''.join(match.group(2, 4))
213
214
215 #
216 # The OpenPGP wrapper
217 #
218
219 class OpenPGPKey(EncryptionKey):
220     """
221     Base class for OpenPGP keys.
222     """
223
224
225 class OpenPGPScheme(EncryptionScheme):
226     """
227     A wrapper for OpenPGP keys management and use (encryption, decyption,
228     signing and verification).
229     """
230
231     # type used on the soledad documents
232     KEY_TYPE = OpenPGPKey.__name__
233     ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE
234
235     def __init__(self, soledad, gpgbinary=None):
236         """
237         Initialize the OpenPGP wrapper.
238
239         :param soledad: A Soledad instance for key storage.
240         :type soledad: leap.soledad.Soledad
241         :param gpgbinary: Name for GnuPG binary executable.
242         :type gpgbinary: C{str}
243         """
244         EncryptionScheme.__init__(self, soledad)
245         self._wait_indexes("get_key", "put_key")
246         self._gpgbinary = gpgbinary
247
248     #
249     # Keys management
250     #
251
252     def gen_key(self, address):
253         """
254         Generate an OpenPGP keypair bound to C{address}.
255
256         :param address: The address bound to the key.
257         :type address: str
258
259         :return: A Deferred which fires with the key bound to address, or fails
260                  with KeyAlreadyExists if key already exists in local database.
261         :rtype: Deferred
262         """
263         # make sure the key does not already exist
264         leap_assert(is_address(address), 'Not an user address: %s' % address)
265
266         def _gen_key(_):
267             with self._temporary_gpgwrapper() as gpg:
268                 # TODO: inspect result, or use decorator
269                 params = gpg.gen_key_input(
270                     key_type='RSA',
271                     key_length=4096,
272                     name_real=address,
273                     name_email=address,
274                     name_comment='')
275                 logger.info("About to generate keys... "
276                             "This might take SOME time.")
277                 gpg.gen_key(params)
278                 logger.info("Keys for %s have been successfully "
279                             "generated." % (address,))
280                 pubkeys = gpg.list_keys()
281
282                 # assert for new key characteristics
283                 leap_assert(
284                     len(pubkeys) is 1,  # a unitary keyring!
285                     'Keyring has wrong number of keys: %d.' % len(pubkeys))
286                 key = gpg.list_keys(secret=True).pop()
287                 leap_assert(
288                     len(key['uids']) is 1,  # with just one uid!
289                     'Wrong number of uids for key: %d.' % len(key['uids']))
290                 uid_match = False
291                 for uid in key['uids']:
292                     if re.match('.*<%s>$' % address, uid) is not None:
293                         uid_match = True
294                         break
295                 leap_assert(uid_match, 'Key not correctly bound to address.')
296
297                 # insert both public and private keys in storage
298                 deferreds = []
299                 for secret in [True, False]:
300                     key = gpg.list_keys(secret=secret).pop()
301                     openpgp_key = _build_key_from_gpg(
302                         key,
303                         gpg.export_keys(key['fingerprint'], secret=secret))
304                     d = self.put_key(openpgp_key, address)
305                     deferreds.append(d)
306                 return defer.gatherResults(deferreds)
307
308         def key_already_exists(_):
309             raise errors.KeyAlreadyExists(address)
310
311         d = self.get_key(address)
312         d.addCallbacks(key_already_exists, _gen_key)
313         d.addCallback(lambda _: self.get_key(address, private=True))
314         return d
315
316     def get_key(self, address, private=False):
317         """
318         Get key bound to C{address} from local storage.
319
320         :param address: The address bound to the key.
321         :type address: str
322         :param private: Look for a private key instead of a public one?
323         :type private: bool
324
325         :return: A Deferred which fires with the OpenPGPKey bound to address,
326                  or which fails with KeyNotFound if the key was not found on
327                  local storage.
328         :rtype: Deferred
329         """
330         address = _parse_address(address)
331
332         def build_key(doc):
333             if doc is None:
334                 raise errors.KeyNotFound(address)
335             leap_assert(
336                 address in doc.content[KEY_ADDRESS_KEY],
337                 'Wrong address in key data.')
338             return build_key_from_dict(OpenPGPKey, doc.content)
339
340         d = self._get_key_doc(address, private)
341         d.addCallback(build_key)
342         return d
343
344     def parse_ascii_key(self, key_data):
345         """
346         Parses an ascii armored key (or key pair) data and returns
347         the OpenPGPKey keys.
348
349         :param key_data: the key data to be parsed.
350         :type key_data: str or unicode
351
352         :returns: the public key and private key (if applies) for that data.
353         :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
354                 the tuple may have one or both components None
355         """
356         leap_assert_type(key_data, (str, unicode))
357         # TODO: add more checks for correct key data.
358         leap_assert(key_data is not None, 'Data does not represent a key.')
359
360         with self._temporary_gpgwrapper() as gpg:
361             # TODO: inspect result, or use decorator
362             gpg.import_keys(key_data)
363             privkey = None
364             pubkey = None
365
366             try:
367                 privkey = gpg.list_keys(secret=True).pop()
368             except IndexError:
369                 pass
370             try:
371                 pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
372             except IndexError:
373                 return (None, None)
374
375             openpgp_privkey = None
376             if privkey is not None:
377                 # build private key
378                 openpgp_privkey = _build_key_from_gpg(
379                     privkey,
380                     gpg.export_keys(privkey['fingerprint'], secret=True))
381                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
382                            'Fingerprints for public and private key differ.',
383                            errors.KeyFingerprintMismatch)
384
385             # build public key
386             openpgp_pubkey = _build_key_from_gpg(
387                 pubkey,
388                 gpg.export_keys(pubkey['fingerprint'], secret=False))
389
390             return (openpgp_pubkey, openpgp_privkey)
391
392     def put_ascii_key(self, key_data, address):
393         """
394         Put key contained in ascii-armored C{key_data} in local storage.
395
396         :param key_data: The key data to be stored.
397         :type key_data: str or unicode
398         :param address: address for which this key will be active
399         :type address: str
400
401         :return: A Deferred which fires when the OpenPGPKey is in the storage.
402         :rtype: Deferred
403         """
404         leap_assert_type(key_data, (str, unicode))
405
406         openpgp_privkey = None
407         try:
408             openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
409         except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
410             return defer.fail(e)
411
412         def put_key(_, key):
413             return self.put_key(key, address)
414
415         d = defer.succeed(None)
416         if openpgp_pubkey is not None:
417             d.addCallback(put_key, openpgp_pubkey)
418         if openpgp_privkey is not None:
419             d.addCallback(put_key, openpgp_privkey)
420         return d
421
422     def put_key(self, key, address):
423         """
424         Put C{key} in local storage.
425
426         :param key: The key to be stored.
427         :type key: OpenPGPKey
428         :param address: address for which this key will be active.
429         :type address: str
430
431         :return: A Deferred which fires when the key is in the storage.
432         :rtype: Deferred
433         """
434         d = self._put_key_doc(key)
435         d.addCallback(lambda _: self._put_active_doc(key, address))
436         return d
437
438     def _put_key_doc(self, key):
439         """
440         Put key document in soledad
441
442         :type key: OpenPGPKey
443         :rtype: Deferred
444         """
445         def check_and_put(docs, key):
446             if len(docs) == 1:
447                 doc = docs.pop()
448                 oldkey = build_key_from_dict(OpenPGPKey, doc.content)
449                 if key.fingerprint == oldkey.fingerprint:
450                     # in case of an update of the key merge them with gnupg
451                     with self._temporary_gpgwrapper() as gpg:
452                         gpg.import_keys(oldkey.key_data)
453                         gpg.import_keys(key.key_data)
454                         gpgkey = gpg.list_keys(secret=key.private).pop()
455                         mergedkey = _build_key_from_gpg(
456                             gpgkey,
457                             gpg.export_keys(gpgkey['fingerprint'],
458                                             secret=key.private))
459                     mergedkey.validation = max(
460                         [key.validation, oldkey.validation])
461                     mergedkey.last_audited_at = oldkey.last_audited_at
462                     mergedkey.refreshed_at = key.refreshed_at
463                     mergedkey.encr_used = key.encr_used or oldkey.encr_used
464                     mergedkey.sign_used = key.sign_used or oldkey.sign_used
465                     doc.set_json(mergedkey.get_json())
466                     d = self._soledad.put_doc(doc)
467                 else:
468                     logger.critical(
469                         "Can't put a key whith the same key_id and different "
470                         "fingerprint: %s, %s"
471                         % (key.fingerprint, oldkey.fingerprint))
472                     d = defer.fail(
473                         errors.KeyFingerprintMismatch(key.fingerprint))
474             elif len(docs) > 1:
475                 logger.critical(
476                     "There is more than one key with the same key_id %s"
477                     % (key.key_id,))
478                 d = defer.fail(errors.KeyAttributesDiffer(key.key_id))
479             else:
480                 d = self._soledad.create_doc_from_json(key.get_json())
481             return d
482
483         d = self._soledad.get_from_index(
484             TYPE_ID_PRIVATE_INDEX,
485             self.KEY_TYPE,
486             key.key_id,
487             '1' if key.private else '0')
488         d.addCallback(check_and_put, key)
489         return d
490
491     def _put_active_doc(self, key, address):
492         """
493         Put active key document in soledad
494
495         :type key: OpenPGPKey
496         :type addresses: str
497         :rtype: Deferred
498         """
499         def check_and_put(docs):
500             if len(docs) == 1:
501                 doc = docs.pop()
502                 doc.set_json(key.get_active_json(address))
503                 d = self._soledad.put_doc(doc)
504             else:
505                 if len(docs) > 1:
506                     logger.error("There is more than one active key document "
507                                  "for the address %s" % (address,))
508                     deferreds = []
509                     for doc in docs:
510                         delete = self._soledad.delete_doc(doc)
511                         deferreds.append(delete)
512                     d = defer.gatherResults(deferreds, consumeErrors=True)
513                 else:
514                     d = defer.succeed(None)
515
516                 d.addCallback(
517                     lambda _: self._soledad.create_doc_from_json(
518                         key.get_active_json(address)))
519             return d
520
521         d = self._soledad.get_from_index(
522             TYPE_ADDRESS_PRIVATE_INDEX,
523             self.ACTIVE_TYPE,
524             address,
525             '1' if key.private else '0')
526         d.addCallback(check_and_put)
527         return d
528
529     def _get_key_doc(self, address, private=False):
530         """
531         Get the document with a key (public, by default) bound to C{address}.
532
533         If C{private} is True, looks for a private key instead of a public.
534
535         :param address: The address bound to the key.
536         :type address: str
537         :param private: Whether to look for a private key.
538         :type private: bool
539
540         :return: A Deferred which fires with the SoledadDocument with the key
541                  or None if it does not exist.
542         :rtype: Deferred
543         """
544         def get_key_from_active_doc(activedoc):
545             if len(activedoc) is 0:
546                 return None
547             leap_assert(
548                 len(activedoc) is 1,
549                 'Found more than one key for address %s!' % (address,))
550
551             key_id = activedoc[0].content[KEY_ID_KEY]
552             d = self._soledad.get_from_index(
553                 TYPE_ID_PRIVATE_INDEX,
554                 self.KEY_TYPE,
555                 key_id,
556                 '1' if private else '0')
557             d.addCallback(get_doc, key_id)
558             return d
559
560         def get_doc(doclist, key_id):
561             leap_assert(
562                 len(doclist) is 1,
563                 'There is %d keys for id %s!' % (len(doclist), key_id))
564             return doclist.pop()
565
566         d = self._soledad.get_from_index(
567             TYPE_ADDRESS_PRIVATE_INDEX,
568             self.ACTIVE_TYPE,
569             address,
570             '1' if private else '0')
571         d.addCallback(get_key_from_active_doc)
572         return d
573
574     def delete_key(self, key):
575         """
576         Remove C{key} from storage.
577
578         :param key: The key to be removed.
579         :type key: EncryptionKey
580
581         :return: A Deferred which fires when the key is deleted, or which
582                  fails with KeyNotFound if the key was not found on local
583                  storage.
584         :rtype: Deferred
585         """
586         leap_assert_type(key, OpenPGPKey)
587
588         def delete_docs(activedocs):
589             deferreds = []
590             for doc in activedocs:
591                 d = self._soledad.delete_doc(doc)
592                 deferreds.append(d)
593             return defer.gatherResults(deferreds)
594
595         def get_key_docs(_):
596             return self._soledad.get_from_index(
597                 TYPE_ID_PRIVATE_INDEX,
598                 self.KEY_TYPE,
599                 key.key_id,
600                 '1' if key.private else '0')
601
602         def delete_key(docs):
603             if len(docs) == 0:
604                 raise errors.KeyNotFound(key)
605             if len(docs) > 1:
606                 logger.critical("There is more than one key for key_id %s"
607                                 % key.key_id)
608
609             doc = None
610             for d in docs:
611                 if d.content['fingerprint'] == key.fingerprint:
612                     doc = d
613                     break
614             if doc is None:
615                 raise errors.KeyNotFound(key)
616             return self._soledad.delete_doc(doc)
617
618         d = self._soledad.get_from_index(
619             TYPE_ID_PRIVATE_INDEX,
620             self.ACTIVE_TYPE,
621             key.key_id,
622             '1' if key.private else '0')
623         d.addCallback(delete_docs)
624         d.addCallback(get_key_docs)
625         d.addCallback(delete_key)
626         return d
627
628     #
629     # Data encryption, decryption, signing and verifying
630     #
631
632     def _temporary_gpgwrapper(self, keys=None):
633         """
634         Return a gpg wrapper that implements the context manager protocol and
635         contains C{keys}.
636
637         :param keys: keys to conform the keyring.
638         :type key: list(OpenPGPKey)
639
640         :return: a TempGPGWrapper instance
641         :rtype: TempGPGWrapper
642         """
643         # TODO do here checks on key_data
644         return TempGPGWrapper(
645             keys=keys, gpgbinary=self._gpgbinary)
646
647     @staticmethod
648     def _assert_gpg_result_ok(result):
649         """
650         Check if GPG result is 'ok' and log stderr outputs.
651
652         :param result: GPG results, which have a field calld 'ok' that states
653                        whether the gpg operation was successful or not.
654         :type result: object
655
656         :raise GPGError: Raised when the gpg operation was not successful.
657         """
658         stderr = getattr(result, 'stderr', None)
659         if stderr:
660             logger.debug("%s" % (stderr,))
661         if getattr(result, 'ok', None) is not True:
662             raise errors.GPGError(
663                 'Failed to encrypt/decrypt: %s' % stderr)
664
665     def encrypt(self, data, pubkey, passphrase=None, sign=None,
666                 cipher_algo='AES256'):
667         """
668         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
669
670         :param data: The data to be encrypted.
671         :type data: str
672         :param pubkey: The key used to encrypt.
673         :type pubkey: OpenPGPKey
674         :param sign: The key used for signing.
675         :type sign: OpenPGPKey
676         :param cipher_algo: The cipher algorithm to use.
677         :type cipher_algo: str
678
679         :return: The encrypted data.
680         :rtype: str
681
682         :raise EncryptError: Raised if failed encrypting for some reason.
683         """
684         leap_assert_type(pubkey, OpenPGPKey)
685         leap_assert(pubkey.private is False, 'Key is not public.')
686         keys = [pubkey]
687         if sign is not None:
688             leap_assert_type(sign, OpenPGPKey)
689             leap_assert(sign.private is True)
690             keys.append(sign)
691         with self._temporary_gpgwrapper(keys) as gpg:
692             result = gpg.encrypt(
693                 data, pubkey.fingerprint,
694                 default_key=sign.key_id if sign else None,
695                 passphrase=passphrase, symmetric=False,
696                 cipher_algo=cipher_algo)
697             # Here we cannot assert for correctness of sig because the sig is
698             # in the ciphertext.
699             # result.ok    - (bool) indicates if the operation succeeded
700             # result.data  - (bool) contains the result of the operation
701             try:
702                 self._assert_gpg_result_ok(result)
703                 return result.data
704             except errors.GPGError as e:
705                 logger.error('Failed to decrypt: %s.' % str(e))
706                 raise errors.EncryptError()
707
708     def decrypt(self, data, privkey, passphrase=None, verify=None):
709         """
710         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
711
712         :param data: The data to be decrypted.
713         :type data: str
714         :param privkey: The key used to decrypt.
715         :type privkey: OpenPGPKey
716         :param passphrase: The passphrase for the secret key used for
717                            decryption.
718         :type passphrase: str
719         :param verify: The key used to verify a signature.
720         :type verify: OpenPGPKey
721
722         :return: The decrypted data and if signature verifies
723         :rtype: (unicode, bool)
724
725         :raise DecryptError: Raised if failed decrypting for some reason.
726         """
727         leap_assert(privkey.private is True, 'Key is not private.')
728         keys = [privkey]
729         if verify is not None:
730             leap_assert_type(verify, OpenPGPKey)
731             leap_assert(verify.private is False)
732             keys.append(verify)
733         with self._temporary_gpgwrapper(keys) as gpg:
734             try:
735                 result = gpg.decrypt(
736                     data, passphrase=passphrase, always_trust=True)
737                 self._assert_gpg_result_ok(result)
738
739                 # verify signature
740                 sign_valid = False
741                 if (verify is not None and
742                         result.valid is True and
743                         verify.fingerprint == result.pubkey_fingerprint):
744                     sign_valid = True
745
746                 return (result.data, sign_valid)
747             except errors.GPGError as e:
748                 logger.error('Failed to decrypt: %s.' % str(e))
749                 raise errors.DecryptError(str(e))
750
751     def is_encrypted(self, data):
752         """
753         Return whether C{data} was asymmetrically encrypted using OpenPGP.
754
755         :param data: The data we want to know about.
756         :type data: str
757
758         :return: Whether C{data} was encrypted using this wrapper.
759         :rtype: bool
760         """
761         with self._temporary_gpgwrapper() as gpg:
762             gpgutil = GPGUtilities(gpg)
763             return gpgutil.is_encrypted_asym(data)
764
765     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
766              detach=True, binary=False):
767         """
768         Sign C{data} with C{privkey}.
769
770         :param data: The data to be signed.
771         :type data: str
772
773         :param privkey: The private key to be used to sign.
774         :type privkey: OpenPGPKey
775         :param digest_algo: The hash digest to use.
776         :type digest_algo: str
777         :param clearsign: If True, create a cleartext signature.
778         :type clearsign: bool
779         :param detach: If True, create a detached signature.
780         :type detach: bool
781         :param binary: If True, do not ascii armour the output.
782         :type binary: bool
783
784         :return: The ascii-armored signed data.
785         :rtype: str
786         """
787         leap_assert_type(privkey, OpenPGPKey)
788         leap_assert(privkey.private is True)
789
790         # result.fingerprint - contains the fingerprint of the key used to
791         #                      sign.
792         with self._temporary_gpgwrapper(privkey) as gpg:
793             result = gpg.sign(data, default_key=privkey.key_id,
794                               digest_algo=digest_algo, clearsign=clearsign,
795                               detach=detach, binary=binary)
796             rfprint = privkey.fingerprint
797             privkey = gpg.list_keys(secret=True).pop()
798             kfprint = privkey['fingerprint']
799             if result.fingerprint is None:
800                 raise errors.SignFailed(
801                     'Failed to sign with key %s: %s' %
802                     (privkey['keyid'], result.stderr))
803             leap_assert(
804                 result.fingerprint == kfprint,
805                 'Signature and private key fingerprints mismatch: '
806                 '%s != %s' % (rfprint, kfprint))
807         return result.data
808
809     def verify(self, data, pubkey, detached_sig=None):
810         """
811         Verify signed C{data} with C{pubkey}, eventually using
812         C{detached_sig}.
813
814         :param data: The data to be verified.
815         :type data: str
816         :param pubkey: The public key to be used on verification.
817         :type pubkey: OpenPGPKey
818         :param detached_sig: A detached signature. If given, C{data} is
819                              verified against this detached signature.
820         :type detached_sig: str
821
822         :return: signature matches
823         :rtype: bool
824         """
825         leap_assert_type(pubkey, OpenPGPKey)
826         leap_assert(pubkey.private is False)
827         with self._temporary_gpgwrapper(pubkey) as gpg:
828             result = None
829             if detached_sig is None:
830                 result = gpg.verify(data)
831             else:
832                 # to verify using a detached sig we have to use
833                 # gpg.verify_file(), which receives the data as a binary
834                 # stream and the name of a file containing the signature.
835                 sf, sfname = tempfile.mkstemp()
836                 with os.fdopen(sf, 'w') as sfd:
837                     sfd.write(detached_sig)
838                 result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
839                 os.unlink(sfname)
840             gpgpubkey = gpg.list_keys().pop()
841             valid = result.valid
842             rfprint = result.fingerprint
843             kfprint = gpgpubkey['fingerprint']
844             return valid and rfprint == kfprint