upgrade key when signed by old key
[keymanager.git] / src / leap / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 """
18 Infrastructure for using OpenPGP keys in Key Manager.
19 """
20 import logging
21 import os
22 import re
23 import shutil
24 import tempfile
25 import io
26
27
28 from datetime import datetime
29 from gnupg import GPG
30 from gnupg.gnupg import GPGUtilities
31 from twisted.internet import defer
32
33 from leap.common.check import leap_assert, leap_assert_type, leap_check
34 from leap.keymanager import errors
35 from leap.keymanager.keys import (
36     EncryptionKey,
37     EncryptionScheme,
38     is_address,
39     build_key_from_dict,
40     TYPE_ID_PRIVATE_INDEX,
41     TYPE_ADDRESS_PRIVATE_INDEX,
42     KEY_ADDRESS_KEY,
43     KEY_ID_KEY,
44     KEYMANAGER_ACTIVE_TYPE,
45 )
46
47
48 logger = logging.getLogger(__name__)
49
50
51 #
52 # A temporary GPG keyring wrapped to provide OpenPGP functionality.
53 #
54
55 class TempGPGWrapper(object):
56     """
57     A context manager that wraps a temporary GPG keyring which only contains
58     the keys given at object creation.
59     """
60
61     def __init__(self, keys=None, gpgbinary=None):
62         """
63         Create an empty temporary keyring and import any given C{keys} into
64         it.
65
66         :param keys: OpenPGP key, or list of.
67         :type keys: OpenPGPKey or list of OpenPGPKeys
68         :param gpgbinary: Name for GnuPG binary executable.
69         :type gpgbinary: C{str}
70         """
71         self._gpg = None
72         self._gpgbinary = gpgbinary
73         if not keys:
74             keys = list()
75         if not isinstance(keys, list):
76             keys = [keys]
77         self._keys = keys
78         for key in keys:
79             leap_assert_type(key, OpenPGPKey)
80
81     def __enter__(self):
82         """
83         Build and return a GPG keyring containing the keys given on
84         object creation.
85
86         :return: A GPG instance containing the keys given on object creation.
87         :rtype: gnupg.GPG
88         """
89         self._build_keyring()
90         return self._gpg
91
92     def __exit__(self, exc_type, exc_value, traceback):
93         """
94         Ensure the gpg is properly destroyed.
95         """
96         # TODO handle exceptions and log here
97         self._destroy_keyring()
98
99     def _build_keyring(self):
100         """
101         Create a GPG keyring containing the keys given on object creation.
102
103         :return: A GPG instance containing the keys given on object creation.
104         :rtype: gnupg.GPG
105         """
106         privkeys = [key for key in self._keys if key and key.private is True]
107         publkeys = [key for key in self._keys if key and key.private is False]
108         # here we filter out public keys that have a correspondent
109         # private key in the list because the private key_data by
110         # itself is enough to also have the public key in the keyring,
111         # and we want to count the keys afterwards.
112
113         privids = map(lambda privkey: privkey.key_id, privkeys)
114         publkeys = filter(
115             lambda pubkey: pubkey.key_id not in privids, publkeys)
116
117         listkeys = lambda: self._gpg.list_keys()
118         listsecretkeys = lambda: self._gpg.list_keys(secret=True)
119
120         self._gpg = GPG(binary=self._gpgbinary,
121                         homedir=tempfile.mkdtemp())
122         leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
123
124         # import keys into the keyring:
125         # concatenating ascii-armored keys, which is correctly
126         # understood by GPG.
127
128         self._gpg.import_keys("".join(
129             [x.key_data for x in publkeys + privkeys]))
130
131         # assert the number of keys in the keyring
132         leap_assert(
133             len(listkeys()) == len(publkeys) + len(privkeys),
134             'Wrong number of public keys in keyring: %d, should be %d)' %
135             (len(listkeys()), len(publkeys) + len(privkeys)))
136         leap_assert(
137             len(listsecretkeys()) == len(privkeys),
138             'Wrong number of private keys in keyring: %d, should be %d)' %
139             (len(listsecretkeys()), len(privkeys)))
140
141     def _destroy_keyring(self):
142         """
143         Securely erase the keyring.
144         """
145         # TODO: implement some kind of wiping of data or a more
146         # secure way that
147         # does not write to disk.
148
149         try:
150             for secret in [True, False]:
151                 for key in self._gpg.list_keys(secret=secret):
152                     self._gpg.delete_keys(
153                         key['fingerprint'],
154                         secret=secret)
155             leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
156
157         except:
158             raise
159
160         finally:
161             leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
162                         "watch out! Tried to remove default gnupg home!")
163             shutil.rmtree(self._gpg.homedir)
164
165
166 def _parse_address(address):
167     """
168     Remove name, '<', '>' and the identity suffix after the '+' until the '@'
169     e.g.: test_user+something@provider.com becomes test_user@provider.com
170     since the key belongs to the identity without the '+' suffix.
171
172     :type address: str
173     :rtype: str
174     """
175     mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
176     match = re.match(mail_regex, address)
177     if match is None:
178         return None
179     return ''.join(match.group(2, 4))
180
181
182 #
183 # The OpenPGP wrapper
184 #
185
186 class OpenPGPKey(EncryptionKey):
187     """
188     Base class for OpenPGP keys.
189     """
190
191     def __init__(self, address, gpgbinary=None, **kwargs):
192         self._gpgbinary = gpgbinary
193         super(OpenPGPKey, self).__init__(address, **kwargs)
194
195     @property
196     def signatures(self):
197         """
198         Get the key signatures
199
200         :return: the key IDs that have signed the key
201         :rtype: list(str)
202         """
203         with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
204             res = gpg.list_sigs(self.key_id)
205             for uid, sigs in res.sigs.iteritems():
206                 if _parse_address(uid) in self.address:
207                     return sigs
208
209         return []
210
211
212 class OpenPGPScheme(EncryptionScheme):
213     """
214     A wrapper for OpenPGP keys management and use (encryption, decyption,
215     signing and verification).
216     """
217
218     # type used on the soledad documents
219     KEY_TYPE = OpenPGPKey.__name__
220     ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE
221
222     def __init__(self, soledad, gpgbinary=None):
223         """
224         Initialize the OpenPGP wrapper.
225
226         :param soledad: A Soledad instance for key storage.
227         :type soledad: leap.soledad.Soledad
228         :param gpgbinary: Name for GnuPG binary executable.
229         :type gpgbinary: C{str}
230         """
231         EncryptionScheme.__init__(self, soledad)
232         self._wait_indexes("get_key", "put_key")
233         self._gpgbinary = gpgbinary
234
235     #
236     # Keys management
237     #
238
239     def gen_key(self, address):
240         """
241         Generate an OpenPGP keypair bound to C{address}.
242
243         :param address: The address bound to the key.
244         :type address: str
245
246         :return: A Deferred which fires with the key bound to address, or fails
247                  with KeyAlreadyExists if key already exists in local database.
248         :rtype: Deferred
249         """
250         # make sure the key does not already exist
251         leap_assert(is_address(address), 'Not an user address: %s' % address)
252
253         def _gen_key(_):
254             with self._temporary_gpgwrapper() as gpg:
255                 # TODO: inspect result, or use decorator
256                 params = gpg.gen_key_input(
257                     key_type='RSA',
258                     key_length=4096,
259                     name_real=address,
260                     name_email=address,
261                     name_comment='')
262                 logger.info("About to generate keys... "
263                             "This might take SOME time.")
264                 gpg.gen_key(params)
265                 logger.info("Keys for %s have been successfully "
266                             "generated." % (address,))
267                 pubkeys = gpg.list_keys()
268
269                 # assert for new key characteristics
270                 leap_assert(
271                     len(pubkeys) is 1,  # a unitary keyring!
272                     'Keyring has wrong number of keys: %d.' % len(pubkeys))
273                 key = gpg.list_keys(secret=True).pop()
274                 leap_assert(
275                     len(key['uids']) is 1,  # with just one uid!
276                     'Wrong number of uids for key: %d.' % len(key['uids']))
277                 uid_match = False
278                 for uid in key['uids']:
279                     if re.match('.*<%s>$' % address, uid) is not None:
280                         uid_match = True
281                         break
282                 leap_assert(uid_match, 'Key not correctly bound to address.')
283
284                 # insert both public and private keys in storage
285                 deferreds = []
286                 for secret in [True, False]:
287                     key = gpg.list_keys(secret=secret).pop()
288                     openpgp_key = self._build_key_from_gpg(
289                         key,
290                         gpg.export_keys(key['fingerprint'], secret=secret))
291                     d = self.put_key(openpgp_key, address)
292                     deferreds.append(d)
293                 return defer.gatherResults(deferreds)
294
295         def key_already_exists(_):
296             raise errors.KeyAlreadyExists(address)
297
298         d = self.get_key(address)
299         d.addCallbacks(key_already_exists, _gen_key)
300         d.addCallback(lambda _: self.get_key(address, private=True))
301         return d
302
303     def get_key(self, address, private=False):
304         """
305         Get key bound to C{address} from local storage.
306
307         :param address: The address bound to the key.
308         :type address: str
309         :param private: Look for a private key instead of a public one?
310         :type private: bool
311
312         :return: A Deferred which fires with the OpenPGPKey bound to address,
313                  or which fails with KeyNotFound if the key was not found on
314                  local storage.
315         :rtype: Deferred
316         """
317         address = _parse_address(address)
318
319         def build_key(doc):
320             if doc is None:
321                 raise errors.KeyNotFound(address)
322             leap_assert(
323                 address in doc.content[KEY_ADDRESS_KEY],
324                 'Wrong address in key data.')
325             key = build_key_from_dict(OpenPGPKey, doc.content)
326             key._gpgbinary = self._gpgbinary
327             return key
328
329         d = self._get_key_doc(address, private)
330         d.addCallback(build_key)
331         return d
332
333     def parse_ascii_key(self, key_data):
334         """
335         Parses an ascii armored key (or key pair) data and returns
336         the OpenPGPKey keys.
337
338         :param key_data: the key data to be parsed.
339         :type key_data: str or unicode
340
341         :returns: the public key and private key (if applies) for that data.
342         :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
343                 the tuple may have one or both components None
344         """
345         leap_assert_type(key_data, (str, unicode))
346         # TODO: add more checks for correct key data.
347         leap_assert(key_data is not None, 'Data does not represent a key.')
348
349         with self._temporary_gpgwrapper() as gpg:
350             # TODO: inspect result, or use decorator
351             gpg.import_keys(key_data)
352             privkey = None
353             pubkey = None
354
355             try:
356                 privkey = gpg.list_keys(secret=True).pop()
357             except IndexError:
358                 pass
359             try:
360                 pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
361             except IndexError:
362                 return (None, None)
363
364             openpgp_privkey = None
365             if privkey is not None:
366                 # build private key
367                 openpgp_privkey = self._build_key_from_gpg(
368                     privkey,
369                     gpg.export_keys(privkey['fingerprint'], secret=True))
370                 leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
371                            'Fingerprints for public and private key differ.',
372                            errors.KeyFingerprintMismatch)
373
374             # build public key
375             openpgp_pubkey = self._build_key_from_gpg(
376                 pubkey,
377                 gpg.export_keys(pubkey['fingerprint'], secret=False))
378
379             return (openpgp_pubkey, openpgp_privkey)
380
381     def put_ascii_key(self, key_data, address):
382         """
383         Put key contained in ascii-armored C{key_data} in local storage.
384
385         :param key_data: The key data to be stored.
386         :type key_data: str or unicode
387         :param address: address for which this key will be active
388         :type address: str
389
390         :return: A Deferred which fires when the OpenPGPKey is in the storage.
391         :rtype: Deferred
392         """
393         leap_assert_type(key_data, (str, unicode))
394
395         openpgp_privkey = None
396         try:
397             openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
398         except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
399             return defer.fail(e)
400
401         def put_key(_, key):
402             return self.put_key(key, address)
403
404         d = defer.succeed(None)
405         if openpgp_pubkey is not None:
406             d.addCallback(put_key, openpgp_pubkey)
407         if openpgp_privkey is not None:
408             d.addCallback(put_key, openpgp_privkey)
409         return d
410
411     def put_key(self, key, address):
412         """
413         Put C{key} in local storage.
414
415         :param key: The key to be stored.
416         :type key: OpenPGPKey
417         :param address: address for which this key will be active.
418         :type address: str
419
420         :return: A Deferred which fires when the key is in the storage.
421         :rtype: Deferred
422         """
423         d = self._put_key_doc(key)
424         d.addCallback(lambda _: self._put_active_doc(key, address))
425         return d
426
427     def _put_key_doc(self, key):
428         """
429         Put key document in soledad
430
431         :type key: OpenPGPKey
432         :rtype: Deferred
433         """
434         def check_and_put(docs, key):
435             if len(docs) == 1:
436                 doc = docs.pop()
437                 oldkey = build_key_from_dict(OpenPGPKey, doc.content)
438                 if key.fingerprint == oldkey.fingerprint:
439                     # in case of an update of the key merge them with gnupg
440                     with self._temporary_gpgwrapper() as gpg:
441                         gpg.import_keys(oldkey.key_data)
442                         gpg.import_keys(key.key_data)
443                         gpgkey = gpg.list_keys(secret=key.private).pop()
444                         mergedkey = self._build_key_from_gpg(
445                             gpgkey,
446                             gpg.export_keys(gpgkey['fingerprint'],
447                                             secret=key.private))
448                     mergedkey.validation = max(
449                         [key.validation, oldkey.validation])
450                     mergedkey.last_audited_at = oldkey.last_audited_at
451                     mergedkey.refreshed_at = key.refreshed_at
452                     mergedkey.encr_used = key.encr_used or oldkey.encr_used
453                     mergedkey.sign_used = key.sign_used or oldkey.sign_used
454                     doc.set_json(mergedkey.get_json())
455                     d = self._soledad.put_doc(doc)
456                 else:
457                     logger.critical(
458                         "Can't put a key whith the same key_id and different "
459                         "fingerprint: %s, %s"
460                         % (key.fingerprint, oldkey.fingerprint))
461                     d = defer.fail(
462                         errors.KeyFingerprintMismatch(key.fingerprint))
463             elif len(docs) > 1:
464                 logger.critical(
465                     "There is more than one key with the same key_id %s"
466                     % (key.key_id,))
467                 d = defer.fail(errors.KeyAttributesDiffer(key.key_id))
468             else:
469                 d = self._soledad.create_doc_from_json(key.get_json())
470             return d
471
472         d = self._soledad.get_from_index(
473             TYPE_ID_PRIVATE_INDEX,
474             self.KEY_TYPE,
475             key.key_id,
476             '1' if key.private else '0')
477         d.addCallback(check_and_put, key)
478         return d
479
480     def _put_active_doc(self, key, address):
481         """
482         Put active key document in soledad
483
484         :type key: OpenPGPKey
485         :type addresses: str
486         :rtype: Deferred
487         """
488         def check_and_put(docs):
489             if len(docs) == 1:
490                 doc = docs.pop()
491                 doc.set_json(key.get_active_json(address))
492                 d = self._soledad.put_doc(doc)
493             else:
494                 if len(docs) > 1:
495                     logger.error("There is more than one active key document "
496                                  "for the address %s" % (address,))
497                     deferreds = []
498                     for doc in docs:
499                         delete = self._soledad.delete_doc(doc)
500                         deferreds.append(delete)
501                     d = defer.gatherResults(deferreds, consumeErrors=True)
502                 else:
503                     d = defer.succeed(None)
504
505                 d.addCallback(
506                     lambda _: self._soledad.create_doc_from_json(
507                         key.get_active_json(address)))
508             return d
509
510         d = self._soledad.get_from_index(
511             TYPE_ADDRESS_PRIVATE_INDEX,
512             self.ACTIVE_TYPE,
513             address,
514             '1' if key.private else '0')
515         d.addCallback(check_and_put)
516         return d
517
518     def _get_key_doc(self, address, private=False):
519         """
520         Get the document with a key (public, by default) bound to C{address}.
521
522         If C{private} is True, looks for a private key instead of a public.
523
524         :param address: The address bound to the key.
525         :type address: str
526         :param private: Whether to look for a private key.
527         :type private: bool
528
529         :return: A Deferred which fires with the SoledadDocument with the key
530                  or None if it does not exist.
531         :rtype: Deferred
532         """
533         def get_key_from_active_doc(activedoc):
534             if len(activedoc) is 0:
535                 return None
536             leap_assert(
537                 len(activedoc) is 1,
538                 'Found more than one key for address %s!' % (address,))
539
540             key_id = activedoc[0].content[KEY_ID_KEY]
541             d = self._soledad.get_from_index(
542                 TYPE_ID_PRIVATE_INDEX,
543                 self.KEY_TYPE,
544                 key_id,
545                 '1' if private else '0')
546             d.addCallback(get_doc, key_id)
547             return d
548
549         def get_doc(doclist, key_id):
550             leap_assert(
551                 len(doclist) is 1,
552                 'There is %d keys for id %s!' % (len(doclist), key_id))
553             return doclist.pop()
554
555         d = self._soledad.get_from_index(
556             TYPE_ADDRESS_PRIVATE_INDEX,
557             self.ACTIVE_TYPE,
558             address,
559             '1' if private else '0')
560         d.addCallback(get_key_from_active_doc)
561         return d
562
563     def _build_key_from_gpg(self, key, key_data):
564         """
565         Build an OpenPGPKey for C{address} based on C{key} from
566         local gpg storage.
567
568         ASCII armored GPG key data has to be queried independently in this
569         wrapper, so we receive it in C{key_data}.
570
571         :param key: Key obtained from GPG storage.
572         :type key: dict
573         :param key_data: Key data obtained from GPG storage.
574         :type key_data: str
575         :return: An instance of the key.
576         :rtype: OpenPGPKey
577         """
578         expiry_date = None
579         if key['expires']:
580             expiry_date = datetime.fromtimestamp(int(key['expires']))
581         address = []
582         for uid in key['uids']:
583             address.append(_parse_address(uid))
584
585         return OpenPGPKey(
586             address,
587             gpgbinary=self._gpgbinary,
588             key_id=key['keyid'],
589             fingerprint=key['fingerprint'],
590             key_data=key_data,
591             private=True if key['type'] == 'sec' else False,
592             length=int(key['length']),
593             expiry_date=expiry_date,
594             refreshed_at=datetime.now(),
595         )
596
597     def delete_key(self, key):
598         """
599         Remove C{key} from storage.
600
601         :param key: The key to be removed.
602         :type key: EncryptionKey
603
604         :return: A Deferred which fires when the key is deleted, or which
605                  fails with KeyNotFound if the key was not found on local
606                  storage.
607         :rtype: Deferred
608         """
609         leap_assert_type(key, OpenPGPKey)
610
611         def delete_docs(activedocs):
612             deferreds = []
613             for doc in activedocs:
614                 d = self._soledad.delete_doc(doc)
615                 deferreds.append(d)
616             return defer.gatherResults(deferreds)
617
618         def get_key_docs(_):
619             return self._soledad.get_from_index(
620                 TYPE_ID_PRIVATE_INDEX,
621                 self.KEY_TYPE,
622                 key.key_id,
623                 '1' if key.private else '0')
624
625         def delete_key(docs):
626             if len(docs) == 0:
627                 raise errors.KeyNotFound(key)
628             if len(docs) > 1:
629                 logger.critical("There is more than one key for key_id %s"
630                                 % key.key_id)
631
632             doc = None
633             for d in docs:
634                 if d.content['fingerprint'] == key.fingerprint:
635                     doc = d
636                     break
637             if doc is None:
638                 raise errors.KeyNotFound(key)
639             return self._soledad.delete_doc(doc)
640
641         d = self._soledad.get_from_index(
642             TYPE_ID_PRIVATE_INDEX,
643             self.ACTIVE_TYPE,
644             key.key_id,
645             '1' if key.private else '0')
646         d.addCallback(delete_docs)
647         d.addCallback(get_key_docs)
648         d.addCallback(delete_key)
649         return d
650
651     #
652     # Data encryption, decryption, signing and verifying
653     #
654
655     def _temporary_gpgwrapper(self, keys=None):
656         """
657         Return a gpg wrapper that implements the context manager protocol and
658         contains C{keys}.
659
660         :param keys: keys to conform the keyring.
661         :type key: list(OpenPGPKey)
662
663         :return: a TempGPGWrapper instance
664         :rtype: TempGPGWrapper
665         """
666         # TODO do here checks on key_data
667         return TempGPGWrapper(
668             keys=keys, gpgbinary=self._gpgbinary)
669
670     @staticmethod
671     def _assert_gpg_result_ok(result):
672         """
673         Check if GPG result is 'ok' and log stderr outputs.
674
675         :param result: GPG results, which have a field calld 'ok' that states
676                        whether the gpg operation was successful or not.
677         :type result: object
678
679         :raise GPGError: Raised when the gpg operation was not successful.
680         """
681         stderr = getattr(result, 'stderr', None)
682         if stderr:
683             logger.debug("%s" % (stderr,))
684         if getattr(result, 'ok', None) is not True:
685             raise errors.GPGError(
686                 'Failed to encrypt/decrypt: %s' % stderr)
687
688     def encrypt(self, data, pubkey, passphrase=None, sign=None,
689                 cipher_algo='AES256'):
690         """
691         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
692
693         :param data: The data to be encrypted.
694         :type data: str
695         :param pubkey: The key used to encrypt.
696         :type pubkey: OpenPGPKey
697         :param sign: The key used for signing.
698         :type sign: OpenPGPKey
699         :param cipher_algo: The cipher algorithm to use.
700         :type cipher_algo: str
701
702         :return: The encrypted data.
703         :rtype: str
704
705         :raise EncryptError: Raised if failed encrypting for some reason.
706         """
707         leap_assert_type(pubkey, OpenPGPKey)
708         leap_assert(pubkey.private is False, 'Key is not public.')
709         keys = [pubkey]
710         if sign is not None:
711             leap_assert_type(sign, OpenPGPKey)
712             leap_assert(sign.private is True)
713             keys.append(sign)
714         with self._temporary_gpgwrapper(keys) as gpg:
715             result = gpg.encrypt(
716                 data, pubkey.fingerprint,
717                 default_key=sign.key_id if sign else None,
718                 passphrase=passphrase, symmetric=False,
719                 cipher_algo=cipher_algo)
720             # Here we cannot assert for correctness of sig because the sig is
721             # in the ciphertext.
722             # result.ok    - (bool) indicates if the operation succeeded
723             # result.data  - (bool) contains the result of the operation
724             try:
725                 self._assert_gpg_result_ok(result)
726                 return result.data
727             except errors.GPGError as e:
728                 logger.error('Failed to decrypt: %s.' % str(e))
729                 raise errors.EncryptError()
730
731     def decrypt(self, data, privkey, passphrase=None, verify=None):
732         """
733         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
734
735         :param data: The data to be decrypted.
736         :type data: str
737         :param privkey: The key used to decrypt.
738         :type privkey: OpenPGPKey
739         :param passphrase: The passphrase for the secret key used for
740                            decryption.
741         :type passphrase: str
742         :param verify: The key used to verify a signature.
743         :type verify: OpenPGPKey
744
745         :return: The decrypted data and if signature verifies
746         :rtype: (unicode, bool)
747
748         :raise DecryptError: Raised if failed decrypting for some reason.
749         """
750         leap_assert(privkey.private is True, 'Key is not private.')
751         keys = [privkey]
752         if verify is not None:
753             leap_assert_type(verify, OpenPGPKey)
754             leap_assert(verify.private is False)
755             keys.append(verify)
756         with self._temporary_gpgwrapper(keys) as gpg:
757             try:
758                 result = gpg.decrypt(
759                     data, passphrase=passphrase, always_trust=True)
760                 self._assert_gpg_result_ok(result)
761
762                 # verify signature
763                 sign_valid = False
764                 if (verify is not None and
765                         result.valid is True and
766                         verify.fingerprint == result.pubkey_fingerprint):
767                     sign_valid = True
768
769                 return (result.data, sign_valid)
770             except errors.GPGError as e:
771                 logger.error('Failed to decrypt: %s.' % str(e))
772                 raise errors.DecryptError(str(e))
773
774     def is_encrypted(self, data):
775         """
776         Return whether C{data} was asymmetrically encrypted using OpenPGP.
777
778         :param data: The data we want to know about.
779         :type data: str
780
781         :return: Whether C{data} was encrypted using this wrapper.
782         :rtype: bool
783         """
784         with self._temporary_gpgwrapper() as gpg:
785             gpgutil = GPGUtilities(gpg)
786             return gpgutil.is_encrypted_asym(data)
787
788     def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
789              detach=True, binary=False):
790         """
791         Sign C{data} with C{privkey}.
792
793         :param data: The data to be signed.
794         :type data: str
795
796         :param privkey: The private key to be used to sign.
797         :type privkey: OpenPGPKey
798         :param digest_algo: The hash digest to use.
799         :type digest_algo: str
800         :param clearsign: If True, create a cleartext signature.
801         :type clearsign: bool
802         :param detach: If True, create a detached signature.
803         :type detach: bool
804         :param binary: If True, do not ascii armour the output.
805         :type binary: bool
806
807         :return: The ascii-armored signed data.
808         :rtype: str
809         """
810         leap_assert_type(privkey, OpenPGPKey)
811         leap_assert(privkey.private is True)
812
813         # result.fingerprint - contains the fingerprint of the key used to
814         #                      sign.
815         with self._temporary_gpgwrapper(privkey) as gpg:
816             result = gpg.sign(data, default_key=privkey.key_id,
817                               digest_algo=digest_algo, clearsign=clearsign,
818                               detach=detach, binary=binary)
819             rfprint = privkey.fingerprint
820             privkey = gpg.list_keys(secret=True).pop()
821             kfprint = privkey['fingerprint']
822             if result.fingerprint is None:
823                 raise errors.SignFailed(
824                     'Failed to sign with key %s: %s' %
825                     (privkey['keyid'], result.stderr))
826             leap_assert(
827                 result.fingerprint == kfprint,
828                 'Signature and private key fingerprints mismatch: '
829                 '%s != %s' % (rfprint, kfprint))
830         return result.data
831
832     def verify(self, data, pubkey, detached_sig=None):
833         """
834         Verify signed C{data} with C{pubkey}, eventually using
835         C{detached_sig}.
836
837         :param data: The data to be verified.
838         :type data: str
839         :param pubkey: The public key to be used on verification.
840         :type pubkey: OpenPGPKey
841         :param detached_sig: A detached signature. If given, C{data} is
842                              verified against this detached signature.
843         :type detached_sig: str
844
845         :return: signature matches
846         :rtype: bool
847         """
848         leap_assert_type(pubkey, OpenPGPKey)
849         leap_assert(pubkey.private is False)
850         with self._temporary_gpgwrapper(pubkey) as gpg:
851             result = None
852             if detached_sig is None:
853                 result = gpg.verify(data)
854             else:
855                 # to verify using a detached sig we have to use
856                 # gpg.verify_file(), which receives the data as a binary
857                 # stream and the name of a file containing the signature.
858                 sf, sfname = tempfile.mkstemp()
859                 with os.fdopen(sf, 'w') as sfd:
860                     sfd.write(detached_sig)
861                 result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
862                 os.unlink(sfname)
863             gpgpubkey = gpg.list_keys().pop()
864             valid = result.valid
865             rfprint = result.fingerprint
866             kfprint = gpgpubkey['fingerprint']
867             return valid and rfprint == kfprint