summaryrefslogtreecommitdiff
path: root/src/leap
diff options
context:
space:
mode:
authorRuben Pollan <meskio@sindominio.net>2015-12-15 13:29:44 +0100
committerRuben Pollan <meskio@sindominio.net>2016-02-25 11:35:18 -0600
commit64eaf2ee426623072bc2d9b1faf77ab831cb3be1 (patch)
treef25638fc9c2bc2ddcbb2262492c18beb990d8696 /src/leap
parentb8e03b8fc18d2f1a02210b622c3befe562f46ab7 (diff)
[feat] move validation, usage and audited date to the active document
- Resolves: #7485
Diffstat (limited to 'src/leap')
-rw-r--r--src/leap/keymanager/__init__.py1
-rw-r--r--src/leap/keymanager/keys.py132
-rw-r--r--src/leap/keymanager/openpgp.py268
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py15
-rw-r--r--src/leap/keymanager/tests/test_openpgp.py54
-rw-r--r--src/leap/keymanager/tests/test_validation.py164
6 files changed, 430 insertions, 204 deletions
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index 9aa71395..8b3487f9 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -439,6 +439,7 @@ class KeyManager(object):
:return: A Deferred which fires with a list of all keys in local db.
:rtype: Deferred
"""
+ # TODO: should it be based on activedocs?
def build_keys(docs):
return map(
lambda doc: build_key_from_dict(
diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py
index 91559c2f..0de6a823 100644
--- a/src/leap/keymanager/keys.py
+++ b/src/leap/keymanager/keys.py
@@ -28,6 +28,7 @@ except ImportError:
import logging
import re
import time
+import traceback
from abc import ABCMeta, abstractmethod
@@ -110,39 +111,48 @@ def is_address(address):
return bool(re.match('[\w.-]+@[\w.-]+', address))
-def build_key_from_dict(kClass, kdict):
+def build_key_from_dict(kClass, key, active=None):
"""
Build an C{kClass} key based on info in C{kdict}.
- :param kdict: Dictionary with key data.
- :type kdict: dict
+ :param key: Dictionary with key data.
+ :type key: dict
+ :param active: Dictionary with active data.
+ :type active: dict
:return: An instance of the key.
:rtype: C{kClass}
"""
- try:
- validation = ValidationLevels.get(kdict[KEY_VALIDATION_KEY])
- except ValueError:
- logger.error("Not valid validation level (%s) for key %s",
- (kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY]))
- validation = ValidationLevels.Weak_Chain
-
- expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY])
- last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY])
- refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY])
+ validation = ValidationLevels.Weak_Chain
+ last_audited_at = None
+ encr_used = False
+ sign_used = False
+
+ if active:
+ try:
+ validation = ValidationLevels.get(active[KEY_VALIDATION_KEY])
+ except ValueError:
+ logger.error("Not valid validation level (%s) for key %s",
+ (active[KEY_VALIDATION_KEY], active[KEY_ID_KEY]))
+ last_audited_at = _to_datetime(active[KEY_LAST_AUDITED_AT_KEY])
+ encr_used = active[KEY_ENCR_USED_KEY]
+ sign_used = active[KEY_SIGN_USED_KEY]
+
+ expiry_date = _to_datetime(key[KEY_EXPIRY_DATE_KEY])
+ refreshed_at = _to_datetime(key[KEY_REFRESHED_AT_KEY])
return kClass(
- kdict[KEY_ADDRESS_KEY],
- key_id=kdict[KEY_ID_KEY],
- fingerprint=kdict[KEY_FINGERPRINT_KEY],
- key_data=kdict[KEY_DATA_KEY],
- private=kdict[KEY_PRIVATE_KEY],
- length=kdict[KEY_LENGTH_KEY],
+ key[KEY_ADDRESS_KEY],
+ key_id=key[KEY_ID_KEY],
+ fingerprint=key[KEY_FINGERPRINT_KEY],
+ key_data=key[KEY_DATA_KEY],
+ private=key[KEY_PRIVATE_KEY],
+ length=key[KEY_LENGTH_KEY],
expiry_date=expiry_date,
last_audited_at=last_audited_at,
refreshed_at=refreshed_at,
validation=validation,
- encr_used=kdict[KEY_ENCR_USED_KEY],
- sign_used=kdict[KEY_SIGN_USED_KEY],
+ encr_used=encr_used,
+ sign_used=sign_used,
)
@@ -178,6 +188,7 @@ class EncryptionKey(object):
key_data="", private=False, length=0, expiry_date=None,
validation=ValidationLevels.Weak_Chain, last_audited_at=None,
refreshed_at=None, encr_used=False, sign_used=False):
+ # TODO: it should know its own active address
self.address = address
self.key_id = key_id
self.fingerprint = fingerprint
@@ -185,6 +196,7 @@ class EncryptionKey(object):
self.private = private
self.length = length
self.expiry_date = expiry_date
+
self.validation = validation
self.last_audited_at = last_audited_at
self.refreshed_at = refreshed_at
@@ -199,7 +211,6 @@ class EncryptionKey(object):
:rtype: str
"""
expiry_date = _to_unix_time(self.expiry_date)
- last_audited_at = _to_unix_time(self.last_audited_at)
refreshed_at = _to_unix_time(self.refreshed_at)
return json.dumps({
@@ -211,11 +222,7 @@ class EncryptionKey(object):
KEY_PRIVATE_KEY: self.private,
KEY_LENGTH_KEY: self.length,
KEY_EXPIRY_DATE_KEY: expiry_date,
- KEY_LAST_AUDITED_AT_KEY: last_audited_at,
KEY_REFRESHED_AT_KEY: refreshed_at,
- KEY_VALIDATION_KEY: str(self.validation),
- KEY_ENCR_USED_KEY: self.encr_used,
- KEY_SIGN_USED_KEY: self.sign_used,
KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG],
})
@@ -223,16 +230,20 @@ class EncryptionKey(object):
"""
Return a JSON string describing this key.
- :param address: Address for wich the key is active
- :type address: str
:return: The JSON string describing this key.
:rtype: str
"""
+ last_audited_at = _to_unix_time(self.last_audited_at)
+
return json.dumps({
KEY_ADDRESS_KEY: address,
KEY_TYPE_KEY: self.__class__.__name__ + KEYMANAGER_ACTIVE_TYPE,
KEY_ID_KEY: self.key_id,
KEY_PRIVATE_KEY: self.private,
+ KEY_VALIDATION_KEY: str(self.validation),
+ KEY_LAST_AUDITED_AT_KEY: last_audited_at,
+ KEY_ENCR_USED_KEY: self.encr_used,
+ KEY_SIGN_USED_KEY: self.sign_used,
KEY_TAGS_KEY: [KEYMANAGER_ACTIVE_TAG],
})
@@ -464,3 +475,68 @@ class EncryptionScheme(object):
:rtype: bool
"""
pass
+
+ def _repair_key_docs(self, doclist):
+ """
+ If there is more than one key for a key id try to self-repair it
+
+ :return: a Deferred that will be fired with the valid key doc once all
+ the deletions are completed
+ :rtype: Deferred
+ """
+ def log_key_doc(doc):
+ logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
+ doc.content[KEY_FINGERPRINT_KEY]))
+
+ def cmp_key(d1, d2):
+ return cmp(d1.content[KEY_REFRESHED_AT_KEY],
+ d2.content[KEY_REFRESHED_AT_KEY])
+
+ return self._repair_docs(doclist, cmp_key, log_key_doc)
+
+ def _repair_active_docs(self, doclist):
+ """
+ If there is more than one active doc for an address try to self-repair
+ it
+
+ :return: a Deferred that will be fired with the valid active doc once
+ all the deletions are completed
+ :rtype: Deferred
+ """
+ def log_active_doc(doc):
+ logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
+ doc.content[KEY_ID_KEY]))
+
+ def cmp_active(d1, d2):
+ res = cmp(d1.content[KEY_LAST_AUDITED_AT_KEY],
+ d2.content[KEY_LAST_AUDITED_AT_KEY])
+ if res != 0:
+ return res
+
+ used1 = (d1.content[KEY_SIGN_USED_KEY] +
+ d1.content[KEY_ENCR_USED_KEY])
+ used2 = (d2.content[KEY_SIGN_USED_KEY] +
+ d2.content[KEY_ENCR_USED_KEY])
+ return cmp(used1, used2)
+
+ return self._repair_docs(doclist, cmp_active, log_active_doc)
+
+ def _repair_docs(self, doclist, cmp_func, log_func):
+ logger.error("BUG ---------------------------------------------------")
+ logger.error("There is more than one doc of type %s:"
+ % (doclist[0].content[KEY_TYPE_KEY],))
+
+ doclist.sort(cmp=cmp_func, reverse=True)
+ log_func(doclist[0])
+ deferreds = []
+ for doc in doclist[1:]:
+ log_func(doc)
+ d = self._soledad.delete_doc(doc)
+ deferreds.append(d)
+
+ logger.error("")
+ logger.error(traceback.extract_stack())
+ logger.error("BUG (please report above info) ------------------------")
+ d = defer.gatherResults(deferreds, consumeErrors=True)
+ d.addCallback(lambda _: doclist[0])
+ return d
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 90640439..3c8ac1e1 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -22,7 +22,6 @@ import os
import re
import shutil
import tempfile
-import traceback
import io
@@ -44,8 +43,6 @@ from leap.keymanager.keys import (
TYPE_ADDRESS_PRIVATE_INDEX,
KEY_ADDRESS_KEY,
KEY_ID_KEY,
- KEY_FINGERPRINT_KEY,
- KEY_REFRESHED_AT_KEY,
KEYMANAGER_ACTIVE_TYPE,
)
@@ -223,6 +220,41 @@ class OpenPGPKey(EncryptionKey):
return []
+ def merge(self, newkey):
+ if newkey.fingerprint != self.fingerprint:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (newkey.fingerprint, self.fingerprint))
+ raise errors.KeyFingerprintMismatch(newkey.fingerprint)
+
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
+ gpg.import_keys(self.key_data)
+ gpg.import_keys(newkey.key_data)
+ gpgkey = gpg.list_keys(secret=newkey.private).pop()
+
+ if gpgkey['expires']:
+ self.expiry_date = datetime.fromtimestamp(
+ int(gpgkey['expires']))
+ else:
+ self.expiry_date = None
+
+ self.uids = []
+ for uid in gpgkey['uids']:
+ self.uids.append(_parse_address(uid))
+
+ self.length = int(gpgkey['length'])
+ self.key_data = gpg.export_keys(gpgkey['fingerprint'],
+ secret=self.private)
+
+ if newkey.validation > self.validation:
+ self.validation = newkey.validation
+ if newkey.last_audited_at > self.last_audited_at:
+ self.validation = newkey.last_audited_at
+ self.encr_used = newkey.encr_used or self.encr_used
+ self.sign_used = newkey.sign_used or self.sign_used
+ self.refreshed_at = datetime.now()
+
class OpenPGPScheme(EncryptionScheme):
"""
@@ -332,15 +364,16 @@ class OpenPGPScheme(EncryptionScheme):
"""
address = _parse_address(address)
- def build_key(doc):
- if doc is None:
+ def build_key((keydoc, activedoc)):
+ if keydoc is None:
raise errors.KeyNotFound(address)
leap_assert(
- address in doc.content[KEY_ADDRESS_KEY],
+ address in keydoc.content[KEY_ADDRESS_KEY],
'Wrong address in key %s. Expected %s, found %s.'
- % (doc.content[KEY_ID_KEY], address,
- doc.content[KEY_ADDRESS_KEY]))
- key = build_key_from_dict(OpenPGPKey, doc.content)
+ % (keydoc.content[KEY_ID_KEY], address,
+ keydoc.content[KEY_ADDRESS_KEY]))
+ key = build_key_from_dict(OpenPGPKey, keydoc.content,
+ activedoc.content)
key._gpgbinary = self._gpgbinary
return key
@@ -426,100 +459,44 @@ class OpenPGPScheme(EncryptionScheme):
:return: A Deferred which fires when the key is in the storage.
:rtype: Deferred
"""
- d = self._put_key_doc(key)
- d.addCallback(lambda _: self._put_active_doc(key, address))
- return d
+ def merge_and_put((keydoc, activedoc)):
+ if not keydoc:
+ return put_new_key(activedoc)
- def _put_key_doc(self, key):
- """
- Put key document in soledad
+ active_content = None
+ if activedoc:
+ active_content = activedoc.content
+ oldkey = build_key_from_dict(OpenPGPKey, keydoc.content,
+ active_content)
- :type key: OpenPGPKey
- :rtype: Deferred
- """
- def check_and_put(docs, key):
- deferred_repair = defer.succeed(None)
- if len(docs) == 0:
- return self._soledad.create_doc_from_json(key.get_json())
- elif len(docs) > 1:
- deferred_repair = self._repair_key_docs(docs, key.key_id)
-
- doc = docs[0]
- oldkey = build_key_from_dict(OpenPGPKey, doc.content)
- if key.fingerprint != oldkey.fingerprint:
- logger.critical(
- "Can't put a key whith the same key_id and different "
- "fingerprint: %s, %s"
- % (key.fingerprint, oldkey.fingerprint))
- return defer.fail(
- errors.KeyFingerprintMismatch(key.fingerprint))
-
- # in case of an update of the key merge them with gnupg
- with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
- gpg.import_keys(oldkey.key_data)
- gpg.import_keys(key.key_data)
- gpgkey = gpg.list_keys(secret=key.private).pop()
- mergedkey = self._build_key_from_gpg(
- gpgkey,
- gpg.export_keys(gpgkey['fingerprint'],
- secret=key.private))
- mergedkey.validation = max(
- [key.validation, oldkey.validation])
- mergedkey.last_audited_at = oldkey.last_audited_at
- mergedkey.refreshed_at = key.refreshed_at
- mergedkey.encr_used = key.encr_used or oldkey.encr_used
- mergedkey.sign_used = key.sign_used or oldkey.sign_used
- doc.set_json(mergedkey.get_json())
- deferred_put = self._soledad.put_doc(doc)
-
- d = defer.gatherResults([deferred_put, deferred_repair])
- d.addCallback(lambda res: res[0])
- return d
+ key.merge(oldkey)
+ keydoc.set_json(key.get_json())
+ deferred_key = self._soledad.put_doc(keydoc)
- d = self._soledad.get_from_index(
- TYPE_ID_PRIVATE_INDEX,
- self.KEY_TYPE,
- key.key_id,
- '1' if key.private else '0')
- d.addCallback(check_and_put, key)
- return d
+ active_json = key.get_active_json(address)
+ if activedoc:
+ activedoc.set_json(active_json)
+ deferred_active = self._soledad.put_doc(activedoc)
+ else:
+ deferred_active = self._soledad.create_doc_from_json(
+ active_json)
- def _put_active_doc(self, key, address):
- """
- Put active key document in soledad
+ return defer.gatherResults([deferred_key, deferred_active])
- :type key: OpenPGPKey
- :type addresses: str
- :rtype: Deferred
- """
- def check_and_put(docs):
- if len(docs) == 1:
- doc = docs.pop()
- doc.set_json(key.get_active_json(address))
- d = self._soledad.put_doc(doc)
- else:
- if len(docs) > 1:
- logger.error("There is more than one active key document "
- "for the address %s" % (address,))
- deferreds = []
- for doc in docs:
- delete = self._soledad.delete_doc(doc)
- deferreds.append(delete)
- d = defer.gatherResults(deferreds, consumeErrors=True)
- else:
- d = defer.succeed(None)
-
- d.addCallback(
- lambda _: self._soledad.create_doc_from_json(
- key.get_active_json(address)))
- return d
+ def put_new_key(activedoc):
+ deferreds = []
+ if activedoc:
+ d = self._soledad.delete_doc(activedoc)
+ deferreds.append(d)
+ for json in [key.get_json(), key.get_active_json(address)]:
+ d = self._soledad.create_doc_from_json(json)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
- d = self._soledad.get_from_index(
- TYPE_ADDRESS_PRIVATE_INDEX,
- self.ACTIVE_TYPE,
- address,
- '1' if key.private else '0')
- d.addCallback(check_and_put)
+ dk = self._get_key_doc_from_keyid(key.key_id, key.private)
+ da = self._get_active_doc_from_address(address, key.private)
+ d = defer.gatherResults([dk, da])
+ d.addCallback(merge_and_put)
return d
def _get_key_doc(self, address, private=False):
@@ -533,44 +510,26 @@ class OpenPGPScheme(EncryptionScheme):
:param private: Whether to look for a private key.
:type private: bool
- :return: A Deferred which fires with the SoledadDocument with the key
- or None if it does not exist.
+ :return: A Deferred which fires with a touple of two SoledadDocument
+ (keydoc, activedoc) or None if it does not exist.
:rtype: Deferred
"""
def get_key_from_active_doc(activedoc):
- if len(activedoc) is 0:
- return None
- leap_assert(
- len(activedoc) is 1,
- 'Found more than one key for address %s!' % (address,))
-
- key_id = activedoc[0].content[KEY_ID_KEY]
- d = self._soledad.get_from_index(
- TYPE_ID_PRIVATE_INDEX,
- self.KEY_TYPE,
- key_id,
- '1' if private else '0')
- d.addCallback(get_doc, key_id, activedoc)
+ if not activedoc:
+ return (None, None)
+ key_id = activedoc.content[KEY_ID_KEY]
+ d = self._get_key_doc_from_keyid(key_id, private)
+ d.addCallback(delete_active_if_no_key, activedoc)
return d
- def get_doc(doclist, key_id, activedoc):
- if len(doclist) == 0:
- logger.warning('There is no key for id %s! Self-repairing it.'
- % (key_id))
+ def delete_active_if_no_key(keydoc, activedoc):
+ if not keydoc:
d = self._soledad.delete_doc(activedoc)
- d.addCallback(lambda _: None)
- return d
- elif len(doclist) > 1:
- d = self._repair_key_docs(doclist, key_id)
- d.addCallback(lambda _: doclist[0])
+ d.addCallback(lambda _: (None, None))
return d
- return doclist[0]
+ return (keydoc, activedoc)
- d = self._soledad.get_from_index(
- TYPE_ADDRESS_PRIVATE_INDEX,
- self.ACTIVE_TYPE,
- address,
- '1' if private else '0')
+ d = self._get_active_doc_from_address(address, private)
d.addCallback(get_key_from_active_doc)
return d
@@ -647,36 +606,6 @@ class OpenPGPScheme(EncryptionScheme):
d.addCallback(delete_key)
return d
- def _repair_key_docs(self, doclist, key_id):
- """
- If there is more than one key for a key id try to self-repair it
-
- :return: a Deferred that will be fired once all the deletions are
- completed
- :rtype: Deferred
- """
- logger.error("BUG ---------------------------------------------------")
- logger.error("There is more than one key with the same key_id %s:"
- % (key_id,))
-
- def log_key_doc(doc):
- logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
- doc.content[KEY_FINGERPRINT_KEY]))
-
- doclist.sort(key=lambda doc: doc.content[KEY_REFRESHED_AT_KEY],
- reverse=True)
- log_key_doc(doclist[0])
- deferreds = []
- for doc in doclist[1:]:
- log_key_doc(doc)
- d = self._soledad.delete_doc(doc)
- deferreds.append(d)
-
- logger.error("")
- logger.error(traceback.extract_stack())
- logger.error("BUG (please report above info) ------------------------")
- return defer.gatherResults(deferreds, consumeErrors=True)
-
#
# Data encryption, decryption, signing and verifying
#
@@ -885,6 +814,31 @@ class OpenPGPScheme(EncryptionScheme):
kfprint = gpgpubkey['fingerprint']
return valid and rfprint == kfprint
+ def _get_active_doc_from_address(self, address, private):
+ d = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ address,
+ '1' if private else '0')
+ d.addCallback(self._repair_and_get_doc, self._repair_active_docs)
+ return d
+
+ def _get_key_doc_from_keyid(self, key_id, private):
+ d = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key_id,
+ '1' if private else '0')
+ d.addCallback(self._repair_and_get_doc, self._repair_key_docs)
+ return d
+
+ def _repair_and_get_doc(self, doclist, repair_func):
+ if len(doclist) is 0:
+ return None
+ elif len(doclist) > 1:
+ return repair_func(doclist)
+ return doclist[0]
+
def process_ascii_key(key_data, gpgbinary, secret=False):
with TempGPGWrapper(gpgbinary=gpgbinary) as gpg:
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 0f08326e..e4e0d8b4 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -83,13 +83,18 @@ class KeyManagerUtilTestCase(unittest.TestCase):
'private': False,
'length': 4096,
'expiry_date': 0,
- 'last_audited_at': 0,
'refreshed_at': 1311239602,
+ }
+ adict = {
+ 'address': ADDRESS,
+ 'key_id': KEY_FINGERPRINT[-16:],
+ 'private': False,
+ 'last_audited_at': 0,
'validation': str(ValidationLevels.Weak_Chain),
'encr_used': False,
'sign_used': True,
}
- key = build_key_from_dict(OpenPGPKey, kdict)
+ key = build_key_from_dict(OpenPGPKey, kdict, adict)
self.assertEqual(
kdict['address'], key.address,
'Wrong data in key.')
@@ -118,13 +123,13 @@ class KeyManagerUtilTestCase(unittest.TestCase):
datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
'Wrong data in key.')
self.assertEqual(
- ValidationLevels.get(kdict['validation']), key.validation,
+ ValidationLevels.get(adict['validation']), key.validation,
'Wrong data in key.')
self.assertEqual(
- kdict['encr_used'], key.encr_used,
+ adict['encr_used'], key.encr_used,
'Wrong data in key.')
self.assertEqual(
- kdict['sign_used'], key.sign_used,
+ adict['sign_used'], key.sign_used,
'Wrong data in key.')
diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py
index 96b40a0e..66415916 100644
--- a/src/leap/keymanager/tests/test_openpgp.py
+++ b/src/leap/keymanager/tests/test_openpgp.py
@@ -29,7 +29,10 @@ from leap.keymanager import (
KeyNotFound,
openpgp,
)
-from leap.keymanager.keys import TYPE_ID_PRIVATE_INDEX
+from leap.keymanager.keys import (
+ TYPE_ID_PRIVATE_INDEX,
+ TYPE_ADDRESS_PRIVATE_INDEX,
+)
from leap.keymanager.openpgp import OpenPGPKey
from leap.keymanager.tests import (
KeyManagerWithSoledadTestCase,
@@ -308,6 +311,7 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
try:
yield self.assertFailure(pgp.get_key(ADDRESS, private=False),
KeyNotFound)
+ # it should have deleted the index
self.assertEqual(self._soledad.delete_doc.call_count, 1)
finally:
self._soledad.get_from_index = get_from_index
@@ -349,6 +353,54 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
self._soledad.get_from_index = get_from_index
self._soledad.delete_doc = delete_doc
+ @inlineCallbacks
+ def test_self_repair_five_active_docs(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ADDRESS_PRIVATE_INDEX and
+ args[2] == ADDRESS):
+ k1 = OpenPGPKey(ADDRESS, key_id="1",
+ last_audited_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, key_id="2",
+ last_audited_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, key_id="3",
+ last_audited_at=datetime(2007, 1, 1),
+ encr_used=True, sign_used=True)
+ k4 = OpenPGPKey(ADDRESS, key_id="4",
+ last_audited_at=datetime(2007, 1, 1),
+ sign_used=True)
+ k5 = OpenPGPKey(ADDRESS, key_id="5",
+ last_audited_at=datetime(2007, 1, 1),
+ encr_used=True)
+ deferreds = []
+ for k in [k1, k2, k3, k4, k5]:
+ d = self._soledad.create_doc_from_json(
+ k.get_active_json(ADDRESS))
+ deferreds.append(d)
+ return gatherResults(deferreds)
+ elif args[0] == TYPE_ID_PRIVATE_INDEX:
+ key_id = args[2]
+ self.assertEqual(key_id, "3")
+ k = OpenPGPKey(ADDRESS, key_id="3")
+ return succeed(
+ [self._soledad.create_doc_from_json(k.get_json())])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ try:
+ yield pgp.get_key(ADDRESS, private=False)
+ self.assertEqual(self._soledad.delete_doc.call_count, 4)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
def _assert_key_not_found(self, pgp, address, private=False):
d = pgp.get_key(address, private=private)
return self.assertFailure(d, KeyNotFound)
diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py
index bcf41c43..44d6e394 100644
--- a/src/leap/keymanager/tests/test_validation.py
+++ b/src/leap/keymanager/tests/test_validation.py
@@ -108,14 +108,14 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
TEXT = "some text"
km = self._key_manager()
+ yield km.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS)
+ signature = yield km.sign(TEXT, ADDRESS, OpenPGPKey)
+ yield self.delete_all_keys(km)
+
yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS)
yield km.encrypt(TEXT, ADDRESS, OpenPGPKey)
-
- km2 = self._key_manager()
- yield km2.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS)
- signature = yield km2.sign(TEXT, ADDRESS, OpenPGPKey)
-
yield km.verify(TEXT, ADDRESS, OpenPGPKey, detached_sig=signature)
+
d = km.put_raw_key(
UNRELATED_KEY, OpenPGPKey, ADDRESS,
validation=ValidationLevels.Provider_Endorsement)
@@ -126,17 +126,17 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
TEXT = "some text"
km = self._key_manager()
+ yield km.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS)
+ yield km.put_raw_key(PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2)
+ encrypted = yield km.encrypt(TEXT, ADDRESS_2, OpenPGPKey,
+ sign=ADDRESS)
+ yield self.delete_all_keys(km)
+
yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS)
yield km.put_raw_key(PRIVATE_KEY_2, OpenPGPKey, ADDRESS_2)
yield km.encrypt(TEXT, ADDRESS, OpenPGPKey)
-
- km2 = self._key_manager()
- yield km2.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS)
- yield km2.put_raw_key(PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2)
- encrypted = yield km2.encrypt(TEXT, ADDRESS_2, OpenPGPKey,
- sign=ADDRESS)
-
yield km.decrypt(encrypted, ADDRESS_2, OpenPGPKey, verify=ADDRESS)
+
d = km.put_raw_key(
UNRELATED_KEY, OpenPGPKey, ADDRESS,
validation=ValidationLevels.Provider_Endorsement)
@@ -150,6 +150,33 @@ class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, SIGNED_FINGERPRINT)
+ @inlineCallbacks
+ def test_two_uuids(self):
+ TEXT = "some text"
+
+ km = self._key_manager()
+ yield km.put_raw_key(UUIDS_PRIVATE, OpenPGPKey, ADDRESS_2)
+ signature = yield km.sign(TEXT, ADDRESS_2, OpenPGPKey)
+ yield self.delete_all_keys(km)
+
+ yield km.put_raw_key(UUIDS_KEY, OpenPGPKey, ADDRESS_2)
+ yield km.put_raw_key(UUIDS_KEY, OpenPGPKey, ADDRESS)
+ yield km.encrypt(TEXT, ADDRESS_2, OpenPGPKey)
+ yield km.verify(TEXT, ADDRESS_2, OpenPGPKey, detached_sig=signature)
+
+ d = km.put_raw_key(
+ PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2,
+ validation=ValidationLevels.Provider_Endorsement)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+ key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False)
+ self.assertEqual(key.fingerprint, UUIDS_FINGERPRINT)
+
+ yield km.put_raw_key(
+ PUBLIC_KEY, OpenPGPKey, ADDRESS,
+ validation=ValidationLevels.Provider_Endorsement)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ self.assertEqual(key.fingerprint, KEY_FINGERPRINT)
+
# Key material for testing
@@ -364,6 +391,117 @@ X2+l7IOSt+31KQCBFN/VmhTySJOVQC1d2A56lSH2c/DWVClji+x3suzn
-----END PGP PUBLIC KEY BLOCK-----
"""
+# key 0x1DDBAEB928D982F7: public key two uuids
+# uid anotheruser <anotheruser@leap.se>
+# uid Leap Test Key <leap@leap.se>
+UUIDS_FINGERPRINT = "21690ED054C1B2F3ACE963D38FCC7DEFB4EE5A9B"
+UUIDS_KEY = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1
+
+mQENBFZwjz0BCADHpVg7js8PK9gQXx3Z+Jtj6gswYZpeXRdIUfZBSebWNGKwXxC9
+ZZDjnQc3l6Kezh7ra/hB6xulDbj3vXi66V279QSOuFAKMIudlJehb86bUiVk9Ppy
+kdrn44P40ZdVmw2m61WrKnvBelKW7pIF/EB/dY1uUonSfR56f/BxL5a5jGi2uI2y
+2hTPnZEksoKQsjsp1FckPFwVGzRKVzYl3lsorL5oiHd450G2B3VRw8AZ8Eqq6bzf
+jNrrA3TOMmEIYdADPqqnBj+xbnOCGBsvx+iAhGRxUckVJqW92SXlNNds8kEyoE0t
+9B6eqe0MrrlcK3SLe03j85T9f1o3An53rV/3ABEBAAG0HExlYXAgVGVzdCBLZXkg
+PGxlYXBAbGVhcC5zZT6JAT0EEwEIACcFAlZwjz0CGwMFCRLMAwAFCwkIBwMFFQoJ
+CAsFFgMCAQACHgECF4AACgkQj8x977TuWpu4ZggAgk6rVtOCqYwM720Bs3k+wsWu
+IVPUqnlPbSWph/PKBKWYE/5HoIGdvfN9jJxwpCM5x/ivPe1zeJ0qa9SnO66kolHU
+7qC8HRWC67R4znO4Zrs2I+SgwRHAPPbqMVPsNs5rS0D6DCcr+LXtJF+LLAsIwDfw
+mXEsKbX5H5aBmmDnfq0pGx05E3tKs5l09VVESvVZYOCM9b4FtdLVvgbKAD+KYDW1
+5A/PzOvyYjZu2FGtPKmNmqHD3KW8cmrcI/7mRR08FnNGbbpgXPZ2GPKgkUllY9N7
+E9lg4eaYH2fIWun293kbqp8ueELZvoU1jUQrP5B+eqBWTvIucqdQqJaiWn9pELQh
+YW5vdGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iQE9BBMBCAAnBQJWcI9a
+AhsDBQkSzAMABQsJCAcDBRUKCQgLBRYDAgEAAh4BAheAAAoJEI/Mfe+07lqblRMH
+/17vK2WOd0F7EegA5ELOrM+QJPKpLK4e6WdROUpS1hvRQcAOqadCCpYPSTTG/HnO
+d9/Q9Q/G3xHHlk6nl1qHRkVlp0iVWyBZFn1s8lgGz/FFUEXXRj7I5RGmKSNgDlqA
+un2OrwB2m+DH6pMjizu/RUfIJM2bSgViuvjCQoaLYLcFiULJlWHb/2WgpvesFyAc
+0oc9AkXuaCEo50XQlrt8Bh++6rfbAMAS7ZrHluFPIY+y4eVl+MU/QkoGYAVgiLLV
+5tnvbDZWNs8ixw4ubqKXYj5mK55sapokhOqObEfY6D3p7YpdQO/IhBneCw9gKOxa
+wYAPhCOrJC8JmE69I1Nk8Bu5AQ0EVnCPPQEIANUivsrR2jwb8C9wPONn0HS3YYCI
+/IVlLdw/Ia23ogBF1Uh8ORNg1G/t0/6S7IKDZ2gGgWw25u9TjWRRWsxO9tjOPi2d
+YuhwlQRnq5Or+LzIEKRs9GnJMLFT0kR9Nrhw4UyaN6tWkR9p1Py7ux8RLmDEMAs3
+fBfVMLMmQRerJ5SyCUiq/lm9aYTLCC+vkjE01C+2oI0gcWGfLDjiJbaD4AazzibP
+fBe41BIk7WaCJyEcBqfrgW/Seof43FhSKRGgc5nx3HH1DMz9AtYfKnVS5DgoBGpO
+hxgzIJN3/hFHPicRlYoHxLUE48GcFfQFEJ78RXeBuieXAkDxNczHnLkEPx8AEQEA
+AYkBJQQYAQgADwUCVnCPPQIbDAUJEswDAAAKCRCPzH3vtO5amyRIB/9IsWUWrvbH
+njw2ZCiIV++lHgIntAcuQIbZIjyMXoM+imHsPrsDOUT65yi9Xp1pUyZEKtGkZvP4
+p7HRzJL+RWiWEN7sgQfNqqev8BF2/xmxkmiSuXHJ0PSaC5DmLfFDyvvSU6H5VPud
+NszKIHtyoS6ph6TH8GXzFmNiHaTOZKdmVxlyLj1/DN+d63M+ARZIe7gB6jmP/gg4
+o52icfTcqLkkppYn8g1A9bdJ3k8qqExNPTf2llDscuD9VzpebFbPqfcYqR71GfG7
+Kmg7qGnZtNac1ERvknI/fmmCQydGk5pOh0KUTgeLG2qB07cqCUBbOXaweNWbiM9E
+vtQLNMD9Gn7D
+=MCXv
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+UUIDS_PRIVATE = """
+-----BEGIN PGP PRIVATE KEY BLOCK-----
+Version: GnuPG v1
+
+lQOYBFZwjz0BCADHpVg7js8PK9gQXx3Z+Jtj6gswYZpeXRdIUfZBSebWNGKwXxC9
+ZZDjnQc3l6Kezh7ra/hB6xulDbj3vXi66V279QSOuFAKMIudlJehb86bUiVk9Ppy
+kdrn44P40ZdVmw2m61WrKnvBelKW7pIF/EB/dY1uUonSfR56f/BxL5a5jGi2uI2y
+2hTPnZEksoKQsjsp1FckPFwVGzRKVzYl3lsorL5oiHd450G2B3VRw8AZ8Eqq6bzf
+jNrrA3TOMmEIYdADPqqnBj+xbnOCGBsvx+iAhGRxUckVJqW92SXlNNds8kEyoE0t
+9B6eqe0MrrlcK3SLe03j85T9f1o3An53rV/3ABEBAAEAB/9Lzeg2lP7hz8/2R2da
+QB8gTNl6wVSPx+DzQMuz9o+DfdiLB02f3FSrWBBJd3XzvmfXE+Prg423mgJFbtfM
+gJdqqpnUZv9dHxmj96urTHyyVPqF3s7JecAYlDaj31EK3BjO7ERW/YaH7B432NXx
+F9qVitjsrsJN/dv4v2NYVq1wPcdDB05ge9WP+KRec7xvdTudH4Kov0iMZ+1Nksfn
+lrowGuMYBGWDlTNoBoEYxDD2lqVaiOjyjx5Ss8btS59IlXxApOFZTezekl7hUI2B
+1fDQ1GELL6BKVKxApGSD5XAgVlqkek4RhoHmg4gKSymfbFV5oRp1v1kC0JIGvnB1
+W5/BBADKzagL4JRnhWGubLec917B3se/3y1aHrEmO3T0IzxnUMD5yvg81uJWi5Co
+M05Nu/Ny/Fw1VgoF8MjiGnumB2KKytylu8LKLarDxPpLxabOBCQLHrLQOMsmesjR
+Cg3iYl/EeM/ooAufaN4IObcu6Pa8//rwNE7Fz1ZsIyJefN4fnwQA/AOpqA2BvHoH
+VRYh4NVuMLhF1YdKqcd/T/dtSqszcadkmG4+vAL76r3jYxScRoNGQaIkpBnzP0ry
+Adb0NDuBgSe/Cn44kqT7JJxTMfJNrw2rBMMUYZHdQrck2pf5R4fZ74yJyvCKg5pQ
+QAl1gTSi6PJvPwpc7m7Kr4kHBVDlgKkEAJKkVrtq/v2+jSA+/osa4YC5brsDQ08X
+pvZf0MBkc5/GDfusHyE8HGFnVY5ycrS85TBCrhc7suFu59pF4wEeXsqxqNf02gRe
+B+btPwR7yki73iyXs4cbuszHMD03UnbvItFAybD5CC+oR9kG5noI0TzJNUNX9Vkq
+xATf819dhwtgTha0HExlYXAgVGVzdCBLZXkgPGxlYXBAbGVhcC5zZT6JAT0EEwEI
+ACcFAlZwjz0CGwMFCRLMAwAFCwkIBwMFFQoJCAsFFgMCAQACHgECF4AACgkQj8x9
+77TuWpu4ZggAgk6rVtOCqYwM720Bs3k+wsWuIVPUqnlPbSWph/PKBKWYE/5HoIGd
+vfN9jJxwpCM5x/ivPe1zeJ0qa9SnO66kolHU7qC8HRWC67R4znO4Zrs2I+SgwRHA
+PPbqMVPsNs5rS0D6DCcr+LXtJF+LLAsIwDfwmXEsKbX5H5aBmmDnfq0pGx05E3tK
+s5l09VVESvVZYOCM9b4FtdLVvgbKAD+KYDW15A/PzOvyYjZu2FGtPKmNmqHD3KW8
+cmrcI/7mRR08FnNGbbpgXPZ2GPKgkUllY9N7E9lg4eaYH2fIWun293kbqp8ueELZ
+voU1jUQrP5B+eqBWTvIucqdQqJaiWn9pELQhYW5vdGhlcnVzZXIgPGFub3RoZXJ1
+c2VyQGxlYXAuc2U+iQE9BBMBCAAnBQJWcI9aAhsDBQkSzAMABQsJCAcDBRUKCQgL
+BRYDAgEAAh4BAheAAAoJEI/Mfe+07lqblRMH/17vK2WOd0F7EegA5ELOrM+QJPKp
+LK4e6WdROUpS1hvRQcAOqadCCpYPSTTG/HnOd9/Q9Q/G3xHHlk6nl1qHRkVlp0iV
+WyBZFn1s8lgGz/FFUEXXRj7I5RGmKSNgDlqAun2OrwB2m+DH6pMjizu/RUfIJM2b
+SgViuvjCQoaLYLcFiULJlWHb/2WgpvesFyAc0oc9AkXuaCEo50XQlrt8Bh++6rfb
+AMAS7ZrHluFPIY+y4eVl+MU/QkoGYAVgiLLV5tnvbDZWNs8ixw4ubqKXYj5mK55s
+apokhOqObEfY6D3p7YpdQO/IhBneCw9gKOxawYAPhCOrJC8JmE69I1Nk8BudA5gE
+VnCPPQEIANUivsrR2jwb8C9wPONn0HS3YYCI/IVlLdw/Ia23ogBF1Uh8ORNg1G/t
+0/6S7IKDZ2gGgWw25u9TjWRRWsxO9tjOPi2dYuhwlQRnq5Or+LzIEKRs9GnJMLFT
+0kR9Nrhw4UyaN6tWkR9p1Py7ux8RLmDEMAs3fBfVMLMmQRerJ5SyCUiq/lm9aYTL
+CC+vkjE01C+2oI0gcWGfLDjiJbaD4AazzibPfBe41BIk7WaCJyEcBqfrgW/Seof4
+3FhSKRGgc5nx3HH1DMz9AtYfKnVS5DgoBGpOhxgzIJN3/hFHPicRlYoHxLUE48Gc
+FfQFEJ78RXeBuieXAkDxNczHnLkEPx8AEQEAAQAH+wRSCn0RCPP7+v/zLgDMG3Eq
+QHs7C6dmmCnlS7j6Rnnr8HliL0QBy/yi3Q/Fia7RnBiDPT9k04SZdH3KmmUW2rEl
+aSRCkv00PwkSUuuQ6l9lTNUQclnsnqSRlusVgLT3cNG9NJCwFgwFeLBQ2+ey0PZc
+M78edlEDXNPc3CfvK8O7WK74YiNJqIQCs7aDJSv0s7O/asRQyMCsl/UYtMV6W03d
+eauS3bM41ll7GVfHMgkChFUQHb+19JHzSq4yeqQ/vS30ASugFxB3Omomp95sRL/w
+60y51faLyTKD4AN3FhDfeIEfh1ggN2UT70qzC3+F8TvxQQHEBhNQKlhWVbWTp+0E
+ANkcyokvn+09OIO/YDxF3aB37gA3J37d6NXos6pdvqUPOVSHvmF/hiM0FO2To6vu
+ex/WcDQafPm4eiW6oNllwtZhWU2tr34bZD4PIuktSX2Ax2v5QtZ4d1CVdDEwbYn/
+fmR+nif1fTKTljZragaI9Rt6NWhfh7UGt62iIKh0lbhLBAD7T5wHY8V1yqlnyByG
+K7nt+IHnND2I7Hk58yxKjv2KUNYxWZeOAQTQmfQXjJ+BOmw6oHMmDmGvdjSxIE+9
+j9nezEONxzVVjEDTKBeEnUeDY1QGDyDyW1/AhLJ52yWGTNrmKcGV4KmaYnhDzq7Z
+aqJVRcFMF9TAfhrEGGhRdD83/QQA6xAqjWiu6tbaDurVuce64mA1R3T7HJ81gEaX
+I+eJNDJb7PK3dhFETgyc3mcepWFNJkoXqx2ADhG8jLqK4o/N/QlV5PQgeHmhz09V
+Z7MNhivGxDKZoxX6Bouh+qs5OkatcGFhTz//+FHSfusV2emxNiwd4QIsizxaltqh
+W1ke0bA7eYkBJQQYAQgADwUCVnCPPQIbDAUJEswDAAAKCRCPzH3vtO5amyRIB/9I
+sWUWrvbHnjw2ZCiIV++lHgIntAcuQIbZIjyMXoM+imHsPrsDOUT65yi9Xp1pUyZE
+KtGkZvP4p7HRzJL+RWiWEN7sgQfNqqev8BF2/xmxkmiSuXHJ0PSaC5DmLfFDyvvS
+U6H5VPudNszKIHtyoS6ph6TH8GXzFmNiHaTOZKdmVxlyLj1/DN+d63M+ARZIe7gB
+6jmP/gg4o52icfTcqLkkppYn8g1A9bdJ3k8qqExNPTf2llDscuD9VzpebFbPqfcY
+qR71GfG7Kmg7qGnZtNac1ERvknI/fmmCQydGk5pOh0KUTgeLG2qB07cqCUBbOXaw
+eNWbiM9EvtQLNMD9Gn7D
+=/3u/
+-----END PGP PRIVATE KEY BLOCK-----
+"""
+
if __name__ == "__main__":
- import unittest
unittest.main()