d57863888efd8995b7cce2535dc16cb9406c30a4
[leap_pycommon.git] / src / leap / common / keymanager / openpgp.py
1 # -*- coding: utf-8 -*-
2 # openpgp.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Infrastructure for using OpenPGP keys in Key Manager.
21 """
22
23
24 import re
25 import tempfile
26 import shutil
27
28 from leap.common.check import leap_assert, leap_assert_type
29 from leap.common.keymanager.errors import (
30     KeyNotFound,
31     KeyAlreadyExists,
32     KeyAttributesDiffer,
33     InvalidSignature,
34     EncryptionFailed,
35     DecryptionFailed,
36     SignFailed,
37 )
38 from leap.common.keymanager.keys import (
39     EncryptionKey,
40     EncryptionScheme,
41     is_address,
42     keymanager_doc_id,
43     build_key_from_dict,
44 )
45 from leap.common.keymanager.gpg import GPGWrapper
46
47
48 #
49 # API functions
50 #
51
52 def encrypt_sym(data, passphrase, sign=None):
53     """
54     Encrypt C{data} with C{passphrase} and sign with C{sign} private key.
55
56     @param data: The data to be encrypted.
57     @type data: str
58     @param passphrase: The passphrase used to encrypt C{data}.
59     @type passphrase: str
60     @param sign: The private key used for signing.
61     @type sign: OpenPGPKey
62
63     @return: The encrypted data.
64     @rtype: str
65     """
66     leap_assert_type(passphrase, str)
67     if sign is not None:
68         leap_assert_type(sign, OpenPGPKey)
69         leap_assert(sign.private is True)
70
71     def _encrypt_cb(gpg):
72         result = gpg.encrypt(
73             data, None,
74             sign=sign.key_id if sign else None,
75             passphrase=passphrase, symmetric=True)
76         # Here we cannot assert for correctness of sig because the sig is in
77         # the ciphertext.
78         # result.ok    - (bool) indicates if the operation succeeded
79         # result.data  - (bool) contains the result of the operation
80         if result.ok is False:
81             raise EncryptionFailed('Failed to encrypt: %s' % result.stderr)
82         return result.data
83
84     return _safe_call(_encrypt_cb, [sign])
85
86
87 def decrypt_sym(data, passphrase, verify=None):
88     """
89     Decrypt C{data} with C{passphrase} and verify with C{verify} public
90     key.
91
92     @param data: The data to be decrypted.
93     @type data: str
94     @param passphrase: The passphrase used to decrypt C{data}.
95     @type passphrase: str
96     @param verify: The key used to verify a signature.
97     @type verify: OpenPGPKey
98
99     @return: The decrypted data.
100     @rtype: str
101
102     @raise InvalidSignature: Raised if unable to verify the signature with
103         C{verify} key.
104     """
105     leap_assert_type(passphrase, str)
106     if verify is not None:
107         leap_assert_type(verify, OpenPGPKey)
108         leap_assert(verify.private is False)
109
110     def _decrypt_cb(gpg):
111         result = gpg.decrypt(data, passphrase=passphrase)
112         # result.ok    - (bool) indicates if the operation succeeded
113         # result.valid - (bool) indicates if the signature was verified
114         # result.data  - (bool) contains the result of the operation
115         # result.pubkey_fingerpring  - (str) contains the fingerprint of the
116         #                              public key that signed this data.
117         if result.ok is False:
118             raise DecryptionFailed('Failed to decrypt: %s', result.stderr)
119         if verify is not None:
120             if result.valid is False or \
121                     verify.fingerprint != result.pubkey_fingerprint:
122                 raise InvalidSignature(
123                     'Failed to verify signature with key %s: %s' %
124                     (verify.key_id, result.stderr))
125         return result.data
126
127     return _safe_call(_decrypt_cb, [verify])
128
129
130 def encrypt_asym(data, pubkey, sign=None):
131     """
132     Encrypt C{data} using public @{key} and sign with C{sign} key.
133
134     @param data: The data to be encrypted.
135     @type data: str
136     @param pubkey: The key used to encrypt.
137     @type pubkey: OpenPGPKey
138     @param sign: The key used for signing.
139     @type sign: OpenPGPKey
140
141     @return: The encrypted data.
142     @rtype: str
143     """
144     leap_assert_type(pubkey, OpenPGPKey)
145     leap_assert(pubkey.private is False, 'Key is not public.')
146     if sign is not None:
147         leap_assert_type(sign, OpenPGPKey)
148         leap_assert(sign.private is True)
149
150     def _encrypt_cb(gpg):
151         result = gpg.encrypt(
152             data, pubkey.fingerprint,
153             sign=sign.key_id if sign else None,
154             symmetric=False)
155         # Here we cannot assert for correctness of sig because the sig is in
156         # the ciphertext.
157         # result.ok    - (bool) indicates if the operation succeeded
158         # result.data  - (bool) contains the result of the operation
159         if result.ok is False:
160             raise EncryptionFailed(
161                 'Failed to encrypt with key %s: %s' %
162                 (pubkey.key_id, result.stderr))
163         return result.data
164
165     return _safe_call(_encrypt_cb, [pubkey, sign])
166
167
168 def decrypt_asym(data, privkey, verify=None):
169     """
170     Decrypt C{data} using private @{key} and verify with C{verify} key.
171
172     @param data: The data to be decrypted.
173     @type data: str
174     @param privkey: The key used to decrypt.
175     @type privkey: OpenPGPKey
176     @param verify: The key used to verify a signature.
177     @type verify: OpenPGPKey
178
179     @return: The decrypted data.
180     @rtype: str
181
182     @raise InvalidSignature: Raised if unable to verify the signature with
183         C{verify} key.
184     """
185     leap_assert(privkey.private is True, 'Key is not private.')
186     if verify is not None:
187         leap_assert_type(verify, OpenPGPKey)
188         leap_assert(verify.private is False)
189
190     def _decrypt_cb(gpg):
191         result = gpg.decrypt(data)
192         # result.ok    - (bool) indicates if the operation succeeded
193         # result.valid - (bool) indicates if the signature was verified
194         # result.data  - (bool) contains the result of the operation
195         # result.pubkey_fingerpring  - (str) contains the fingerprint of the
196         #                              public key that signed this data.
197         if result.ok is False:
198             raise DecryptionFailed('Failed to decrypt with key %s: %s' %
199                                    (privkey.key_id, result.stderr))
200         if verify is not None:
201             if result.valid is False or \
202                     verify.fingerprint != result.pubkey_fingerprint:
203                 raise InvalidSignature(
204                     'Failed to verify signature with key %s: %s' %
205                     (verify.key_id, result.stderr))
206         return result.data
207
208     return _safe_call(_decrypt_cb, [privkey, verify])
209
210
211 def is_encrypted(data):
212     """
213     Return whether C{data} was encrypted using OpenPGP.
214
215     @param data: The data we want to know about.
216     @type data: str
217
218     @return: Whether C{data} was encrypted using this wrapper.
219     @rtype: bool
220     """
221
222     def _is_encrypted_cb(gpg):
223         return gpg.is_encrypted(data)
224
225     return _safe_call(_is_encrypted_cb)
226
227
228 def is_encrypted_sym(data):
229     """
230     Return whether C{data} was encrypted using a public OpenPGP key.
231
232     @param data: The data we want to know about.
233     @type data: str
234
235     @return: Whether C{data} was encrypted using this wrapper.
236     @rtype: bool
237     """
238
239     def _is_encrypted_cb(gpg):
240         return gpg.is_encrypted_sym(data)
241
242     return _safe_call(_is_encrypted_cb)
243
244
245 def is_encrypted_asym(data):
246     """
247     Return whether C{data} was asymmetrically encrypted using OpenPGP.
248
249     @param data: The data we want to know about.
250     @type data: str
251
252     @return: Whether C{data} was encrypted using this wrapper.
253     @rtype: bool
254     """
255
256     def _is_encrypted_cb(gpg):
257         return gpg.is_encrypted_asym(data)
258
259     return _safe_call(_is_encrypted_cb)
260
261
262 def sign(data, privkey):
263     """
264     Sign C{data} with C{privkey}.
265
266     @param data: The data to be signed.
267     @type data: str
268     @param privkey: The private key to be used to sign.
269     @type privkey: OpenPGPKey
270
271     @return: The ascii-armored signed data.
272     @rtype: str
273     """
274     leap_assert_type(privkey, OpenPGPKey)
275     leap_assert(privkey.private is True)
276
277     def _sign_cb(gpg):
278         result = gpg.sign(data, keyid=privkey.key_id)
279         # result.fingerprint - contains the fingerprint of the key used to
280         #                      sign.
281         if result.fingerprint is None:
282             raise SignFailed(
283                 'Failed to sign with key %s: %s' %
284                 (privkey.key_id, result.stderr))
285         leap_assert(
286             result.fingerprint == privkey.fingerprint,
287             'Signature and private key fingerprints mismatch: %s != %s' %
288             (result.fingerprint, privkey.fingerprint))
289         return result.data
290
291     return _safe_call(_sign_cb, [privkey])
292
293
294 def verify(data, pubkey):
295     """
296     Verify signed C{data} with C{pubkey}.
297
298     @param data: The data to be verified.
299     @type data: str
300     @param pubkey: The public key to be used on verification.
301     @type pubkey: OpenPGPKey
302
303     @return: The ascii-armored signed data.
304     @rtype: str
305     """
306     leap_assert_type(pubkey, OpenPGPKey)
307     leap_assert(pubkey.private is False)
308
309     def _verify_cb(gpg):
310         result = gpg.verify(data)
311         if result.valid is False or \
312                 result.fingerprint != pubkey.fingerprint:
313             raise InvalidSignature(
314                 'Failed to verify signature with key %s.' % pubkey.key_id)
315         return True
316
317     return _safe_call(_verify_cb, [pubkey])
318
319
320 #
321 # Helper functions
322 #
323
324 def _build_key_from_gpg(address, key, key_data):
325     """
326     Build an OpenPGPKey for C{address} based on C{key} from
327     local gpg storage.
328
329     ASCII armored GPG key data has to be queried independently in this
330     wrapper, so we receive it in C{key_data}.
331
332     @param address: The address bound to the key.
333     @type address: str
334     @param key: Key obtained from GPG storage.
335     @type key: dict
336     @param key_data: Key data obtained from GPG storage.
337     @type key_data: str
338     @return: An instance of the key.
339     @rtype: OpenPGPKey
340     """
341     return OpenPGPKey(
342         address,
343         key_id=key['keyid'],
344         fingerprint=key['fingerprint'],
345         key_data=key_data,
346         private=True if key['type'] == 'sec' else False,
347         length=key['length'],
348         expiry_date=key['expires'],
349         validation=None,  # TODO: verify for validation.
350     )
351
352
353 def _build_keyring(keys=[]):
354     """
355
356     Create an empty GPG keyring and import C{keys} into it.
357
358     @param keys: List of keys to add to the keyring.
359     @type keys: list of OpenPGPKey
360
361     @return: A GPG wrapper with a unitary keyring.
362     @rtype: gnupg.GPG
363     """
364     privkeys = filter(lambda key: key.private is True, keys)
365     pubkeys = filter(lambda key: key.private is False, keys)
366     # here we filter out public keys that have a correspondent private key in
367     # the list because the private key_data by itself is enough to also have
368     # the public key in the keyring, and we want to count the keys afterwards.
369     privaddrs = map(lambda privkey: privkey.address, privkeys)
370     pubkeys = filter(lambda pubkey: pubkey.address not in privaddrs, pubkeys)
371     # create temporary dir for temporary gpg keyring
372     tmpdir = tempfile.mkdtemp()
373     gpg = GPGWrapper(gnupghome=tmpdir)
374     leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty.')
375     # import keys into the keyring
376     gpg.import_keys(
377         reduce(
378             lambda x, y: x+y,
379             [key.key_data for key in pubkeys+privkeys], ''))
380     # assert the number of keys in the keyring
381     leap_assert(
382         len(gpg.list_keys()) == len(pubkeys)+len(privkeys),
383         'Wrong number of public keys in keyring: %d, should be %d)' %
384         (len(gpg.list_keys()), len(pubkeys)+len(privkeys)))
385     leap_assert(
386         len(gpg.list_keys(secret=True)) == len(privkeys),
387         'Wrong number of private keys in keyring: %d, should be %d)' %
388         (len(gpg.list_keys(secret=True)), len(privkeys)))
389     return gpg
390
391
392 def _destroy_keyring(gpg):
393     """
394     Securely erase a keyring.
395
396     @param gpg: A GPG wrapper instance.
397     @type gpg: gnupg.GPG
398     """
399     for secret in [True, False]:
400         for key in gpg.list_keys(secret=secret):
401             gpg.delete_keys(
402                 key['fingerprint'],
403                 secret=secret)
404     leap_assert(len(gpg.list_keys()) is 0, 'Keyring not empty!')
405     # TODO: implement some kind of wiping of data or a more secure way that
406     # does not write to disk.
407     shutil.rmtree(gpg.gnupghome)
408
409
410 def _safe_call(callback, keys=[]):
411     """
412     Run C{callback} over a keyring containing C{keys}.
413
414     @param callback: Function whose first argument is the gpg keyring.
415     @type callback: function(gnupg.GPG)
416     @param keys: List of keys to add to the keyring.
417     @type keys: list of OpenPGPKey
418
419     @return: The results of the callback.
420     @rtype: str or bool
421     """
422     gpg = _build_keyring(filter(lambda key: key is not None, keys))
423     val = callback(gpg)
424     _destroy_keyring(gpg)
425     return val
426
427
428 #
429 # The OpenPGP wrapper
430 #
431
432 class OpenPGPKey(EncryptionKey):
433     """
434     Base class for OpenPGP keys.
435     """
436
437
438 class OpenPGPScheme(EncryptionScheme):
439     """
440     A wrapper for OpenPGP keys.
441     """
442
443     def __init__(self, soledad):
444         """
445         Initialize the OpenPGP wrapper.
446
447         @param soledad: A Soledad instance for key storage.
448         @type soledad: leap.soledad.Soledad
449         """
450         EncryptionScheme.__init__(self, soledad)
451
452     def gen_key(self, address):
453         """
454         Generate an OpenPGP keypair bound to C{address}.
455
456         @param address: The address bound to the key.
457         @type address: str
458         @return: The key bound to C{address}.
459         @rtype: OpenPGPKey
460         @raise KeyAlreadyExists: If key already exists in local database.
461         """
462         # make sure the key does not already exist
463         leap_assert(is_address(address), 'Not an user address: %s' % address)
464         try:
465             self.get_key(address)
466             raise KeyAlreadyExists(address)
467         except KeyNotFound:
468             pass
469
470         def _gen_key_cb(gpg):
471             params = gpg.gen_key_input(
472                 key_type='RSA',
473                 key_length=4096,
474                 name_real=address,
475                 name_email=address,
476                 name_comment='Generated by LEAP Key Manager.')
477             gpg.gen_key(params)
478             pubkeys = gpg.list_keys()
479             # assert for new key characteristics
480             leap_assert(
481                 len(pubkeys) is 1,  # a unitary keyring!
482                 'Keyring has wrong number of keys: %d.' % len(pubkeys))
483             key = gpg.list_keys(secret=True).pop()
484             leap_assert(
485                 len(key['uids']) is 1,  # with just one uid!
486                 'Wrong number of uids for key: %d.' % len(key['uids']))
487             leap_assert(
488                 re.match('.*<%s>$' % address, key['uids'][0]) is not None,
489                 'Key not correctly bound to address.')
490             # insert both public and private keys in storage
491             for secret in [True, False]:
492                 key = gpg.list_keys(secret=secret).pop()
493                 openpgp_key = _build_key_from_gpg(
494                     address, key,
495                     gpg.export_keys(key['fingerprint'], secret=secret))
496                 self.put_key(openpgp_key)
497
498         _safe_call(_gen_key_cb)
499         return self.get_key(address, private=True)
500
501     def get_key(self, address, private=False):
502         """
503         Get key bound to C{address} from local storage.
504
505         @param address: The address bound to the key.
506         @type address: str
507         @param private: Look for a private key instead of a public one?
508         @type private: bool
509
510         @return: The key bound to C{address}.
511         @rtype: OpenPGPKey
512         @raise KeyNotFound: If the key was not found on local storage.
513         """
514         leap_assert(is_address(address), 'Not an user address: %s' % address)
515         doc = self._get_key_doc(address, private)
516         if doc is None:
517             raise KeyNotFound(address)
518         return build_key_from_dict(OpenPGPKey, address, doc.content)
519
520     def put_ascii_key(self, key_data):
521         """
522         Put key contained in ascii-armored C{key_data} in local storage.
523
524         @param key_data: The key data to be stored.
525         @type key_data: str
526         """
527         leap_assert_type(key_data, str)
528
529         def _put_ascii_key_cb(gpg):
530             gpg.import_keys(key_data)
531             privkey = None
532             pubkey = None
533             try:
534                 privkey = gpg.list_keys(secret=True).pop()
535             except IndexError:
536                 pass
537             pubkey = gpg.list_keys(secret=False).pop()  # unitary keyring
538             # extract adress from first uid on key
539             match = re.match('.*<([\w.-]+@[\w.-]+)>.*', pubkey['uids'].pop())
540             leap_assert(match is not None, 'No user address in key data.')
541             address = match.group(1)
542             if privkey is not None:
543                 match = re.match(
544                     '.*<([\w.-]+@[\w.-]+)>.*', privkey['uids'].pop())
545                 leap_assert(match is not None, 'No user address in key data.')
546                 privaddress = match.group(1)
547                 leap_assert(
548                     address == privaddress,
549                     'Addresses in pub and priv key differ.')
550                 leap_assert(
551                     pubkey['fingerprint'] == privkey['fingerprint'],
552                     'Fingerprints for pub and priv key differ.')
553                 # insert private key in storage
554                 openpgp_privkey = _build_key_from_gpg(
555                     address, privkey,
556                     gpg.export_keys(privkey['fingerprint'], secret=True))
557                 self.put_key(openpgp_privkey)
558             # insert public key in storage
559             openpgp_pubkey = _build_key_from_gpg(
560                 address, pubkey,
561                 gpg.export_keys(pubkey['fingerprint'], secret=False))
562             self.put_key(openpgp_pubkey)
563
564         _safe_call(_put_ascii_key_cb)
565
566     def put_key(self, key):
567         """
568         Put C{key} in local storage.
569
570         @param key: The key to be stored.
571         @type key: OpenPGPKey
572         """
573         doc = self._get_key_doc(key.address, private=key.private)
574         if doc is None:
575             self._soledad.create_doc_from_json(
576                 key.get_json(),
577                 doc_id=keymanager_doc_id(
578                     OpenPGPKey, key.address, key.private))
579         else:
580             doc.set_json(key.get_json())
581             self._soledad.put_doc(doc)
582
583     def _get_key_doc(self, address, private=False):
584         """
585         Get the document with a key (public, by default) bound to C{address}.
586
587         If C{private} is True, looks for a private key instead of a public.
588
589         @param address: The address bound to the key.
590         @type address: str
591         @param private: Whether to look for a private key.
592         @type private: bool
593         @return: The document with the key or None if it does not exist.
594         @rtype: leap.soledad.backends.leap_backend.LeapDocument
595         """
596         return self._soledad.get_doc(
597             keymanager_doc_id(OpenPGPKey, address, private))
598
599     def delete_key(self, key):
600         """
601         Remove C{key} from storage.
602
603         @param key: The key to be removed.
604         @type key: EncryptionKey
605         """
606         leap_assert(key.__class__ is OpenPGPKey, 'Wrong key type.')
607         stored_key = self.get_key(key.address, private=key.private)
608         if stored_key is None:
609             raise KeyNotFound(key)
610         if stored_key.__dict__ != key.__dict__:
611             raise KeyAttributesDiffer(key)
612         doc = self._soledad.get_doc(
613             keymanager_doc_id(OpenPGPKey, key.address, key.private))
614         self._soledad.delete_doc(doc)