Use type instead of tags to get docs in openpgp
[keymanager.git] / src / leap / keymanager / keys.py
1 # -*- coding: utf-8 -*-
2 # keys.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19 """
20 Abstact key type and encryption scheme representations.
21 """
22
23
24 try:
25     import simplejson as json
26 except ImportError:
27     import json  # noqa
28 import logging
29 import re
30 import time
31
32
33 from abc import ABCMeta, abstractmethod
34 from datetime import datetime
35 from leap.common.check import leap_assert
36
37 from leap.keymanager.validation import ValidationLevel, toValidationLevel
38
39 logger = logging.getLogger(__name__)
40
41
42 #
43 # Dictionary keys used for storing cryptographic keys.
44 #
45
46 KEY_ADDRESS_KEY = 'address'
47 KEY_TYPE_KEY = 'type'
48 KEY_ID_KEY = 'key_id'
49 KEY_FINGERPRINT_KEY = 'fingerprint'
50 KEY_DATA_KEY = 'key_data'
51 KEY_PRIVATE_KEY = 'private'
52 KEY_LENGTH_KEY = 'length'
53 KEY_EXPIRY_DATE_KEY = 'expiry_date'
54 KEY_LAST_AUDITED_AT_KEY = 'last_audited_at'
55 KEY_REFRESHED_AT_KEY = 'refreshed_at'
56 KEY_VALIDATION_KEY = 'validation'
57 KEY_ENCR_USED_KEY = 'encr_used'
58 KEY_SIGN_USED_KEY = 'sign_used'
59 KEY_TAGS_KEY = 'tags'
60
61
62 #
63 # Key storage constants
64 #
65
66 KEYMANAGER_KEY_TAG = 'keymanager-key'
67
68
69 #
70 # key indexing constants.
71 #
72
73 TAGS_PRIVATE_INDEX = 'by-tags-private'
74 TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private'
75 INDEXES = {
76     TAGS_PRIVATE_INDEX: [
77         KEY_TAGS_KEY,
78         'bool(%s)' % KEY_PRIVATE_KEY,
79     ],
80     TYPE_ADDRESS_PRIVATE_INDEX: [
81         KEY_TYPE_KEY,
82         KEY_ADDRESS_KEY,
83         'bool(%s)' % KEY_PRIVATE_KEY,
84     ]
85 }
86
87
88 #
89 # Key handling utilities
90 #
91
92 def is_address(address):
93     """
94     Return whether the given C{address} is in the form user@provider.
95
96     :param address: The address to be tested.
97     :type address: str
98     :return: Whether C{address} is in the form user@provider.
99     :rtype: bool
100     """
101     return bool(re.match('[\w.-]+@[\w.-]+', address))
102
103
104 def build_key_from_dict(kClass, address, kdict):
105     """
106     Build an C{kClass} key bound to C{address} based on info in C{kdict}.
107
108     :param address: The address bound to the key.
109     :type address: str
110     :param kdict: Dictionary with key data.
111     :type kdict: dict
112     :return: An instance of the key.
113     :rtype: C{kClass}
114     """
115     leap_assert(
116         address in kdict[KEY_ADDRESS_KEY],
117         'Wrong address in key data.')
118     try:
119         validation = toValidationLevel(kdict[KEY_VALIDATION_KEY])
120     except ValueError:
121         logger.error("Not valid validation level (%s) for key %s",
122                      (kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY]))
123         validation = ValidationLevel.Weak_Chain
124
125     expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY])
126     last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY])
127     refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY])
128
129     return kClass(
130         [address],
131         key_id=kdict[KEY_ID_KEY],
132         fingerprint=kdict[KEY_FINGERPRINT_KEY],
133         key_data=kdict[KEY_DATA_KEY],
134         private=kdict[KEY_PRIVATE_KEY],
135         length=kdict[KEY_LENGTH_KEY],
136         expiry_date=expiry_date,
137         last_audited_at=last_audited_at,
138         refreshed_at=refreshed_at,
139         validation=validation,
140         encr_used=kdict[KEY_ENCR_USED_KEY],
141         sign_used=kdict[KEY_SIGN_USED_KEY],
142     )
143
144
145 def _to_datetime(unix_time):
146     if unix_time != 0:
147         return datetime.fromtimestamp(unix_time)
148     else:
149         return None
150
151
152 def _to_unix_time(date):
153     if date is not None:
154         return int(time.mktime(date.timetuple()))
155     else:
156         return 0
157
158
159 #
160 # Abstraction for encryption keys
161 #
162
163 class EncryptionKey(object):
164     """
165     Abstract class for encryption keys.
166
167     A key is "validated" if the nicknym agent has bound the user address to a
168     public key.
169     """
170
171     __metaclass__ = ABCMeta
172
173     def __init__(self, address, key_id="", fingerprint="",
174                  key_data="", private=False, length=0, expiry_date=None,
175                  validation=ValidationLevel.Weak_Chain, last_audited_at=None,
176                  refreshed_at=None, encr_used=False, sign_used=False):
177         self.address = address
178         self.key_id = key_id
179         self.fingerprint = fingerprint
180         self.key_data = key_data
181         self.private = private
182         self.length = length
183         self.expiry_date = expiry_date
184         self.validation = validation
185         self.last_audited_at = last_audited_at
186         self.refreshed_at = refreshed_at
187         self.encr_used = encr_used
188         self.sign_used = sign_used
189
190     def get_json(self):
191         """
192         Return a JSON string describing this key.
193
194         :return: The JSON string describing this key.
195         :rtype: str
196         """
197         expiry_date = _to_unix_time(self.expiry_date)
198         last_audited_at = _to_unix_time(self.last_audited_at)
199         refreshed_at = _to_unix_time(self.refreshed_at)
200
201         return json.dumps({
202             KEY_ADDRESS_KEY: self.address,
203             KEY_TYPE_KEY: self.__class__.__name__,
204             KEY_ID_KEY: self.key_id,
205             KEY_FINGERPRINT_KEY: self.fingerprint,
206             KEY_DATA_KEY: self.key_data,
207             KEY_PRIVATE_KEY: self.private,
208             KEY_LENGTH_KEY: self.length,
209             KEY_EXPIRY_DATE_KEY: expiry_date,
210             KEY_LAST_AUDITED_AT_KEY: last_audited_at,
211             KEY_REFRESHED_AT_KEY: refreshed_at,
212             KEY_VALIDATION_KEY: str(self.validation),
213             KEY_ENCR_USED_KEY: self.encr_used,
214             KEY_SIGN_USED_KEY: self.sign_used,
215             KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG],
216         })
217
218     def __repr__(self):
219         """
220         Representation of this class
221         """
222         return u"<%s 0x%s (%s - %s)>" % (
223             self.__class__.__name__,
224             self.key_id,
225             self.address,
226             "priv" if self.private else "publ")
227
228
229 #
230 # Encryption schemes
231 #
232
233 class EncryptionScheme(object):
234     """
235     Abstract class for Encryption Schemes.
236
237     A wrapper for a certain encryption schemes should know how to get and put
238     keys in local storage using Soledad, how to generate new keys and how to
239     find out about possibly encrypted content.
240     """
241
242     __metaclass__ = ABCMeta
243
244     def __init__(self, soledad):
245         """
246         Initialize this Encryption Scheme.
247
248         :param soledad: A Soledad instance for local storage of keys.
249         :type soledad: leap.soledad.Soledad
250         """
251         self._soledad = soledad
252         self._init_indexes()
253
254     def _init_indexes(self):
255         """
256         Initialize the database indexes.
257         """
258         leap_assert(self._soledad is not None,
259                     "Cannot init indexes with null soledad")
260         # Ask the database for currently existing indexes.
261         db_indexes = dict(self._soledad.list_indexes())
262         # Loop through the indexes we expect to find.
263         for name, expression in INDEXES.items():
264             if name not in db_indexes:
265                 # The index does not yet exist.
266                 self._soledad.create_index(name, *expression)
267                 continue
268             if expression == db_indexes[name]:
269                 # The index exists and is up to date.
270                 continue
271             # The index exists but the definition is not what expected, so we
272             # delete it and add the proper index expression.
273             self._soledad.delete_index(name)
274             self._soledad.create_index(name, *expression)
275
276     @abstractmethod
277     def get_key(self, address, private=False):
278         """
279         Get key from local storage.
280
281         :param address: The address bound to the key.
282         :type address: str
283         :param private: Look for a private key instead of a public one?
284         :type private: bool
285
286         :return: The key bound to C{address}.
287         :rtype: EncryptionKey
288         @raise KeyNotFound: If the key was not found on local storage.
289         """
290         pass
291
292     @abstractmethod
293     def put_key(self, key):
294         """
295         Put a key in local storage.
296
297         :param key: The key to be stored.
298         :type key: EncryptionKey
299         """
300         pass
301
302     @abstractmethod
303     def gen_key(self, address):
304         """
305         Generate a new key.
306
307         :param address: The address bound to the key.
308         :type address: str
309
310         :return: The key bound to C{address}.
311         :rtype: EncryptionKey
312         """
313         pass
314
315     @abstractmethod
316     def delete_key(self, key):
317         """
318         Remove C{key} from storage.
319
320         :param key: The key to be removed.
321         :type key: EncryptionKey
322         """
323         pass
324
325     @abstractmethod
326     def encrypt(self, data, pubkey, passphrase=None, sign=None):
327         """
328         Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
329
330         :param data: The data to be encrypted.
331         :type data: str
332         :param pubkey: The key used to encrypt.
333         :type pubkey: EncryptionKey
334         :param sign: The key used for signing.
335         :type sign: EncryptionKey
336
337         :return: The encrypted data.
338         :rtype: str
339         """
340         pass
341
342     @abstractmethod
343     def decrypt(self, data, privkey, passphrase=None, verify=None):
344         """
345         Decrypt C{data} using private @{privkey} and verify with C{verify} key.
346
347         :param data: The data to be decrypted.
348         :type data: str
349         :param privkey: The key used to decrypt.
350         :type privkey: OpenPGPKey
351         :param verify: The key used to verify a signature.
352         :type verify: OpenPGPKey
353
354         :return: The decrypted data.
355         :rtype: str
356
357         @raise InvalidSignature: Raised if unable to verify the signature with
358             C{verify} key.
359         """
360         pass
361
362     @abstractmethod
363     def sign(self, data, privkey):
364         """
365         Sign C{data} with C{privkey}.
366
367         :param data: The data to be signed.
368         :type data: str
369
370         :param privkey: The private key to be used to sign.
371         :type privkey: EncryptionKey
372
373         :return: The signed data.
374         :rtype: str
375         """
376         pass
377
378     @abstractmethod
379     def verify(self, data, pubkey, detached_sig=None):
380         """
381         Verify signed C{data} with C{pubkey}, eventually using
382         C{detached_sig}.
383
384         :param data: The data to be verified.
385         :type data: str
386         :param pubkey: The public key to be used on verification.
387         :type pubkey: EncryptionKey
388         :param detached_sig: A detached signature. If given, C{data} is
389                              verified against this sdetached signature.
390         :type detached_sig: str
391
392         :return: The signed data.
393         :rtype: str
394         """
395         pass