15c1d9f699d22ecbf62ca41318806d9e299a3501
[keymanager.git] / src / leap / keymanager / gpg.py
1 # -*- coding: utf-8 -*-
2 # gpgwrapper.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 A GPG wrapper used to handle OpenPGP keys.
21
22 This is a temporary class that will be superseded by the a revised version of
23 python-gnupg.
24 """
25
26
27 import os
28 import gnupg
29 import re
30 from gnupg import (
31     logger,
32     _is_sequence,
33     _make_binary_stream,
34 )
35
36
37 class ListPackets():
38     """
39     Handle status messages for --list-packets.
40     """
41
42     def __init__(self, gpg):
43         """
44         Initialize the packet listing handling class.
45
46         :param gpg: GPG object instance.
47         :type gpg: gnupg.GPG
48         """
49         self.gpg = gpg
50         self.nodata = None
51         self.key = None
52         self.need_passphrase = None
53         self.need_passphrase_sym = None
54         self.userid_hint = None
55
56     def handle_status(self, key, value):
57         """
58         Handle one line of the --list-packets status message.
59
60         :param key: The status message key.
61         :type key: str
62         :param value: The status message value.
63         :type value: str
64         """
65         # TODO: write tests for handle_status
66         if key == 'NODATA':
67             self.nodata = True
68         if key == 'ENC_TO':
69             # This will only capture keys in our keyring. In the future we
70             # may want to include multiple unknown keys in this list.
71             self.key, _, _ = value.split()
72         if key == 'NEED_PASSPHRASE':
73             self.need_passphrase = True
74         if key == 'NEED_PASSPHRASE_SYM':
75             self.need_passphrase_sym = True
76         if key == 'USERID_HINT':
77             self.userid_hint = value.strip().split()
78
79
80 class GPGWrapper(gnupg.GPG):
81     """
82     This is a temporary class for handling GPG requests, and should be
83     replaced by a more general class used throughout the project.
84     """
85
86     GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg"
87     GNUPG_BINARY = "/usr/bin/gpg"  # this has to be changed based on OS
88
89     def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME,
90                  verbose=False, use_agent=False, keyring=None, options=None):
91         """
92         Initialize a GnuPG process wrapper.
93
94         :param gpgbinary: Name for GnuPG binary executable.
95         :type gpgbinary: C{str}
96         :param gpghome: Full pathname to directory containing the public and
97             private keyrings.
98         :type gpghome: C{str}
99         :param keyring: Name of alternative keyring file to use. If specified,
100             the default keyring is not used.
101         :param verbose: Should some verbose info be output?
102         :type verbose: bool
103         :param use_agent: Should pass `--use-agent` to GPG binary?
104         :type use_agent: bool
105         :param keyring: Path for the keyring to use.
106         :type keyring: str
107         @options: A list of additional options to pass to the GPG binary.
108         :type options: list
109
110         @raise: RuntimeError with explanation message if there is a problem
111             invoking gpg.
112         """
113         gnupg.GPG.__init__(self, gnupghome=gnupghome, gpgbinary=gpgbinary,
114                            verbose=verbose, use_agent=use_agent,
115                            keyring=keyring, options=options)
116         self.result_map['list-packets'] = ListPackets
117
118     def find_key_by_email(self, email, secret=False):
119         """
120         Find user's key based on their email.
121
122         :param email: Email address of key being searched for.
123         :type email: str
124         :param secret: Should we search for a secret key?
125         :type secret: bool
126
127         :return: The fingerprint of the found key.
128         :rtype: str
129         """
130         for key in self.list_keys(secret=secret):
131             for uid in key['uids']:
132                 if re.search(email, uid):
133                     return key
134         raise LookupError("GnuPG public key for email %s not found!" % email)
135
136     def find_key_by_subkey(self, subkey, secret=False):
137         """
138         Find user's key based on a subkey fingerprint.
139
140         :param email: Subkey fingerprint of the key being searched for.
141         :type email: str
142         :param secret: Should we search for a secret key?
143         :type secret: bool
144
145         :return: The fingerprint of the found key.
146         :rtype: str
147         """
148         for key in self.list_keys(secret=secret):
149             for sub in key['subkeys']:
150                 if sub[0] == subkey:
151                     return key
152         raise LookupError(
153             "GnuPG public key for subkey %s not found!" % subkey)
154
155     def find_key_by_keyid(self, keyid, secret=False):
156         """
157         Find user's key based on the key ID.
158
159         :param email: The key ID of the key being searched for.
160         :type email: str
161         :param secret: Should we search for a secret key?
162         :type secret: bool
163
164         :return: The fingerprint of the found key.
165         :rtype: str
166         """
167         for key in self.list_keys(secret=secret):
168             if keyid == key['keyid']:
169                 return key
170         raise LookupError(
171             "GnuPG public key for keyid %s not found!" % keyid)
172
173     def find_key_by_fingerprint(self, fingerprint, secret=False):
174         """
175         Find user's key based on the key fingerprint.
176
177         :param email: The fingerprint of the key being searched for.
178         :type email: str
179         :param secret: Should we search for a secret key?
180         :type secret: bool
181
182         :return: The fingerprint of the found key.
183         :rtype: str
184         """
185         for key in self.list_keys(secret=secret):
186             if fingerprint == key['fingerprint']:
187                 return key
188         raise LookupError(
189             "GnuPG public key for fingerprint %s not found!" % fingerprint)
190
191     def encrypt(self, data, recipient, sign=None, always_trust=True,
192                 passphrase=None, symmetric=False):
193         """
194         Encrypt data using GPG.
195
196         :param data: The data to be encrypted.
197         :type data: str
198         :param recipient: The address of the public key to be used.
199         :type recipient: str
200         :param sign: Should the encrypted content be signed?
201         :type sign: bool
202         :param always_trust: Skip key validation and assume that used keys
203             are always fully trusted?
204         :type always_trust: bool
205         :param passphrase: The passphrase to be used if symmetric encryption
206             is desired.
207         :type passphrase: str
208         :param symmetric: Should we encrypt to a password?
209         :type symmetric: bool
210
211         :return: An object with encrypted result in the `data` field.
212         :rtype: gnupg.Crypt
213         """
214         # TODO: devise a way so we don't need to "always trust".
215         return gnupg.GPG.encrypt(self, data, recipient, sign=sign,
216                                  always_trust=always_trust,
217                                  passphrase=passphrase,
218                                  symmetric=symmetric,
219                                  cipher_algo='AES256')
220
221     def decrypt(self, data, always_trust=True, passphrase=None):
222         """
223         Decrypt data using GPG.
224
225         :param data: The data to be decrypted.
226         :type data: str
227         :param always_trust: Skip key validation and assume that used keys
228             are always fully trusted?
229         :type always_trust: bool
230         :param passphrase: The passphrase to be used if symmetric encryption
231             is desired.
232         :type passphrase: str
233
234         :return: An object with decrypted result in the `data` field.
235         :rtype: gnupg.Crypt
236         """
237         # TODO: devise a way so we don't need to "always trust".
238         return gnupg.GPG.decrypt(self, data, always_trust=always_trust,
239                                  passphrase=passphrase)
240
241     def send_keys(self, keyserver, *keyids):
242         """
243         Send keys to a keyserver
244
245         :param keyserver: The keyserver to send the keys to.
246         :type keyserver: str
247         :param keyids: The key ids to send.
248         :type keyids: list
249
250         :return: A list of keys sent to server.
251         :rtype: gnupg.ListKeys
252         """
253         # TODO: write tests for this.
254         # TODO: write a SendKeys class to handle status for this.
255         result = self.result_map['list'](self)
256         gnupg.logger.debug('send_keys: %r', keyids)
257         data = gnupg._make_binary_stream("", self.encoding)
258         args = ['--keyserver', keyserver, '--send-keys']
259         args.extend(keyids)
260         self._handle_io(args, data, result, binary=True)
261         gnupg.logger.debug('send_keys result: %r', result.__dict__)
262         data.close()
263         return result
264
265     def encrypt_file(self, file, recipients, sign=None,
266                      always_trust=False, passphrase=None,
267                      armor=True, output=None, symmetric=False,
268                      cipher_algo=None):
269         """
270         Encrypt the message read from the file-like object 'file'.
271
272         :param file: The file to be encrypted.
273         :type data: file
274         :param recipient: The address of the public key to be used.
275         :type recipient: str
276         :param sign: Should the encrypted content be signed?
277         :type sign: bool
278         :param always_trust: Skip key validation and assume that used keys
279             are always fully trusted?
280         :type always_trust: bool
281         :param passphrase: The passphrase to be used if symmetric encryption
282             is desired.
283         :type passphrase: str
284         :param armor: Create ASCII armored output?
285         :type armor: bool
286         :param output: Path of file to write results in.
287         :type output: str
288         :param symmetric: Should we encrypt to a password?
289         :type symmetric: bool
290         :param cipher_algo: Algorithm to use.
291         :type cipher_algo: str
292
293         :return: An object with encrypted result in the `data` field.
294         :rtype: gnupg.Crypt
295         """
296         args = ['--encrypt']
297         if symmetric:
298             args = ['--symmetric']
299             if cipher_algo:
300                 args.append('--cipher-algo %s' % cipher_algo)
301         else:
302             args = ['--encrypt']
303             if not _is_sequence(recipients):
304                 recipients = (recipients,)
305             for recipient in recipients:
306                 args.append('--recipient "%s"' % recipient)
307         if armor:  # create ascii-armored output - set to False for binary
308             args.append('--armor')
309         if output:  # write the output to a file with the specified name
310             if os.path.exists(output):
311                 os.remove(output)  # to avoid overwrite confirmation message
312             args.append('--output "%s"' % output)
313         if sign:
314             args.append('--sign --default-key "%s"' % sign)
315         if always_trust:
316             args.append("--always-trust")
317         result = self.result_map['crypt'](self)
318         self._handle_io(args, file, result, passphrase=passphrase, binary=True)
319         logger.debug('encrypt result: %r', result.data)
320         return result
321
322     def list_packets(self, data):
323         """
324         List the sequence of packets.
325
326         :param data: The data to extract packets from.
327         :type data: str
328
329         :return: An object with packet info.
330         :rtype ListPackets
331         """
332         args = ["--list-packets"]
333         result = self.result_map['list-packets'](self)
334         self._handle_io(
335             args,
336             _make_binary_stream(data, self.encoding),
337             result,
338         )
339         return result
340
341     def encrypted_to(self, data):
342         """
343         Return the key to which data is encrypted to.
344
345         :param data: The data to be examined.
346         :type data: str
347
348         :return: The fingerprint of the key to which data is encrypted to.
349         :rtype: str
350         """
351         # TODO: make this support multiple keys.
352         result = self.list_packets(data)
353         if not result.key:
354             raise LookupError(
355                 "Content is not encrypted to a GnuPG key!")
356         try:
357             return self.find_key_by_keyid(result.key)
358         except:
359             return self.find_key_by_subkey(result.key)
360
361     def is_encrypted_sym(self, data):
362         """
363         Say whether some chunk of data is encrypted to a symmetric key.
364
365         :param data: The data to be examined.
366         :type data: str
367
368         :return: Whether data is encrypted to a symmetric key.
369         :rtype: bool
370         """
371         result = self.list_packets(data)
372         return bool(result.need_passphrase_sym)
373
374     def is_encrypted_asym(self, data):
375         """
376         Say whether some chunk of data is encrypted to a private key.
377
378         :param data: The data to be examined.
379         :type data: str
380
381         :return: Whether data is encrypted to a private key.
382         :rtype: bool
383         """
384         result = self.list_packets(data)
385         return bool(result.key)
386
387     def is_encrypted(self, data):
388         """
389         Say whether some chunk of data is encrypted to a key.
390
391         :param data: The data to be examined.
392         :type data: str
393
394         :return: Whether data is encrypted to a key.
395         :rtype: bool
396         """
397         return self.is_encrypted_asym(data) or self.is_encrypted_sym(data)