summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2015-08-28 11:21:05 -0400
committerKali Kaneko <kali@leap.se>2015-08-28 11:21:05 -0400
commitdc26f67236fc540f17529df35128f49dcd1b42a4 (patch)
tree680eaedd6ea279330f0458165a5ecf5f0a25ad94
parent73e662f8741b5857b3881eb0ce7c08147a06a2bc (diff)
parent437a11a492ea2fc39050fbf4b0349f531558599e (diff)
Merge tag '0.4.2' into debian/experimental
Tag leap.keymanager version 0.4.2 Conflicts: pkg/requirements.pip src/leap/keymanager/_version.py
-rw-r--r--AUTHORS6
-rw-r--r--CHANGELOG50
-rw-r--r--README.rst7
-rw-r--r--docs/leap-commit-template7
-rw-r--r--docs/leap-commit-template.README47
-rwxr-xr-xpkg/generate_wheels.sh13
-rwxr-xr-xpkg/pip_install_requirements.sh84
-rw-r--r--pkg/requirements-latest.pip8
-rw-r--r--pkg/requirements-leap.pip1
-rw-r--r--pkg/requirements-testing.pip11
-rw-r--r--pkg/requirements.pip7
-rwxr-xr-xpkg/tools/get_authors.sh2
-rw-r--r--pkg/utils.py29
-rw-r--r--setup.cfg10
-rw-r--r--setup.py27
-rw-r--r--src/leap/keymanager/__init__.py677
-rw-r--r--src/leap/keymanager/_version.py14
-rw-r--r--src/leap/keymanager/errors.py12
-rw-r--r--src/leap/keymanager/keys.py227
-rw-r--r--src/leap/keymanager/openpgp.py527
-rw-r--r--src/leap/keymanager/tests/__init__.py310
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py810
-rw-r--r--src/leap/keymanager/tests/test_openpgp.py252
-rw-r--r--src/leap/keymanager/tests/test_validation.py345
-rw-r--r--src/leap/keymanager/validation.py124
25 files changed, 2579 insertions, 1028 deletions
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..5e64248
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,6 @@
+Tomás Touceda <chiiph@leap.se>
+Ruben Pollan <meskio@sindominio.net>
+Ivan Alejandro <ivanalejandro0@gmail.com>
+drebs <drebs@leap.se>
+Kali Kaneko <kali@leap.se>
+Parménides GV <parmegv@sdf.org>
diff --git a/CHANGELOG b/CHANGELOG
index 6b24e5b..7e416a6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,4 +1,32 @@
-0.3.8 Apr 4:
+0.4.2 Aug 26, 2015:
+ o Style changes.
+ o Tests updates.
+ o Packaging improvements.
+
+0.4.1 Jul 10, 2015:
+ o Remove the dependency on enum34. Closes: #7188.
+
+0.4.0 Jun 8, 2015:
+ o Adapt to new events api on leap.common. Related to #5359.
+ o Add 'fetch_key' method to fetch keys from a URI. Closes #5932.
+ o Clean up API.
+ o Fix call to python-gnupg's verify_file() method. Closes #6022.
+ o KeyManager.put_key now accepts also ascii keys.
+ o Multi uid support. Closes #6212.
+ o Port keymanager to the new soledad async API. Closes #6368.
+ o Return always KeyNotFound failure if fetch keys fails on an unknown error.
+ o Upgrade keys if not successfully used and strict high validation level.
+ Closes #6211.
+ o Use addresses instead of keys for encrypt, decrypt, sign & verify.
+ Closes #6346.
+ o Expose info about the signing key. Closes #6366.
+ o Fetched keys from other domain than its provider are set as 'Weak Chain'
+ validation level. Closes #6815.
+ o Keep old key after upgrade. Closes #6262.
+ o New soledad doc struct for encryption-keys. Closes #6299.
+ o Upgrade key when signed by old key. Closes #6240.
+
+0.3.8 Apr 4, 2014:
o Properly raise KeyNotFound exception when looking for keys on
nickserver. Fixes #5415.
o Do not decode decrypted data, return as str.
@@ -6,16 +34,14 @@
o Memoize call to get_key. Closes #4784.
o Update auth to interact with webapp v2. Fixes #5120.
--- 2014 --
-
-0.3.7 Dec 6:
+0.3.7 Dec 6, 2013:
o Fix error return values on openpgp backend.
o Remove address check when sending email and rely in the email
client to verify that is correct. Closes #4491.
o Support sending encrypted mails to addresses using the '+' sign.
o Improve exception names and handling.
-0.3.6 Nov 15:
+0.3.6 Nov 15, 2013:
o Default encoding to 'utf-8' in case of system encodings not
set. Closes #4427.
o Add verification of detached signatures. Closes #4375.
@@ -23,37 +49,37 @@
o Expose openpgp methods in keymanager (parse_ascii_keys, put_key,
delete_key).
-0.3.5 Nov 1:
+0.3.5 Nov 1, 2013:
o Return unicode decrypted text to avoid encoding issues. Related to
#4000.
-0.3.4 Oct 18:
+0.3.4 Oct 18, 2013:
o Add option to choose cipher and digest algorithms when signing and
encrypting. Closes #4030.
-0.3.3 Oct 4:
+0.3.3 Oct 4, 2013:
o Add a sanity check for the correct version of gnupg.
o Update code to use gnupg 1.2.2 python module. Closes #2342.
-0.3.2 Sep 6:
+0.3.2 Sep 6, 2013:
o Do not raise exception when a GET request doesn't return 2XX
code. Nickserver uses error codes for more verbosity in the
result.
o Accept unicode ascii keys along with str.
-0.3.1 Aug 23:
+0.3.1 Aug 23, 2013:
o Signal different key related events, like key generation, key
upload.
o Update to new soledad package scheme (common, client and
server). Closes #3487.
o Packaging improvements: add versioneer and parse_requirements.
-0.3.0 Aug 9:
+0.3.0 Aug 9, 2013:
o If a nickserver request fails in any way, notify and continue.
o Options parameter in gnupg.GPG isn't supported by all versions, so
removing it for the time being.
o Add support for bundled gpg. Closes #3397.
o Refactor API to include encrypt/decrypt/sign/verify in KeyManager.
-0.2.0 Jul 12:
+0.2.0 Jul 12, 2013:
o Move keymanager to its own package
diff --git a/README.rst b/README.rst
index 8aedfb4..acf2335 100644
--- a/README.rst
+++ b/README.rst
@@ -6,3 +6,10 @@ LEAP's Key Manager
The Key Manager is a Nicknym agent for the LEAP project:
https://leap.se/pt/docs/design/nicknym
+
+running tests
+-------------
+
+Use trial to run the test suite::
+
+ trial leap.keymanager
diff --git a/docs/leap-commit-template b/docs/leap-commit-template
new file mode 100644
index 0000000..8a5c7cd
--- /dev/null
+++ b/docs/leap-commit-template
@@ -0,0 +1,7 @@
+[bug|feat|docs|style|refactor|test|pkg|i18n] ...
+...
+
+- Resolves: #XYZ
+- Related: #XYZ
+- Documentation: #XYZ
+- Releases: XYZ
diff --git a/docs/leap-commit-template.README b/docs/leap-commit-template.README
new file mode 100644
index 0000000..ce8809e
--- /dev/null
+++ b/docs/leap-commit-template.README
@@ -0,0 +1,47 @@
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+HOW TO USE THIS TEMPLATE:
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Run `git config commit.template docs/leap-commit-template` or
+edit the .git/config for this project and add
+`template = docs/leap-commit-template`
+under the [commit] block
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+COMMIT TEMPLATE FORMAT EXPLAINED
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+[type] <subject>
+
+<body>
+<footer>
+
+Type should be one of the following:
+- bug (bug fix)
+- feat (new feature)
+- docs (changes to documentation)
+- style (formatting, pep8 violations, etc; no code change)
+- refactor (refactoring production code)
+- test (adding missing tests, refactoring tests; no production code change)
+- pkg (packaging related changes; no production code change)
+- i18n translation related changes
+
+Subject should use imperative tone and say what you did.
+For example, use 'change', NOT 'changed' or 'changes'.
+
+The body should go into detail about changes made.
+
+The footer should contain any issue references or actions.
+You can use one or several of the following:
+
+- Resolves: #XYZ
+- Related: #XYZ
+- Documentation: #XYZ
+- Releases: XYZ
+
+The Documentation field should be included in every new feature commit, and it
+should link to an issue in the bug tracker where the new feature is analyzed
+and documented.
+
+For a full example of how to write a good commit message, check out
+https://github.com/sparkbox/how_to/tree/master/style/git
diff --git a/pkg/generate_wheels.sh b/pkg/generate_wheels.sh
new file mode 100755
index 0000000..a13e2c7
--- /dev/null
+++ b/pkg/generate_wheels.sh
@@ -0,0 +1,13 @@
+#!/bin/sh
+# Generate wheels for dependencies
+# Use at your own risk.
+
+if [ "$WHEELHOUSE" = "" ]; then
+ WHEELHOUSE=$HOME/wheelhouse
+fi
+
+pip wheel --wheel-dir $WHEELHOUSE pip
+pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements.pip
+if [ -f pkg/requirements-testing.pip ]; then
+ pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements-testing.pip
+fi
diff --git a/pkg/pip_install_requirements.sh b/pkg/pip_install_requirements.sh
new file mode 100755
index 0000000..8ca0956
--- /dev/null
+++ b/pkg/pip_install_requirements.sh
@@ -0,0 +1,84 @@
+#!/bin/bash
+# Update pip and install LEAP base/testing requirements.
+# For convenience, $insecure_packages are allowed with insecure flags enabled.
+# Use at your own risk.
+# See $usage for help
+
+insecure_packages=""
+leap_wheelhouse=https://lizard.leap.se/wheels
+
+show_help() {
+ usage="Usage: $0 [--testing] [--use-leap-wheels]\n --testing\t\tInstall dependencies from requirements-testing.pip\n
+\t\t\tOtherwise, it will install requirements.pip\n
+--use-leap-wheels\tUse wheels from leap.se"
+ echo -e $usage
+
+ exit 1
+}
+
+process_arguments() {
+ testing=false
+ while [ "$#" -gt 0 ]; do
+ # From http://stackoverflow.com/a/31443098
+ case "$1" in
+ --help) show_help;;
+ --testing) testing=true; shift 1;;
+ --use-leap-wheels) use_leap_wheels=true; shift 1;;
+
+ -h) show_help;;
+ -*) echo "unknown option: $1" >&2; exit 1;;
+ esac
+ done
+}
+
+return_wheelhouse() {
+ if $use_leap_wheels ; then
+ WHEELHOUSE=$leap_wheelhouse
+ elif [ "$WHEELHOUSE" = "" ]; then
+ WHEELHOUSE=$HOME/wheelhouse
+ fi
+
+ # Tested with bash and zsh
+ if [[ $WHEELHOUSE != http* && ! -d "$WHEELHOUSE" ]]; then
+ mkdir $WHEELHOUSE
+ fi
+
+ echo "$WHEELHOUSE"
+}
+
+return_install_options() {
+ wheelhouse=`return_wheelhouse`
+ install_options="-U --find-links=$wheelhouse"
+ if $use_leap_wheels ; then
+ install_options="$install_options --trusted-host lizard.leap.se"
+ fi
+
+ echo $install_options
+}
+
+return_insecure_flags() {
+ for insecure_package in $insecure_packages; do
+ flags="$flags --allow-external $insecure_package --allow-unverified $insecure_package"
+ done
+
+ echo $flags
+}
+
+return_packages() {
+ if $testing ; then
+ packages="-r pkg/requirements-testing.pip"
+ else
+ packages="-r pkg/requirements.pip"
+ fi
+
+ echo $packages
+}
+
+process_arguments $@
+install_options=`return_install_options`
+insecure_flags=`return_insecure_flags`
+packages=`return_packages`
+
+pip install -U wheel
+pip install $install_options pip
+pip install $install_options $insecure_flags $packages
diff --git a/pkg/requirements-latest.pip b/pkg/requirements-latest.pip
new file mode 100644
index 0000000..148d42b
--- /dev/null
+++ b/pkg/requirements-latest.pip
@@ -0,0 +1,8 @@
+--index-url https://pypi.python.org/simple/
+
+--allow-external u1db --allow-unverified u1db
+--allow-external dirspec --allow-unverified dirspec
+-e 'git+https://github.com/pixelated-project/leap_pycommon.git@develop#egg=leap.common'
+-e 'git+https://github.com/pixelated-project/soledad.git@develop#egg=leap.soledad.common&subdirectory=common/'
+-e 'git+https://github.com/pixelated-project/soledad.git@develop#egg=leap.soledad.client&subdirectory=client/'
+-e .
diff --git a/pkg/requirements-leap.pip b/pkg/requirements-leap.pip
new file mode 100644
index 0000000..b311859
--- /dev/null
+++ b/pkg/requirements-leap.pip
@@ -0,0 +1 @@
+leap.common>=0.4.0
diff --git a/pkg/requirements-testing.pip b/pkg/requirements-testing.pip
index f41fdba..addda19 100644
--- a/pkg/requirements-testing.pip
+++ b/pkg/requirements-testing.pip
@@ -1,2 +1,11 @@
mock
-leap.soledad.client
+setuptools-trial
+pep8
+
+#-----------------------------------------------------------------------------
+#Although it's not a package dependency, tests also depend on having
+#soledad client installed. Commenting to avoid versioning problem, you should
+#know what you are testing against :)
+#-----------------------------------------------------------------------------
+
+#leap.soledad.client
diff --git a/pkg/requirements.pip b/pkg/requirements.pip
index a6092a9..c4cb09a 100644
--- a/pkg/requirements.pip
+++ b/pkg/requirements.pip
@@ -1,6 +1,5 @@
-leap.common>=0.3.7
-simplejson
-requests
# if we bump the gnupg version, bump also the sanity check
# in keymanager.__init__
-gnupg>=1.2.4.99
+gnupg>=1.4.0
+simplejson
+requests
diff --git a/pkg/tools/get_authors.sh b/pkg/tools/get_authors.sh
new file mode 100755
index 0000000..0169bb1
--- /dev/null
+++ b/pkg/tools/get_authors.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+git log --format='%aN <%aE>' | awk '{arr[$0]++} END{for (i in arr){print arr[i], i;}}' | sort -rn | cut -d' ' -f2-
diff --git a/pkg/utils.py b/pkg/utils.py
index deace14..9c9211b 100644
--- a/pkg/utils.py
+++ b/pkg/utils.py
@@ -14,20 +14,34 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
"""
Utils to help in the setup process
"""
-
import os
import re
import sys
+def is_develop_mode():
+ """
+ Returns True if we're calling the setup script using the argument for
+ setuptools development mode.
+
+ This avoids messing up with dependency pinning and order, the
+ responsibility of installing the leap dependencies is left to the
+ developer.
+ """
+ args = sys.argv
+ devflags = "setup.py", "develop"
+ if (args[0], args[1]) == devflags:
+ return True
+ return False
+
+
def get_reqs_from_files(reqfiles):
"""
Returns the contents of the top requirement file listed as a
- string list with the lines
+ string list with the lines.
@param reqfiles: requirement files to parse
@type reqfiles: list of str
@@ -43,6 +57,9 @@ def parse_requirements(reqfiles=['requirements.txt',
"""
Parses the requirement files provided.
+ The passed reqfiles list is a list of possible locations to try, the
+ function will return the contents of the first path found.
+
Checks the value of LEAP_VENV_SKIP_PYSIDE to see if it should
return PySide as a dep or not. Don't set, or set to 0 if you want
to install it through pip.
@@ -58,9 +75,9 @@ def parse_requirements(reqfiles=['requirements.txt',
if re.match(r'\s*-e\s+', line):
pass
# do not try to do anything with externals on vcs
- #requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
- #line))
- # http://foo.bar/baz/foobar/zipball/master#egg=foobar
+ # requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
+ # line))
+ # http://foo.bar/baz/foobar/zipball/master#egg=foobar
elif re.match(r'\s*https?:', line):
requirements.append(re.sub(r'\s*https?:.*#egg=(.*)$', r'\1',
line))
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..51070c6
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,10 @@
+[aliases]
+test = trial
+
+[pep8]
+exclude = versioneer.py,_version.py,*.egg,build,docs
+ignore = E731
+
+[flake8]
+exclude = versioneer.py,_version.py,*.egg,build,docs
+ignore = E731
diff --git a/setup.py b/setup.py
index 778909d..26840c6 100644
--- a/setup.py
+++ b/setup.py
@@ -20,6 +20,10 @@ setup file for leap.keymanager
import re
from setuptools import setup
from setuptools import find_packages
+from setuptools import Command
+
+from pkg import utils
+
import versioneer
versioneer.versionfile_source = 'src/leap/keymanager/_version.py'
@@ -27,8 +31,6 @@ versioneer.versionfile_build = 'leap/keymanager/_version.py'
versioneer.tag_prefix = '' # tags are like 1.2.0
versioneer.parentdir_prefix = 'leap.keymanager-'
-from pkg import utils
-
trove_classifiers = [
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
@@ -59,9 +61,6 @@ if len(_version_short) > 0:
cmdclass = versioneer.get_cmdclass()
-from setuptools import Command
-
-
class freeze_debianver(Command):
"""
Freezes the version in a debian branch.
@@ -107,6 +106,22 @@ cmdclass["freeze_debianver"] = freeze_debianver
# XXX add ref to docs
+requirements = utils.parse_requirements()
+
+if utils.is_develop_mode():
+ print
+ print ("[WARNING] Skipping leap-specific dependencies "
+ "because development mode is detected.")
+ print ("[WARNING] You can install "
+ "the latest published versions with "
+ "'pip install -r pkg/requirements-leap.pip'")
+ print ("[WARNING] Or you can instead do 'python setup.py develop' "
+ "from the parent folder of each one of them.")
+ print
+else:
+ requirements += utils.parse_requirements(
+ reqfiles=["pkg/requirements-leap.pip"])
+
setup(
name='leap.keymanager',
version=VERSION,
@@ -129,7 +144,7 @@ setup(
packages=find_packages('src', exclude=['leap.keymanager.tests']),
package_dir={'': 'src'},
test_suite='leap.keymanager.tests',
- install_requires=utils.parse_requirements(),
+ install_requires=requirements,
tests_require=utils.parse_requirements(
reqfiles=['pkg/requirements-testing.pip']),
)
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index 41f352e..999b53c 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -19,18 +19,26 @@ Key Manager is a Nicknym agent for LEAP client.
"""
# let's do a little sanity check to see if we're using the wrong gnupg
import sys
+from ._version import get_versions
try:
from gnupg.gnupg import GPGUtilities
assert(GPGUtilities) # pyflakes happy
from gnupg import __version__ as _gnupg_version
+ if '-' in _gnupg_version:
+ # avoid Parsing it as LegacyVersion, get just
+ # the release numbers:
+ _gnupg_version = _gnupg_version.split('-')[0]
from pkg_resources import parse_version
- assert(parse_version(_gnupg_version) >= parse_version('1.2.3'))
+ # We need to make sure that we're not colliding with the infamous
+ # python-gnupg
+ assert(parse_version(_gnupg_version) >= parse_version('1.4.0'))
except (ImportError, AssertionError):
print "*******"
print "Ooops! It looks like there is a conflict in the installed version "
print "of gnupg."
+ print "GNUPG_VERSION:", _gnupg_version
print
print "Disclaimer: Ideally, we would need to work a patch and propose the "
print "merge to upstream. But until then do: "
@@ -43,23 +51,33 @@ except (ImportError, AssertionError):
import logging
import requests
-from leap.common.check import leap_assert, leap_assert_type
-from leap.common.events import signal
-from leap.common.events import events_pb2 as proto
+from twisted.internet import defer
+from urlparse import urlparse
+
+from leap.common.check import leap_assert
+from leap.common.events import emit, catalog
from leap.common.decorators import memoized_method
-from leap.keymanager.errors import KeyNotFound
+from leap.keymanager.errors import (
+ KeyNotFound,
+ KeyAddressMismatch,
+ KeyNotValidUpgrade,
+ UnsupportedKeyTypeError,
+ InvalidSignature
+)
+from leap.keymanager.validation import ValidationLevels, can_upgrade
from leap.keymanager.keys import (
- EncryptionKey,
build_key_from_dict,
KEYMANAGER_KEY_TAG,
TAGS_PRIVATE_INDEX,
)
-from leap.keymanager.openpgp import (
- OpenPGPKey,
- OpenPGPScheme,
-)
+from leap.keymanager.openpgp import OpenPGPKey, OpenPGPScheme
+
+from ._version import get_versions
+
+__version__ = get_versions()['version']
+del get_versions
logger = logging.getLogger(__name__)
@@ -82,12 +100,12 @@ class KeyManager(object):
gpgbinary=None):
"""
Initialize a Key Manager for user's C{address} with provider's
- nickserver reachable in C{url}.
+ nickserver reachable in C{nickserver_uri}.
- :param address: The address of the user of this Key Manager.
+ :param address: The email address of the user of this Key Manager.
:type address: str
- :param url: The URL of the nickserver.
- :type url: str
+ :param nickserver_uri: The URI of the nickserver.
+ :type nickserver_uri: str
:param soledad: A Soledad instance for local storage of keys.
:type soledad: leap.soledad.Soledad
:param token: The token for interacting with the webapp API.
@@ -98,7 +116,7 @@ class KeyManager(object):
:type api_uri: str
:param api_version: The version of the webapp API.
:type api_version: str
- :param uid: The users' UID.
+ :param uid: The user's UID.
:type uid: str
:param gpgbinary: Name for GnuPG binary executable.
:type gpgbinary: C{str}
@@ -129,7 +147,7 @@ class KeyManager(object):
Return key class from string representation of key type.
"""
return filter(
- lambda klass: str(klass) == ktype,
+ lambda klass: klass.__name__ == ktype,
self._wrapper_map).pop()
def _get(self, uri, data=None):
@@ -189,33 +207,54 @@ class KeyManager(object):
res.raise_for_status()
return res
+ @memoized_method(invalidation=300)
def _fetch_keys_from_server(self, address):
"""
- Fetch keys bound to C{address} from nickserver and insert them in
+ Fetch keys bound to address from nickserver and insert them in
local database.
:param address: The address bound to the keys.
:type address: str
- :raise KeyNotFound: If the key was not found on nickserver.
+ :return: A Deferred which fires when the key is in the storage,
+ or which fails with KeyNotFound if the key was not found on
+ nickserver.
+ :rtype: Deferred
+
"""
# request keys from the nickserver
+ d = defer.succeed(None)
res = None
try:
res = self._get(self._nickserver_uri, {'address': address})
res.raise_for_status()
server_keys = res.json()
+
# insert keys in local database
if self.OPENPGP_KEY in server_keys:
- self._wrapper_map[OpenPGPKey].put_ascii_key(
- server_keys['openpgp'])
+ # nicknym server is authoritative for its own domain,
+ # for other domains the key might come from key servers.
+ validation_level = ValidationLevels.Weak_Chain
+ _, domain = _split_email(address)
+ if (domain == _get_domain(self._nickserver_uri)):
+ validation_level = ValidationLevels.Provider_Trust
+
+ d = self.put_raw_key(
+ server_keys['openpgp'],
+ OpenPGPKey,
+ address=address,
+ validation=validation_level)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
- raise KeyNotFound(address)
+ d = defer.fail(KeyNotFound(address))
+ else:
+ d = defer.fail(KeyNotFound(e.message))
logger.warning("HTTP error retrieving key: %r" % (e,))
logger.warning("%s" % (res.content,))
except Exception as e:
+ d = defer.fail(KeyNotFound(e.message))
logger.warning("Error retrieving key: %r" % (e,))
+ return d
#
# key management
@@ -223,48 +262,41 @@ class KeyManager(object):
def send_key(self, ktype):
"""
- Send user's key of type C{ktype} to provider.
+ Send user's key of type ktype to provider.
Public key bound to user's is sent to provider, which will sign it and
replace any prior keys for the same address in its database.
- If C{send_private} is True, then the private key is encrypted with
- C{password} and sent to server in the same request, together with a
- hash string of user's address and password. The encrypted private key
- will be saved in the server in a way it is publicly retrievable
- through the hash string.
-
:param ktype: The type of the key.
- :type ktype: KeyType
+ :type ktype: subclass of EncryptionKey
- :raise KeyNotFound: If the key was not found in local database.
- """
- leap_assert(
- ktype is OpenPGPKey,
- 'For now we only know how to send OpenPGP public keys.')
- # prepare the public key bound to address
- pubkey = self.get_key(
- self._address, ktype, private=False, fetch_remote=False)
- data = {
- self.PUBKEY_KEY: pubkey.key_data
- }
- uri = "%s/%s/users/%s.json" % (
- self._api_uri,
- self._api_version,
- self._uid)
- self._put(uri, data)
- signal(proto.KEYMANAGER_DONE_UPLOADING_KEYS, self._address)
+ :return: A Deferred which fires when the key is sent, or which fails
+ with KeyNotFound if the key was not found in local database.
+ :rtype: Deferred
- @memoized_method
- def get_key_from_cache(self, *args, **kwargs):
+ :raise UnsupportedKeyTypeError: if invalid key type
"""
- Public interface to `get_key`, that is memoized.
- """
- return self.get_key(*args, **kwargs)
+ self._assert_supported_key_type(ktype)
+
+ def send(pubkey):
+ data = {
+ self.PUBKEY_KEY: pubkey.key_data
+ }
+ uri = "%s/%s/users/%s.json" % (
+ self._api_uri,
+ self._api_version,
+ self._uid)
+ self._put(uri, data)
+ emit(catalog.KEYMANAGER_DONE_UPLOADING_KEYS, self._address)
+
+ d = self.get_key(
+ self._address, ktype, private=False, fetch_remote=False)
+ d.addCallback(send)
+ return d
def get_key(self, address, ktype, private=False, fetch_remote=True):
"""
- Return a key of type C{ktype} bound to C{address}.
+ Return a key of type ktype bound to address.
First, search for the key in local storage. If it is not available,
then try to fetch from nickserver.
@@ -272,86 +304,106 @@ class KeyManager(object):
:param address: The address bound to the key.
:type address: str
:param ktype: The type of the key.
- :type ktype: KeyType
+ :type ktype: subclass of EncryptionKey
:param private: Look for a private key instead of a public one?
:type private: bool
+ :param fetch_remote: If key not found in local storage try to fetch
+ from nickserver
+ :type fetch_remote: bool
+
+ :return: A Deferred which fires with an EncryptionKey of type ktype
+ bound to address, or which fails with KeyNotFound if no key
+ was found neither locally or in keyserver.
+ :rtype: Deferred
- :return: A key of type C{ktype} bound to C{address}.
- :rtype: EncryptionKey
- :raise KeyNotFound: If the key was not found both locally and in
- keyserver.
+ :raise UnsupportedKeyTypeError: if invalid key type
"""
+ self._assert_supported_key_type(ktype)
logger.debug("getting key for %s" % (address,))
leap_assert(
ktype in self._wrapper_map,
'Unkown key type: %s.' % str(ktype))
- try:
- signal(proto.KEYMANAGER_LOOKING_FOR_KEY, address)
- # return key if it exists in local database
- key = self._wrapper_map[ktype].get_key(address, private=private)
- signal(proto.KEYMANAGER_KEY_FOUND, address)
+ emit(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
+ def key_found(key):
+ emit(catalog.KEYMANAGER_KEY_FOUND, address)
return key
- except KeyNotFound:
- signal(proto.KEYMANAGER_KEY_NOT_FOUND, address)
+
+ def key_not_found(failure):
+ if not failure.check(KeyNotFound):
+ return failure
+
+ emit(catalog.KEYMANAGER_KEY_NOT_FOUND, address)
# we will only try to fetch a key from nickserver if fetch_remote
# is True and the key is not private.
if fetch_remote is False or private is True:
- raise
+ return failure
- signal(proto.KEYMANAGER_LOOKING_FOR_KEY, address)
- self._fetch_keys_from_server(address) # might raise KeyNotFound
- key = self._wrapper_map[ktype].get_key(address, private=False)
- signal(proto.KEYMANAGER_KEY_FOUND, address)
+ emit(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
+ d = self._fetch_keys_from_server(address)
+ d.addCallback(
+ lambda _:
+ self._wrapper_map[ktype].get_key(address, private=False))
+ d.addCallback(key_found)
+ return d
- return key
+ # return key if it exists in local database
+ d = self._wrapper_map[ktype].get_key(address, private=private)
+ d.addCallbacks(key_found, key_not_found)
+ return d
- def get_all_keys_in_local_db(self, private=False):
+ def get_all_keys(self, private=False):
"""
Return all keys stored in local database.
- :return: A list with all keys in local db.
- :rtype: list
- """
- return map(
- lambda doc: build_key_from_dict(
- self._key_class_from_type(doc.content['type']),
- doc.content['address'],
- doc.content),
- self._soledad.get_from_index(
- TAGS_PRIVATE_INDEX,
- KEYMANAGER_KEY_TAG,
- '1' if private else '0'))
+ :param private: Include private keys
+ :type private: bool
- def refresh_keys(self):
- """
- Fetch keys from nickserver and update them locally.
- """
- addresses = set(map(
- lambda doc: doc.address,
- self.get_all_keys_in_local_db(private=False)))
- for address in addresses:
- # do not attempt to refresh our own key
- if address == self._address:
- continue
- self._fetch_keys_from_server(address)
+ :return: A Deferred which fires with a list of all keys in local db.
+ :rtype: Deferred
+ """
+ def build_keys(docs):
+ return map(
+ lambda doc: build_key_from_dict(
+ self._key_class_from_type(doc.content['type']),
+ doc.content),
+ docs)
+
+ # XXX: there is no check that the soledad indexes are ready, as it
+ # happens with EncryptionScheme.
+ # The usecases right now are not problematic. This could be solve
+ # adding a keytype to this funciont and moving the soledad request
+ # to the EncryptionScheme.
+ d = self._soledad.get_from_index(
+ TAGS_PRIVATE_INDEX,
+ KEYMANAGER_KEY_TAG,
+ '1' if private else '0')
+ d.addCallback(build_keys)
+ return d
def gen_key(self, ktype):
"""
- Generate a key of type C{ktype} bound to the user's address.
+ Generate a key of type ktype bound to the user's address.
:param ktype: The type of the key.
- :type ktype: KeyType
+ :type ktype: subclass of EncryptionKey
+
+ :return: A Deferred which fires with the generated EncryptionKey.
+ :rtype: Deferred
- :return: The generated key.
- :rtype: EncryptionKey
+ :raise UnsupportedKeyTypeError: if invalid key type
"""
- signal(proto.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
- key = self._wrapper_map[ktype].gen_key(self._address)
- signal(proto.KEYMANAGER_FINISHED_KEY_GENERATION, self._address)
+ self._assert_supported_key_type(ktype)
+
+ def signal_finished(key):
+ emit(catalog.KEYMANAGER_FINISHED_KEY_GENERATION, self._address)
+ return key
- return key
+ emit(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
+ d = self._wrapper_map[ktype].gen_key(self._address)
+ d.addCallback(signal_finished)
+ return d
#
# Setters/getters
@@ -407,67 +459,132 @@ class KeyManager(object):
# encrypt/decrypt and sign/verify API
#
- def encrypt(self, data, pubkey, passphrase=None, sign=None,
- cipher_algo='AES256'):
+ def encrypt(self, data, address, ktype, passphrase=None, sign=None,
+ cipher_algo='AES256', fetch_remote=True):
"""
- Encrypt C{data} using public @{key} and sign with C{sign} key.
+ Encrypt data with the public key bound to address and sign with with
+ the private key bound to sign address.
:param data: The data to be encrypted.
:type data: str
- :param pubkey: The key used to encrypt.
- :type pubkey: EncryptionKey
- :param sign: The key used for signing.
- :type sign: EncryptionKey
+ :param address: The address to encrypt it for.
+ :type address: str
+ :param ktype: The type of the key.
+ :type ktype: subclass of EncryptionKey
+ :param passphrase: The passphrase for the secret key used for the
+ signature.
+ :type passphrase: str
+ :param sign: The address to be used for signature.
+ :type sign: str
:param cipher_algo: The cipher algorithm to use.
:type cipher_algo: str
-
- :return: The encrypted data.
- :rtype: str
- """
- leap_assert_type(pubkey, EncryptionKey)
- leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
- leap_assert(pubkey.private is False, 'Key is not public.')
- return self._wrapper_map[pubkey.__class__].encrypt(
- data, pubkey, passphrase, sign)
-
- def decrypt(self, data, privkey, passphrase=None, verify=None):
- """
- Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+ :param fetch_remote: If key is not found in local storage try to fetch
+ from nickserver
+ :type fetch_remote: bool
+
+ :return: A Deferred which fires with the encrypted data as str, or
+ which fails with KeyNotFound if no keys were found neither
+ locally or in keyserver or fails with EncryptError if failed
+ encrypting for some reason.
+ :rtype: Deferred
+
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ self._assert_supported_key_type(ktype)
+
+ def encrypt(keys):
+ pubkey, signkey = keys
+ encrypted = self._wrapper_map[ktype].encrypt(
+ data, pubkey, passphrase, sign=signkey,
+ cipher_algo=cipher_algo)
+ pubkey.encr_used = True
+ d = self._wrapper_map[ktype].put_key(pubkey, address)
+ d.addCallback(lambda _: encrypted)
+ return d
+
+ dpub = self.get_key(address, ktype, private=False,
+ fetch_remote=fetch_remote)
+ dpriv = defer.succeed(None)
+ if sign is not None:
+ dpriv = self.get_key(sign, ktype, private=True)
+ d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
+ d.addCallbacks(encrypt, self._extract_first_error)
+ return d
+
+ def decrypt(self, data, address, ktype, passphrase=None, verify=None,
+ fetch_remote=True):
+ """
+ Decrypt data using private key from address and verify with public key
+ bound to verify address.
:param data: The data to be decrypted.
:type data: str
- :param privkey: The key used to decrypt.
- :type privkey: OpenPGPKey
+ :param address: The address to whom data was encrypted.
+ :type address: str
+ :param ktype: The type of the key.
+ :type ktype: subclass of EncryptionKey
:param passphrase: The passphrase for the secret key used for
decryption.
:type passphrase: str
- :param verify: The key used to verify a signature.
- :type verify: OpenPGPKey
-
- :return: The decrypted data.
- :rtype: str
-
- :raise InvalidSignature: Raised if unable to verify the signature with
- C{verify} key.
- """
- leap_assert_type(privkey, EncryptionKey)
- leap_assert(
- privkey.__class__ in self._wrapper_map,
- 'Unknown key type.')
- leap_assert(privkey.private is True, 'Key is not private.')
- return self._wrapper_map[privkey.__class__].decrypt(
- data, privkey, passphrase, verify)
-
- def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
+ :param verify: The address to be used for signature.
+ :type verify: str
+ :param fetch_remote: If key for verify not found in local storage try
+ to fetch from nickserver
+ :type fetch_remote: bool
+
+ :return: A Deferred which fires with:
+ * (decripted str, signing key) if validation works
+ * (decripted str, KeyNotFound) if signing key not found
+ * (decripted str, InvalidSignature) if signature is invalid
+ * KeyNotFound failure if private key not found
+ * DecryptError failure if decription failed
+ :rtype: Deferred
+
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ self._assert_supported_key_type(ktype)
+
+ def decrypt(keys):
+ pubkey, privkey = keys
+ decrypted, signed = self._wrapper_map[ktype].decrypt(
+ data, privkey, passphrase=passphrase, verify=pubkey)
+ if pubkey is None:
+ signature = KeyNotFound(verify)
+ elif signed:
+ pubkey.sign_used = True
+ d = self._wrapper_map[ktype].put_key(pubkey, address)
+ d.addCallback(lambda _: (decrypted, pubkey))
+ return d
+ else:
+ signature = InvalidSignature(
+ 'Failed to verify signature with key %s' %
+ (pubkey.key_id,))
+ return (decrypted, signature)
+
+ dpriv = self.get_key(address, ktype, private=True)
+ dpub = defer.succeed(None)
+ if verify is not None:
+ dpub = self.get_key(verify, ktype, private=False,
+ fetch_remote=fetch_remote)
+ dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f)
+ d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
+ d.addCallbacks(decrypt, self._extract_first_error)
+ return d
+
+ def _extract_first_error(self, failure):
+ return failure.value.subFailure
+
+ def sign(self, data, address, ktype, digest_algo='SHA512', clearsign=False,
detach=True, binary=False):
"""
- Sign C{data} with C{privkey}.
+ Sign data with private key bound to address.
:param data: The data to be signed.
:type data: str
-
- :param privkey: The private key to be used to sign.
- :type privkey: EncryptionKey
+ :param address: The address to be used to sign.
+ :type address: EncryptionKey
+ :param ktype: The type of the key.
+ :type ktype: subclass of EncryptionKey
:param digest_algo: The hash digest to use.
:type digest_algo: str
:param clearsign: If True, create a cleartext signature.
@@ -477,82 +594,240 @@ class KeyManager(object):
:param binary: If True, do not ascii armour the output.
:type binary: bool
- :return: The signed data.
- :rtype: str
+ :return: A Deferred which fires with the signed data as str or fails
+ with KeyNotFound if no key was found neither locally or in
+ keyserver or fails with SignFailed if there was any error
+ signing.
+ :rtype: Deferred
+
+ :raise UnsupportedKeyTypeError: if invalid key type
"""
- leap_assert_type(privkey, EncryptionKey)
- leap_assert(
- privkey.__class__ in self._wrapper_map,
- 'Unknown key type.')
- leap_assert(privkey.private is True, 'Key is not private.')
- return self._wrapper_map[privkey.__class__].sign(
- data, privkey, digest_algo=digest_algo, clearsign=clearsign,
- detach=detach, binary=binary)
+ self._assert_supported_key_type(ktype)
+
+ def sign(privkey):
+ return self._wrapper_map[ktype].sign(
+ data, privkey, digest_algo=digest_algo, clearsign=clearsign,
+ detach=detach, binary=binary)
- def verify(self, data, pubkey, detached_sig=None):
+ d = self.get_key(address, ktype, private=True)
+ d.addCallback(sign)
+ return d
+
+ def verify(self, data, address, ktype, detached_sig=None,
+ fetch_remote=True):
"""
- Verify signed C{data} with C{pubkey}, eventually using
- C{detached_sig}.
+ Verify signed data with private key bound to address, eventually using
+ detached_sig.
:param data: The data to be verified.
:type data: str
- :param pubkey: The public key to be used on verification.
- :type pubkey: EncryptionKey
+ :param address: The address to be used to verify.
+ :type address: EncryptionKey
+ :param ktype: The type of the key.
+ :type ktype: subclass of EncryptionKey
:param detached_sig: A detached signature. If given, C{data} is
verified using this detached signature.
:type detached_sig: str
+ :param fetch_remote: If key for verify not found in local storage try
+ to fetch from nickserver
+ :type fetch_remote: bool
+
+ :return: A Deferred which fires with the signing EncryptionKey if
+ signature verifies, or which fails with InvalidSignature if
+ signature don't verifies or fails with KeyNotFound if no key
+ was found neither locally or in keyserver.
+ :rtype: Deferred
+
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ self._assert_supported_key_type(ktype)
+
+ def verify(pubkey):
+ signed = self._wrapper_map[ktype].verify(
+ data, pubkey, detached_sig=detached_sig)
+ if signed:
+ pubkey.sign_used = True
+ d = self._wrapper_map[ktype].put_key(pubkey, address)
+ d.addCallback(lambda _: pubkey)
+ return d
+ else:
+ raise InvalidSignature(
+ 'Failed to verify signature with key %s' %
+ (pubkey.key_id,))
+
+ d = self.get_key(address, ktype, private=False,
+ fetch_remote=fetch_remote)
+ d.addCallback(verify)
+ return d
- :return: The signed data.
- :rtype: str
+ def delete_key(self, key):
"""
- leap_assert_type(pubkey, EncryptionKey)
- leap_assert(pubkey.__class__ in self._wrapper_map, 'Unknown key type.')
- leap_assert(pubkey.private is False, 'Key is not public.')
- return self._wrapper_map[pubkey.__class__].verify(
- data, pubkey, detached_sig=detached_sig)
+ Remove key from storage.
- def parse_openpgp_ascii_key(self, key_data):
- """
- Parses an ascii armored key (or key pair) data and returns
- the OpenPGPKey keys.
+ :param key: The key to be removed.
+ :type key: EncryptionKey
- :param key_data: the key data to be parsed.
- :type key_data: str or unicode
+ :return: A Deferred which fires when the key is deleted, or which fails
+ KeyNotFound if the key was not found on local storage.
+ :rtype: Deferred
- :returns: the public key and private key (if applies) for that data.
- :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
- the tuple may have one or both components None
+ :raise UnsupportedKeyTypeError: if invalid key type
"""
- return self._wrapper_map[OpenPGPKey].parse_ascii_key(key_data)
+ self._assert_supported_key_type(type(key))
+ return self._wrapper_map[type(key)].delete_key(key)
- def delete_key(self, key):
+ def put_key(self, key, address):
"""
- Remove C{key} from storage.
+ Put key bound to address in local storage.
- May raise:
- openpgp.errors.KeyNotFound
- openpgp.errors.KeyAttributesDiffer
-
- :param key: The key to be removed.
+ :param key: The key to be stored
:type key: EncryptionKey
+ :param address: address for which this key will be active
+ :type address: str
+
+ :return: A Deferred which fires when the key is in the storage, or
+ which fails with KeyAddressMismatch if address doesn't match
+ any uid on the key or fails with KeyNotValidUpdate if a key
+ with the same uid exists and the new one is not a valid update
+ for it.
+ :rtype: Deferred
+
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ self._assert_supported_key_type(type(key))
+
+ if address not in key.address:
+ return defer.fail(
+ KeyAddressMismatch("UID %s found, but expected %s"
+ % (str(key.address), address)))
+
+ def old_key_not_found(failure):
+ if failure.check(KeyNotFound):
+ return None
+ else:
+ return failure
+
+ def check_upgrade(old_key):
+ if key.private or can_upgrade(key, old_key):
+ return self._wrapper_map[type(key)].put_key(key, address)
+ else:
+ raise KeyNotValidUpgrade(
+ "Key %s can not be upgraded by new key %s"
+ % (old_key.key_id, key.key_id))
+
+ d = self._wrapper_map[type(key)].get_key(address,
+ private=key.private)
+ d.addErrback(old_key_not_found)
+ d.addCallback(check_upgrade)
+ return d
+
+ def put_raw_key(self, key, ktype, address,
+ validation=ValidationLevels.Weak_Chain):
+ """
+ Put raw key bound to address in local storage.
+
+ :param key: The ascii key to be stored
+ :type key: str
+ :param ktype: the type of the key.
+ :type ktype: subclass of EncryptionKey
+ :param address: address for which this key will be active
+ :type address: str
+ :param validation: validation level for this key
+ (default: 'Weak_Chain')
+ :type validation: ValidationLevels
+
+ :return: A Deferred which fires when the key is in the storage, or
+ which fails with KeyAddressMismatch if address doesn't match
+ any uid on the key or fails with KeyNotValidUpdate if a key
+ with the same uid exists and the new one is not a valid update
+ for it.
+ :rtype: Deferred
+
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ self._assert_supported_key_type(ktype)
+ pubkey, privkey = self._wrapper_map[ktype].parse_ascii_key(key)
+ pubkey.validation = validation
+ d = self.put_key(pubkey, address)
+ if privkey is not None:
+ d.addCallback(lambda _: self.put_key(privkey, address))
+ return d
+
+ def fetch_key(self, address, uri, ktype,
+ validation=ValidationLevels.Weak_Chain):
+ """
+ Fetch a public key bound to address from the network and put it in
+ local storage.
+
+ :param address: The email address of the key.
+ :type address: str
+ :param uri: The URI of the key.
+ :type uri: str
+ :param ktype: the type of the key.
+ :type ktype: subclass of EncryptionKey
+ :param validation: validation level for this key
+ (default: 'Weak_Chain')
+ :type validation: ValidationLevels
+
+ :return: A Deferred which fires when the key is in the storage, or
+ which fails with KeyNotFound: if not valid key on uri or fails
+ with KeyAddressMismatch if address doesn't match any uid on
+ the key or fails with KeyNotValidUpdate if a key with the same
+ uid exists and the new one is not a valid update for it.
+ :rtype: Deferred
+
+ :raise UnsupportedKeyTypeError: if invalid key type
"""
- try:
- self._wrapper_map[type(key)].delete_key(key)
- except IndexError as e:
- leap_assert(False, "Unsupported key type. Error {0!r}".format(e))
+ self._assert_supported_key_type(ktype)
- def put_key(self, key):
+ res = self._get(uri)
+ if not res.ok:
+ return defer.fail(KeyNotFound(uri))
+
+ # XXX parse binary keys
+ pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content)
+ if pubkey is None:
+ return defer.fail(KeyNotFound(uri))
+
+ pubkey.validation = validation
+ return self.put_key(pubkey, address)
+
+ def _assert_supported_key_type(self, ktype):
"""
- Put C{key} in local storage.
+ Check if ktype is one of the supported key types
- :param key: The key to be stored.
- :type key: OpenPGPKey
+ :param ktype: the type of the key.
+ :type ktype: subclass of EncryptionKey
+
+ :raise UnsupportedKeyTypeError: if invalid key type
"""
- try:
- self._wrapper_map[type(key)].put_key(key)
- except IndexError as e:
- leap_assert(False, "Unsupported key type. Error {0!r}".format(e))
+ if ktype not in self._wrapper_map:
+ raise UnsupportedKeyTypeError(str(ktype))
-from ._version import get_versions
-__version__ = get_versions()['version']
-del get_versions
+
+def _split_email(address):
+ """
+ Split username and domain from an email address
+
+ :param address: an email address
+ :type address: str
+
+ :return: username and domain from the email address
+ :rtype: (str, str)
+ """
+ if address.count("@") != 1:
+ return None
+ return address.split("@")
+
+
+def _get_domain(url):
+ """
+ Get the domain from an url
+
+ :param url: an url
+ :type url: str
+
+ :return: the domain part of the url
+ :rtype: str
+ """
+ return urlparse(url).hostname
diff --git a/src/leap/keymanager/_version.py b/src/leap/keymanager/_version.py
index ad54645..ea862a7 100644
--- a/src/leap/keymanager/_version.py
+++ b/src/leap/keymanager/_version.py
@@ -1,13 +1,9 @@
-
-# This file was generated by the `freeze_debianver` command in setup.py
-# Using 'versioneer.py' (0.7+) from
-# revision-control system data, or from the parent directory name of an
-# unpacked source archive. Distribution tarballs contain a pre-generated copy
-# of this file.
+# This file helps to compute a version number in source trees obtained from
+# git-archive tarball (such as those provided by githubs download-from-tag
+# feature). Distribution tarballs (build by setup.py sdist) and build
+# directories (produced by setup.py build) will contain a much shorter file
+# that just contains the computed version number.
version_version = '0.3.8'
version_full = '0929d3953191e7ad5ec51f3e94ad6d7608428d48'
-
-def get_versions(default={}, verbose=False):
- return {'version': version_version, 'full': version_full}
diff --git a/src/leap/keymanager/errors.py b/src/leap/keymanager/errors.py
index ebe4fd5..8a9fb3c 100644
--- a/src/leap/keymanager/errors.py
+++ b/src/leap/keymanager/errors.py
@@ -96,3 +96,15 @@ class KeyFingerprintMismatch(Exception):
"""
A mismatch between fingerprints.
"""
+
+
+class KeyNotValidUpgrade(Exception):
+ """
+ Already existing key can not be upgraded with the new key
+ """
+
+
+class UnsupportedKeyTypeError(Exception):
+ """
+ Invalid key type
+ """
diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py
index ec1bfeb..91559c2 100644
--- a/src/leap/keymanager/keys.py
+++ b/src/leap/keymanager/keys.py
@@ -25,11 +25,19 @@ try:
import simplejson as json
except ImportError:
import json # noqa
+import logging
import re
+import time
from abc import ABCMeta, abstractmethod
+from datetime import datetime
from leap.common.check import leap_assert
+from twisted.internet import defer
+
+from leap.keymanager.validation import ValidationLevels
+
+logger = logging.getLogger(__name__)
#
@@ -44,9 +52,11 @@ KEY_DATA_KEY = 'key_data'
KEY_PRIVATE_KEY = 'private'
KEY_LENGTH_KEY = 'length'
KEY_EXPIRY_DATE_KEY = 'expiry_date'
-KEY_FIRST_SEEN_AT_KEY = 'first_seen_at'
KEY_LAST_AUDITED_AT_KEY = 'last_audited_at'
+KEY_REFRESHED_AT_KEY = 'refreshed_at'
KEY_VALIDATION_KEY = 'validation'
+KEY_ENCR_USED_KEY = 'encr_used'
+KEY_SIGN_USED_KEY = 'sign_used'
KEY_TAGS_KEY = 'tags'
@@ -55,6 +65,8 @@ KEY_TAGS_KEY = 'tags'
#
KEYMANAGER_KEY_TAG = 'keymanager-key'
+KEYMANAGER_ACTIVE_TAG = 'keymanager-active'
+KEYMANAGER_ACTIVE_TYPE = '-active'
#
@@ -62,14 +74,20 @@ KEYMANAGER_KEY_TAG = 'keymanager-key'
#
TAGS_PRIVATE_INDEX = 'by-tags-private'
-TAGS_ADDRESS_PRIVATE_INDEX = 'by-tags-address-private'
+TYPE_ID_PRIVATE_INDEX = 'by-type-id-private'
+TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private'
INDEXES = {
TAGS_PRIVATE_INDEX: [
KEY_TAGS_KEY,
'bool(%s)' % KEY_PRIVATE_KEY,
],
- TAGS_ADDRESS_PRIVATE_INDEX: [
- KEY_TAGS_KEY,
+ TYPE_ID_PRIVATE_INDEX: [
+ KEY_TYPE_KEY,
+ KEY_ID_KEY,
+ 'bool(%s)' % KEY_PRIVATE_KEY,
+ ],
+ TYPE_ADDRESS_PRIVATE_INDEX: [
+ KEY_TYPE_KEY,
KEY_ADDRESS_KEY,
'bool(%s)' % KEY_PRIVATE_KEY,
]
@@ -92,34 +110,56 @@ def is_address(address):
return bool(re.match('[\w.-]+@[\w.-]+', address))
-def build_key_from_dict(kClass, address, kdict):
+def build_key_from_dict(kClass, kdict):
"""
- Build an C{kClass} key bound to C{address} based on info in C{kdict}.
+ Build an C{kClass} key based on info in C{kdict}.
- :param address: The address bound to the key.
- :type address: str
:param kdict: Dictionary with key data.
:type kdict: dict
:return: An instance of the key.
:rtype: C{kClass}
"""
- leap_assert(
- address == kdict[KEY_ADDRESS_KEY],
- 'Wrong address in key data.')
+ try:
+ validation = ValidationLevels.get(kdict[KEY_VALIDATION_KEY])
+ except ValueError:
+ logger.error("Not valid validation level (%s) for key %s",
+ (kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY]))
+ validation = ValidationLevels.Weak_Chain
+
+ expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY])
+ last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY])
+ refreshed_at = _to_datetime(kdict[KEY_REFRESHED_AT_KEY])
+
return kClass(
- address,
+ kdict[KEY_ADDRESS_KEY],
key_id=kdict[KEY_ID_KEY],
fingerprint=kdict[KEY_FINGERPRINT_KEY],
key_data=kdict[KEY_DATA_KEY],
private=kdict[KEY_PRIVATE_KEY],
length=kdict[KEY_LENGTH_KEY],
- expiry_date=kdict[KEY_EXPIRY_DATE_KEY],
- first_seen_at=kdict[KEY_FIRST_SEEN_AT_KEY],
- last_audited_at=kdict[KEY_LAST_AUDITED_AT_KEY],
- validation=kdict[KEY_VALIDATION_KEY], # TODO: verify for validation.
+ expiry_date=expiry_date,
+ last_audited_at=last_audited_at,
+ refreshed_at=refreshed_at,
+ validation=validation,
+ encr_used=kdict[KEY_ENCR_USED_KEY],
+ sign_used=kdict[KEY_SIGN_USED_KEY],
)
+def _to_datetime(unix_time):
+ if unix_time != 0:
+ return datetime.fromtimestamp(unix_time)
+ else:
+ return None
+
+
+def _to_unix_time(date):
+ if date is not None:
+ return int(time.mktime(date.timetuple()))
+ else:
+ return 0
+
+
#
# Abstraction for encryption keys
#
@@ -129,23 +169,15 @@ class EncryptionKey(object):
Abstract class for encryption keys.
A key is "validated" if the nicknym agent has bound the user address to a
- public key. Nicknym supports three different levels of key validation:
-
- * Level 3 - path trusted: A path of cryptographic signatures can be traced
- from a trusted key to the key under evaluation. By default, only the
- provider key from the user's provider is a "trusted key".
- * level 2 - provider signed: The key has been signed by a provider key for
- the same domain, but the provider key is not validated using a trust
- path (i.e. it is only registered)
- * level 1 - registered: The key has been encountered and saved, it has no
- signatures (that are meaningful to the nicknym agent).
+ public key.
"""
__metaclass__ = ABCMeta
- def __init__(self, address, key_id=None, fingerprint=None,
- key_data=None, private=None, length=None, expiry_date=None,
- validation=None, first_seen_at=None, last_audited_at=None):
+ def __init__(self, address, key_id="", fingerprint="",
+ key_data="", private=False, length=0, expiry_date=None,
+ validation=ValidationLevels.Weak_Chain, last_audited_at=None,
+ refreshed_at=None, encr_used=False, sign_used=False):
self.address = address
self.key_id = key_id
self.fingerprint = fingerprint
@@ -154,8 +186,10 @@ class EncryptionKey(object):
self.length = length
self.expiry_date = expiry_date
self.validation = validation
- self.first_seen_at = first_seen_at
self.last_audited_at = last_audited_at
+ self.refreshed_at = refreshed_at
+ self.encr_used = encr_used
+ self.sign_used = sign_used
def get_json(self):
"""
@@ -164,21 +198,44 @@ class EncryptionKey(object):
:return: The JSON string describing this key.
:rtype: str
"""
+ expiry_date = _to_unix_time(self.expiry_date)
+ last_audited_at = _to_unix_time(self.last_audited_at)
+ refreshed_at = _to_unix_time(self.refreshed_at)
+
return json.dumps({
KEY_ADDRESS_KEY: self.address,
- KEY_TYPE_KEY: str(self.__class__),
+ KEY_TYPE_KEY: self.__class__.__name__,
KEY_ID_KEY: self.key_id,
KEY_FINGERPRINT_KEY: self.fingerprint,
KEY_DATA_KEY: self.key_data,
KEY_PRIVATE_KEY: self.private,
KEY_LENGTH_KEY: self.length,
- KEY_EXPIRY_DATE_KEY: self.expiry_date,
- KEY_VALIDATION_KEY: self.validation,
- KEY_FIRST_SEEN_AT_KEY: self.first_seen_at,
- KEY_LAST_AUDITED_AT_KEY: self.last_audited_at,
+ KEY_EXPIRY_DATE_KEY: expiry_date,
+ KEY_LAST_AUDITED_AT_KEY: last_audited_at,
+ KEY_REFRESHED_AT_KEY: refreshed_at,
+ KEY_VALIDATION_KEY: str(self.validation),
+ KEY_ENCR_USED_KEY: self.encr_used,
+ KEY_SIGN_USED_KEY: self.sign_used,
KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG],
})
+ def get_active_json(self, address):
+ """
+ Return a JSON string describing this key.
+
+ :param address: Address for wich the key is active
+ :type address: str
+ :return: The JSON string describing this key.
+ :rtype: str
+ """
+ return json.dumps({
+ KEY_ADDRESS_KEY: address,
+ KEY_TYPE_KEY: self.__class__.__name__ + KEYMANAGER_ACTIVE_TYPE,
+ KEY_ID_KEY: self.key_id,
+ KEY_PRIVATE_KEY: self.private,
+ KEY_TAGS_KEY: [KEYMANAGER_ACTIVE_TAG],
+ })
+
def __repr__(self):
"""
Representation of this class
@@ -221,21 +278,61 @@ class EncryptionScheme(object):
"""
leap_assert(self._soledad is not None,
"Cannot init indexes with null soledad")
- # Ask the database for currently existing indexes.
- db_indexes = dict(self._soledad.list_indexes())
- # Loop through the indexes we expect to find.
- for name, expression in INDEXES.items():
- if name not in db_indexes:
- # The index does not yet exist.
- self._soledad.create_index(name, *expression)
- continue
- if expression == db_indexes[name]:
- # The index exists and is up to date.
- continue
- # The index exists but the definition is not what expected, so we
- # delete it and add the proper index expression.
- self._soledad.delete_index(name)
- self._soledad.create_index(name, *expression)
+
+ def init_idexes(indexes):
+ deferreds = []
+ db_indexes = dict(indexes)
+ # Loop through the indexes we expect to find.
+ for name, expression in INDEXES.items():
+ if name not in db_indexes:
+ # The index does not yet exist.
+ d = self._soledad.create_index(name, *expression)
+ deferreds.append(d)
+ elif expression != db_indexes[name]:
+ # The index exists but the definition is not what expected,
+ # so we delete it and add the proper index expression.
+ d = self._soledad.delete_index(name)
+ d.addCallback(
+ lambda _:
+ self._soledad.create_index(name, *expression))
+ deferreds.append(d)
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+ self.deferred_indexes = self._soledad.list_indexes()
+ self.deferred_indexes.addCallback(init_idexes)
+
+ def _wait_indexes(self, *methods):
+ """
+ Methods that need to wait for the indexes to be ready.
+
+ Heavily based on
+ http://blogs.fluidinfo.com/terry/2009/05/11/a-mixin-class-allowing-python-__init__-methods-to-work-with-twisted-deferreds/
+
+ :param methods: methods that need to wait for the indexes to be ready
+ :type methods: tuple(str)
+ """
+ self.waiting = []
+ self.stored = {}
+
+ def restore(_):
+ for method in self.stored:
+ setattr(self, method, self.stored[method])
+ for d in self.waiting:
+ d.callback(None)
+
+ def makeWrapper(method):
+ def wrapper(*args, **kw):
+ d = defer.Deferred()
+ d.addCallback(lambda _: self.stored[method](*args, **kw))
+ self.waiting.append(d)
+ return d
+ return wrapper
+
+ for method in methods:
+ self.stored[method] = getattr(self, method)
+ setattr(self, method, makeWrapper(method))
+
+ self.deferred_indexes.addCallback(restore)
@abstractmethod
def get_key(self, address, private=False):
@@ -247,19 +344,25 @@ class EncryptionScheme(object):
:param private: Look for a private key instead of a public one?
:type private: bool
- :return: The key bound to C{address}.
- :rtype: EncryptionKey
- @raise KeyNotFound: If the key was not found on local storage.
+ :return: A Deferred which fires with the EncryptionKey bound to
+ address, or which fails with KeyNotFound if the key was not
+ found on local storage.
+ :rtype: Deferred
"""
pass
@abstractmethod
- def put_key(self, key):
+ def put_key(self, key, address):
"""
Put a key in local storage.
:param key: The key to be stored.
:type key: EncryptionKey
+ :param address: address for which this key will be active.
+ :type address: str
+
+ :return: A Deferred which fires when the key is in the storage.
+ :rtype: Deferred
"""
pass
@@ -283,6 +386,11 @@ class EncryptionScheme(object):
:param key: The key to be removed.
:type key: EncryptionKey
+
+ :return: A Deferred which fires when the key is deleted, or which
+ fails with KeyNotFound if the key was not found on local
+ storage.
+ :rtype: Deferred
"""
pass
@@ -315,11 +423,10 @@ class EncryptionScheme(object):
:param verify: The key used to verify a signature.
:type verify: OpenPGPKey
- :return: The decrypted data.
- :rtype: str
+ :return: The decrypted data and if signature verifies
+ :rtype: (unicode, bool)
- @raise InvalidSignature: Raised if unable to verify the signature with
- C{verify} key.
+ :raise DecryptError: Raised if failed decrypting for some reason.
"""
pass
@@ -353,7 +460,7 @@ class EncryptionScheme(object):
verified against this sdetached signature.
:type detached_sig: str
- :return: The signed data.
- :rtype: str
+ :return: signature matches
+ :rtype: bool
"""
pass
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 950d022..794a0ec 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -22,12 +22,13 @@ import os
import re
import shutil
import tempfile
+import io
-from contextlib import closing
+from datetime import datetime
from gnupg import GPG
from gnupg.gnupg import GPGUtilities
-from gnupg._util import _make_binary_stream
+from twisted.internet import defer
from leap.common.check import leap_assert, leap_assert_type, leap_check
from leap.keymanager import errors
@@ -36,8 +37,11 @@ from leap.keymanager.keys import (
EncryptionScheme,
is_address,
build_key_from_dict,
- KEYMANAGER_KEY_TAG,
- TAGS_ADDRESS_PRIVATE_INDEX,
+ TYPE_ID_PRIVATE_INDEX,
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ KEY_ADDRESS_KEY,
+ KEY_ID_KEY,
+ KEYMANAGER_ACTIVE_TYPE,
)
@@ -106,9 +110,9 @@ class TempGPGWrapper(object):
# itself is enough to also have the public key in the keyring,
# and we want to count the keys afterwards.
- privaddrs = map(lambda privkey: privkey.address, privkeys)
+ privids = map(lambda privkey: privkey.key_id, privkeys)
publkeys = filter(
- lambda pubkey: pubkey.address not in privaddrs, publkeys)
+ lambda pubkey: pubkey.key_id not in privids, publkeys)
listkeys = lambda: self._gpg.list_keys()
listsecretkeys = lambda: self._gpg.list_keys(secret=True)
@@ -159,33 +163,20 @@ class TempGPGWrapper(object):
shutil.rmtree(self._gpg.homedir)
-def _build_key_from_gpg(address, key, key_data):
+def _parse_address(address):
"""
- Build an OpenPGPKey for C{address} based on C{key} from
- local gpg storage.
+ Remove name, '<', '>' and the identity suffix after the '+' until the '@'
+ e.g.: test_user+something@provider.com becomes test_user@provider.com
+ since the key belongs to the identity without the '+' suffix.
- ASCII armored GPG key data has to be queried independently in this
- wrapper, so we receive it in C{key_data}.
-
- :param address: The address bound to the key.
:type address: str
- :param key: Key obtained from GPG storage.
- :type key: dict
- :param key_data: Key data obtained from GPG storage.
- :type key_data: str
- :return: An instance of the key.
- :rtype: OpenPGPKey
+ :rtype: str
"""
- return OpenPGPKey(
- address,
- key_id=key['keyid'],
- fingerprint=key['fingerprint'],
- key_data=key_data,
- private=True if key['type'] == 'sec' else False,
- length=key['length'],
- expiry_date=key['expires'],
- validation=None, # TODO: verify for validation.
- )
+ mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+ match = re.match(mail_regex, address)
+ if match is None:
+ return None
+ return ''.join(match.group(2, 4))
#
@@ -197,6 +188,26 @@ class OpenPGPKey(EncryptionKey):
Base class for OpenPGP keys.
"""
+ def __init__(self, address, gpgbinary=None, **kwargs):
+ self._gpgbinary = gpgbinary
+ super(OpenPGPKey, self).__init__(address, **kwargs)
+
+ @property
+ def signatures(self):
+ """
+ Get the key signatures
+
+ :return: the key IDs that have signed the key
+ :rtype: list(str)
+ """
+ with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
+ res = gpg.list_sigs(self.key_id)
+ for uid, sigs in res.sigs.iteritems():
+ if _parse_address(uid) in self.address:
+ return sigs
+
+ return []
+
class OpenPGPScheme(EncryptionScheme):
"""
@@ -204,6 +215,10 @@ class OpenPGPScheme(EncryptionScheme):
signing and verification).
"""
+ # type used on the soledad documents
+ KEY_TYPE = OpenPGPKey.__name__
+ ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE
+
def __init__(self, soledad, gpgbinary=None):
"""
Initialize the OpenPGP wrapper.
@@ -214,6 +229,7 @@ class OpenPGPScheme(EncryptionScheme):
:type gpgbinary: C{str}
"""
EncryptionScheme.__init__(self, soledad)
+ self._wait_indexes("get_key", "put_key")
self._gpgbinary = gpgbinary
#
@@ -226,57 +242,63 @@ class OpenPGPScheme(EncryptionScheme):
:param address: The address bound to the key.
:type address: str
- :return: The key bound to C{address}.
- :rtype: OpenPGPKey
- @raise KeyAlreadyExists: If key already exists in local database.
+
+ :return: A Deferred which fires with the key bound to address, or fails
+ with KeyAlreadyExists if key already exists in local database.
+ :rtype: Deferred
"""
# make sure the key does not already exist
leap_assert(is_address(address), 'Not an user address: %s' % address)
- try:
- self.get_key(address)
- raise errors.KeyAlreadyExists(address)
- except errors.KeyNotFound:
- logger.debug('Key for %s not found' % (address,))
- with self._temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- params = gpg.gen_key_input(
- key_type='RSA',
- key_length=4096,
- name_real=address,
- name_email=address,
- name_comment='')
- logger.info("About to generate keys... This might take SOME time.")
- gpg.gen_key(params)
- logger.info("Keys for %s have been successfully "
- "generated." % (address,))
- pubkeys = gpg.list_keys()
-
- # assert for new key characteristics
-
- # XXX This exception is not properly catched by the soledad
- # bootstrapping, so if we do not finish generating the keys
- # we end with a blocked thread -- kali
-
- leap_assert(
- len(pubkeys) is 1, # a unitary keyring!
- 'Keyring has wrong number of keys: %d.' % len(pubkeys))
- key = gpg.list_keys(secret=True).pop()
- leap_assert(
- len(key['uids']) is 1, # with just one uid!
- 'Wrong number of uids for key: %d.' % len(key['uids']))
- leap_assert(
- re.match('.*<%s>$' % address, key['uids'][0]) is not None,
- 'Key not correctly bound to address.')
- # insert both public and private keys in storage
- for secret in [True, False]:
- key = gpg.list_keys(secret=secret).pop()
- openpgp_key = _build_key_from_gpg(
- address, key,
- gpg.export_keys(key['fingerprint'], secret=secret))
- self.put_key(openpgp_key)
+ def _gen_key(_):
+ with self._temporary_gpgwrapper() as gpg:
+ # TODO: inspect result, or use decorator
+ params = gpg.gen_key_input(
+ key_type='RSA',
+ key_length=4096,
+ name_real=address,
+ name_email=address,
+ name_comment='')
+ logger.info("About to generate keys... "
+ "This might take SOME time.")
+ gpg.gen_key(params)
+ logger.info("Keys for %s have been successfully "
+ "generated." % (address,))
+ pubkeys = gpg.list_keys()
+
+ # assert for new key characteristics
+ leap_assert(
+ len(pubkeys) is 1, # a unitary keyring!
+ 'Keyring has wrong number of keys: %d.' % len(pubkeys))
+ key = gpg.list_keys(secret=True).pop()
+ leap_assert(
+ len(key['uids']) is 1, # with just one uid!
+ 'Wrong number of uids for key: %d.' % len(key['uids']))
+ uid_match = False
+ for uid in key['uids']:
+ if re.match('.*<%s>$' % address, uid) is not None:
+ uid_match = True
+ break
+ leap_assert(uid_match, 'Key not correctly bound to address.')
+
+ # insert both public and private keys in storage
+ deferreds = []
+ for secret in [True, False]:
+ key = gpg.list_keys(secret=secret).pop()
+ openpgp_key = self._build_key_from_gpg(
+ key,
+ gpg.export_keys(key['fingerprint'], secret=secret))
+ d = self.put_key(openpgp_key, address)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+
+ def key_already_exists(_):
+ raise errors.KeyAlreadyExists(address)
- return self.get_key(address, private=True)
+ d = self.get_key(address)
+ d.addCallbacks(key_already_exists, _gen_key)
+ d.addCallback(lambda _: self.get_key(address, private=True))
+ return d
def get_key(self, address, private=False):
"""
@@ -287,19 +309,26 @@ class OpenPGPScheme(EncryptionScheme):
:param private: Look for a private key instead of a public one?
:type private: bool
- :return: The key bound to C{address}.
- :rtype: OpenPGPKey
- @raise KeyNotFound: If the key was not found on local storage.
+ :return: A Deferred which fires with the OpenPGPKey bound to address,
+ or which fails with KeyNotFound if the key was not found on
+ local storage.
+ :rtype: Deferred
"""
- # Remove the identity suffix after the '+' until the '@'
- # e.g.: test_user+something@provider.com becomes test_user@provider.com
- # since the key belongs to the identity without the '+' suffix.
- address = re.sub(r'\+.*\@', '@', address)
+ address = _parse_address(address)
- doc = self._get_key_doc(address, private)
- if doc is None:
- raise errors.KeyNotFound(address)
- return build_key_from_dict(OpenPGPKey, address, doc.content)
+ def build_key(doc):
+ if doc is None:
+ raise errors.KeyNotFound(address)
+ leap_assert(
+ address in doc.content[KEY_ADDRESS_KEY],
+ 'Wrong address in key data.')
+ key = build_key_from_dict(OpenPGPKey, doc.content)
+ key._gpgbinary = self._gpgbinary
+ return key
+
+ d = self._get_key_doc(address, private)
+ d.addCallback(build_key)
+ return d
def parse_ascii_key(self, key_data):
"""
@@ -316,7 +345,6 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert_type(key_data, (str, unicode))
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- mail_regex = '.*<([\w.-]+@[\w.-]+)>.*'
with self._temporary_gpgwrapper() as gpg:
# TODO: inspect result, or use decorator
@@ -328,44 +356,39 @@ class OpenPGPScheme(EncryptionScheme):
privkey = gpg.list_keys(secret=True).pop()
except IndexError:
pass
- pubkey = gpg.list_keys(secret=False).pop() # unitary keyring
-
- # extract adress from first uid on key
- match = re.match(mail_regex, pubkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- address = match.group(1)
+ try:
+ pubkey = gpg.list_keys(secret=False).pop() # unitary keyring
+ except IndexError:
+ return (None, None)
openpgp_privkey = None
if privkey is not None:
- match = re.match(mail_regex, privkey['uids'].pop())
- leap_assert(match is not None, 'No user address in key data.')
- privaddress = match.group(1)
-
# build private key
- openpgp_privkey = _build_key_from_gpg(
- privaddress, privkey,
+ openpgp_privkey = self._build_key_from_gpg(
+ privkey,
gpg.export_keys(privkey['fingerprint'], secret=True))
-
- leap_check(address == privaddress,
- 'Addresses in public and private key differ.',
- errors.KeyAddressMismatch)
leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
'Fingerprints for public and private key differ.',
errors.KeyFingerprintMismatch)
# build public key
- openpgp_pubkey = _build_key_from_gpg(
- address, pubkey,
+ openpgp_pubkey = self._build_key_from_gpg(
+ pubkey,
gpg.export_keys(pubkey['fingerprint'], secret=False))
return (openpgp_pubkey, openpgp_privkey)
- def put_ascii_key(self, key_data):
+ def put_ascii_key(self, key_data, address):
"""
Put key contained in ascii-armored C{key_data} in local storage.
:param key_data: The key data to be stored.
:type key_data: str or unicode
+ :param address: address for which this key will be active
+ :type address: str
+
+ :return: A Deferred which fires when the OpenPGPKey is in the storage.
+ :rtype: Deferred
"""
leap_assert_type(key_data, (str, unicode))
@@ -373,26 +396,124 @@ class OpenPGPScheme(EncryptionScheme):
try:
openpgp_pubkey, openpgp_privkey = self.parse_ascii_key(key_data)
except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
- leap_assert(False, repr(e))
+ return defer.fail(e)
+
+ def put_key(_, key):
+ return self.put_key(key, address)
+ d = defer.succeed(None)
if openpgp_pubkey is not None:
- self.put_key(openpgp_pubkey)
+ d.addCallback(put_key, openpgp_pubkey)
if openpgp_privkey is not None:
- self.put_key(openpgp_privkey)
+ d.addCallback(put_key, openpgp_privkey)
+ return d
- def put_key(self, key):
+ def put_key(self, key, address):
"""
Put C{key} in local storage.
:param key: The key to be stored.
:type key: OpenPGPKey
+ :param address: address for which this key will be active.
+ :type address: str
+
+ :return: A Deferred which fires when the key is in the storage.
+ :rtype: Deferred
+ """
+ d = self._put_key_doc(key)
+ d.addCallback(lambda _: self._put_active_doc(key, address))
+ return d
+
+ def _put_key_doc(self, key):
"""
- doc = self._get_key_doc(key.address, private=key.private)
- if doc is None:
- self._soledad.create_doc_from_json(key.get_json())
- else:
- doc.set_json(key.get_json())
- self._soledad.put_doc(doc)
+ Put key document in soledad
+
+ :type key: OpenPGPKey
+ :rtype: Deferred
+ """
+ def check_and_put(docs, key):
+ if len(docs) == 1:
+ doc = docs.pop()
+ oldkey = build_key_from_dict(OpenPGPKey, doc.content)
+ if key.fingerprint == oldkey.fingerprint:
+ # in case of an update of the key merge them with gnupg
+ with self._temporary_gpgwrapper() as gpg:
+ gpg.import_keys(oldkey.key_data)
+ gpg.import_keys(key.key_data)
+ gpgkey = gpg.list_keys(secret=key.private).pop()
+ mergedkey = self._build_key_from_gpg(
+ gpgkey,
+ gpg.export_keys(gpgkey['fingerprint'],
+ secret=key.private))
+ mergedkey.validation = max(
+ [key.validation, oldkey.validation])
+ mergedkey.last_audited_at = oldkey.last_audited_at
+ mergedkey.refreshed_at = key.refreshed_at
+ mergedkey.encr_used = key.encr_used or oldkey.encr_used
+ mergedkey.sign_used = key.sign_used or oldkey.sign_used
+ doc.set_json(mergedkey.get_json())
+ d = self._soledad.put_doc(doc)
+ else:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (key.fingerprint, oldkey.fingerprint))
+ d = defer.fail(
+ errors.KeyFingerprintMismatch(key.fingerprint))
+ elif len(docs) > 1:
+ logger.critical(
+ "There is more than one key with the same key_id %s"
+ % (key.key_id,))
+ d = defer.fail(errors.KeyAttributesDiffer(key.key_id))
+ else:
+ d = self._soledad.create_doc_from_json(key.get_json())
+ return d
+
+ d = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ d.addCallback(check_and_put, key)
+ return d
+
+ def _put_active_doc(self, key, address):
+ """
+ Put active key document in soledad
+
+ :type key: OpenPGPKey
+ :type addresses: str
+ :rtype: Deferred
+ """
+ def check_and_put(docs):
+ if len(docs) == 1:
+ doc = docs.pop()
+ doc.set_json(key.get_active_json(address))
+ d = self._soledad.put_doc(doc)
+ else:
+ if len(docs) > 1:
+ logger.error("There is more than one active key document "
+ "for the address %s" % (address,))
+ deferreds = []
+ for doc in docs:
+ delete = self._soledad.delete_doc(doc)
+ deferreds.append(delete)
+ d = defer.gatherResults(deferreds, consumeErrors=True)
+ else:
+ d = defer.succeed(None)
+
+ d.addCallback(
+ lambda _: self._soledad.create_doc_from_json(
+ key.get_active_json(address)))
+ return d
+
+ d = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ address,
+ '1' if key.private else '0')
+ d.addCallback(check_and_put)
+ return d
def _get_key_doc(self, address, private=False):
"""
@@ -404,41 +525,128 @@ class OpenPGPScheme(EncryptionScheme):
:type address: str
:param private: Whether to look for a private key.
:type private: bool
- :return: The document with the key or None if it does not exist.
- :rtype: leap.soledad.document.SoledadDocument
+
+ :return: A Deferred which fires with the SoledadDocument with the key
+ or None if it does not exist.
+ :rtype: Deferred
"""
- doclist = self._soledad.get_from_index(
- TAGS_ADDRESS_PRIVATE_INDEX,
- KEYMANAGER_KEY_TAG,
+ def get_key_from_active_doc(activedoc):
+ if len(activedoc) is 0:
+ return None
+ leap_assert(
+ len(activedoc) is 1,
+ 'Found more than one key for address %s!' % (address,))
+
+ key_id = activedoc[0].content[KEY_ID_KEY]
+ d = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key_id,
+ '1' if private else '0')
+ d.addCallback(get_doc, key_id)
+ return d
+
+ def get_doc(doclist, key_id):
+ leap_assert(
+ len(doclist) is 1,
+ 'There is %d keys for id %s!' % (len(doclist), key_id))
+ return doclist.pop()
+
+ d = self._soledad.get_from_index(
+ TYPE_ADDRESS_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
address,
'1' if private else '0')
- if len(doclist) is 0:
- return None
- leap_assert(
- len(doclist) is 1,
- 'Found more than one %s key for address!' %
- 'private' if private else 'public')
- return doclist.pop()
+ d.addCallback(get_key_from_active_doc)
+ return d
+
+ def _build_key_from_gpg(self, key, key_data):
+ """
+ Build an OpenPGPKey for C{address} based on C{key} from
+ local gpg storage.
+
+ ASCII armored GPG key data has to be queried independently in this
+ wrapper, so we receive it in C{key_data}.
+
+ :param key: Key obtained from GPG storage.
+ :type key: dict
+ :param key_data: Key data obtained from GPG storage.
+ :type key_data: str
+ :return: An instance of the key.
+ :rtype: OpenPGPKey
+ """
+ expiry_date = None
+ if key['expires']:
+ expiry_date = datetime.fromtimestamp(int(key['expires']))
+ address = []
+ for uid in key['uids']:
+ address.append(_parse_address(uid))
+
+ return OpenPGPKey(
+ address,
+ gpgbinary=self._gpgbinary,
+ key_id=key['keyid'],
+ fingerprint=key['fingerprint'],
+ key_data=key_data,
+ private=True if key['type'] == 'sec' else False,
+ length=int(key['length']),
+ expiry_date=expiry_date,
+ refreshed_at=datetime.now(),
+ )
def delete_key(self, key):
"""
Remove C{key} from storage.
- May raise:
- errors.KeyNotFound
- errors.KeyAttributesDiffer
-
:param key: The key to be removed.
:type key: EncryptionKey
+
+ :return: A Deferred which fires when the key is deleted, or which
+ fails with KeyNotFound if the key was not found on local
+ storage.
+ :rtype: Deferred
"""
leap_assert_type(key, OpenPGPKey)
- stored_key = self.get_key(key.address, private=key.private)
- if stored_key is None:
- raise errors.KeyNotFound(key)
- if stored_key.__dict__ != key.__dict__:
- raise errors.KeyAttributesDiffer(key)
- doc = self._get_key_doc(key.address, key.private)
- self._soledad.delete_doc(doc)
+
+ def delete_docs(activedocs):
+ deferreds = []
+ for doc in activedocs:
+ d = self._soledad.delete_doc(doc)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+
+ def get_key_docs(_):
+ return self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.KEY_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+
+ def delete_key(docs):
+ if len(docs) == 0:
+ raise errors.KeyNotFound(key)
+ if len(docs) > 1:
+ logger.critical("There is more than one key for key_id %s"
+ % key.key_id)
+
+ doc = None
+ for d in docs:
+ if d.content['fingerprint'] == key.fingerprint:
+ doc = d
+ break
+ if doc is None:
+ raise errors.KeyNotFound(key)
+ return self._soledad.delete_doc(doc)
+
+ d = self._soledad.get_from_index(
+ TYPE_ID_PRIVATE_INDEX,
+ self.ACTIVE_TYPE,
+ key.key_id,
+ '1' if key.private else '0')
+ d.addCallback(delete_docs)
+ d.addCallback(get_key_docs)
+ d.addCallback(delete_key)
+ return d
#
# Data encryption, decryption, signing and verifying
@@ -449,10 +657,8 @@ class OpenPGPScheme(EncryptionScheme):
Return a gpg wrapper that implements the context manager protocol and
contains C{keys}.
- :param key_data: ASCII armored key data.
- :type key_data: str
- :param gpgbinary: Name for GnuPG binary executable.
- :type gpgbinary: C{str}
+ :param keys: keys to conform the keyring.
+ :type key: list(OpenPGPKey)
:return: a TempGPGWrapper instance
:rtype: TempGPGWrapper
@@ -536,12 +742,10 @@ class OpenPGPScheme(EncryptionScheme):
:param verify: The key used to verify a signature.
:type verify: OpenPGPKey
- :return: The decrypted data.
- :rtype: unicode
+ :return: The decrypted data and if signature verifies
+ :rtype: (unicode, bool)
:raise DecryptError: Raised if failed decrypting for some reason.
- :raise InvalidSignature: Raised if unable to verify the signature with
- C{verify} key.
"""
leap_assert(privkey.private is True, 'Key is not private.')
keys = [privkey]
@@ -554,15 +758,15 @@ class OpenPGPScheme(EncryptionScheme):
result = gpg.decrypt(
data, passphrase=passphrase, always_trust=True)
self._assert_gpg_result_ok(result)
+
# verify signature
- if (verify is not None):
- if result.valid is False or \
- verify.fingerprint != result.pubkey_fingerprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature with key %s: %s' %
- (verify.key_id, result.stderr))
+ sign_valid = False
+ if (verify is not None and
+ result.valid is True and
+ verify.fingerprint == result.pubkey_fingerprint):
+ sign_valid = True
- return result.data
+ return (result.data, sign_valid)
except errors.GPGError as e:
logger.error('Failed to decrypt: %s.' % str(e))
raise errors.DecryptError(str(e))
@@ -638,8 +842,8 @@ class OpenPGPScheme(EncryptionScheme):
verified against this detached signature.
:type detached_sig: str
- :return: The ascii-armored signed data.
- :rtype: str
+ :return: signature matches
+ :rtype: bool
"""
leap_assert_type(pubkey, OpenPGPKey)
leap_assert(pubkey.private is False)
@@ -654,15 +858,10 @@ class OpenPGPScheme(EncryptionScheme):
sf, sfname = tempfile.mkstemp()
with os.fdopen(sf, 'w') as sfd:
sfd.write(detached_sig)
- with closing(_make_binary_stream(data, gpg._encoding)) as df:
- result = gpg.verify_file(df, sig_file=sfname)
+ result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
+ os.unlink(sfname)
gpgpubkey = gpg.list_keys().pop()
valid = result.valid
rfprint = result.fingerprint
kfprint = gpgpubkey['fingerprint']
- # raise in case sig is invalid
- if valid is False or rfprint != kfprint:
- raise errors.InvalidSignature(
- 'Failed to verify signature '
- 'with key %s.' % gpgpubkey['keyid'])
- return True
+ return valid and rfprint == kfprint
diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py
index e69de29..7128d20 100644
--- a/src/leap/keymanager/tests/__init__.py
+++ b/src/leap/keymanager/tests/__init__.py
@@ -0,0 +1,310 @@
+# -*- coding: utf-8 -*-
+# test_keymanager.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Base classes for the Key Manager tests.
+"""
+
+import distutils.spawn
+import os.path
+
+from twisted.internet.defer import gatherResults
+from twisted.trial import unittest
+
+from leap.common.testing.basetest import BaseLeapTest
+from leap.soledad.client import Soledad
+from leap.keymanager import KeyManager
+from leap.keymanager.openpgp import OpenPGPKey
+
+
+ADDRESS = 'leap@leap.se'
+ADDRESS_2 = 'anotheruser@leap.se'
+
+
+class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
+
+ def setUp(self):
+ self.setUpEnv()
+ self.gpg_binary_path = self._find_gpg()
+
+ self._soledad = Soledad(
+ u"leap@leap.se",
+ u"123456",
+ secrets_path=self.tempdir + "/secret.gpg",
+ local_db_path=self.tempdir + "/soledad.u1db",
+ server_url='',
+ cert_file=None,
+ auth_token=None,
+ syncable=False
+ )
+
+ def tearDown(self):
+ km = self._key_manager()
+
+ def delete_keys(keys):
+ deferreds = []
+ for key in keys:
+ d = km._wrapper_map[key.__class__].delete_key(key)
+ deferreds.append(d)
+ return gatherResults(deferreds)
+
+ def get_and_delete_keys(_):
+ deferreds = []
+ for private in [True, False]:
+ d = km.get_all_keys(private=private)
+ d.addCallback(delete_keys)
+ deferreds.append(d)
+ return gatherResults(deferreds)
+
+ # wait for the indexes to be ready for the tear down
+ d = km._wrapper_map[OpenPGPKey].deferred_indexes
+ d.addCallback(get_and_delete_keys)
+ d.addCallback(lambda _: self.tearDownEnv())
+ return d
+
+ def _key_manager(self, user=ADDRESS, url='', token=None):
+ return KeyManager(user, url, self._soledad, token=token,
+ gpgbinary=self.gpg_binary_path)
+
+ def _find_gpg(self):
+ gpg_path = distutils.spawn.find_executable('gpg')
+ if gpg_path is not None:
+ return os.path.realpath(gpg_path)
+ else:
+ return "/usr/bin/gpg"
+
+
+# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"
+KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
+PUBLIC_KEY = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+mQINBFC9+dkBEADNRfwV23TWEoGc/x0wWH1P7PlXt8MnC2Z1kKaKKmfnglVrpOiz
+iLWoiU58sfZ0L5vHkzXHXCBf6Eiy/EtUIvdiWAn+yASJ1mk5jZTBKO/WMAHD8wTO
+zpMsFmWyg3xc4DkmFa9KQ5EVU0o/nqPeyQxNMQN7px5pPwrJtJFmPxnxm+aDkPYx
+irDmz/4DeDNqXliazGJKw7efqBdlwTHkl9Akw2gwy178pmsKwHHEMOBOFFvX61AT
+huKqHYmlCGSliwbrJppTG7jc1/ls3itrK+CWTg4txREkSpEVmfcASvw/ZqLbjgfs
+d/INMwXnR9U81O8+7LT6yw/ca4ppcFoJD7/XJbkRiML6+bJ4Dakiy6i727BzV17g
+wI1zqNvm5rAhtALKfACha6YO43aJzairO4II1wxVHvRDHZn2IuKDDephQ3Ii7/vb
+hUOf6XCSmchkAcpKXUOvbxm1yfB1LRa64mMc2RcZxf4mW7KQkulBsdV5QG2276lv
+U2UUy2IutXcGP5nXC+f6sJJGJeEToKJ57yiO/VWJFjKN8SvP+7AYsQSqINUuEf6H
+T5gCPCraGMkTUTPXrREvu7NOohU78q6zZNaL3GW8ai7eSeANSuQ8Vzffx7Wd8Y7i
+Pw9sYj0SMFs1UgjbuL6pO5ueHh+qyumbtAq2K0Bci0kqOcU4E9fNtdiovQARAQAB
+tBxMZWFwIFRlc3QgS2V5IDxsZWFwQGxlYXAuc2U+iQI3BBMBCAAhBQJQvfnZAhsD
+BQsJCAcDBRUKCQgLBRYCAwEAAh4BAheAAAoJEC9FXigk0Y3fT7EQAKH3IuRniOpb
+T/DDIgwwjz3oxB/W0DDMyPXowlhSOuM0rgGfntBpBb3boezEXwL86NPQxNGGruF5
+hkmecSiuPSvOmQlqlS95NGQp6hNG0YaKColh+Q5NTspFXCAkFch9oqUje0LdxfSP
+QfV9UpeEvGyPmk1I9EJV/YDmZ4+Djge1d7qhVZInz4Rx1NrSyF/Tc2EC0VpjQFsU
+Y9Kb2YBBR7ivG6DBc8ty0jJXi7B4WjkFcUEJviQpMF2dCLdonCehYs1PqsN1N7j+
+eFjQd+hqVMJgYuSGKjvuAEfClM6MQw7+FmFwMyLgK/Ew/DttHEDCri77SPSkOGSI
+txCzhTg6798f6mJr7WcXmHX1w1Vcib5FfZ8vTDFVhz/XgAgArdhPo9V6/1dgSSiB
+KPQ/spsco6u5imdOhckERE0lnAYvVT6KE81TKuhF/b23u7x+Wdew6kK0EQhYA7wy
+7LmlaNXc7rMBQJ9Z60CJ4JDtatBWZ0kNrt2VfdDHVdqBTOpl0CraNUjWE5YMDasr
+K2dF5IX8D3uuYtpZnxqg0KzyLg0tzL0tvOL1C2iudgZUISZNPKbS0z0v+afuAAnx
+2pTC3uezbh2Jt8SWTLhll4i0P4Ps5kZ6HQUO56O+/Z1cWovX+mQekYFmERySDR9n
+3k1uAwLilJmRmepGmvYbB8HloV8HqwgguQINBFC9+dkBEAC0I/xn1uborMgDvBtf
+H0sEhwnXBC849/32zic6udB6/3Efk9nzbSpL3FSOuXITZsZgCHPkKarnoQ2ztMcS
+sh1ke1C5gQGms75UVmM/nS+2YI4vY8OX/GC/on2vUyncqdH+bR6xH5hx4NbWpfTs
+iQHmz5C6zzS/kuabGdZyKRaZHt23WQ7JX/4zpjqbC99DjHcP9BSk7tJ8wI4bkMYD
+uFVQdT9O6HwyKGYwUU4sAQRAj7XCTGvVbT0dpgJwH4RmrEtJoHAx4Whg8mJ710E0
+GCmzf2jqkNuOw76ivgk27Kge+Hw00jmJjQhHY0yVbiaoJwcRrPKzaSjEVNgrpgP3
+lXPRGQArgESsIOTeVVHQ8fhK2YtTeCY9rIiO+L0OX2xo9HK7hfHZZWL6rqymXdyS
+fhzh/f6IPyHFWnvj7Brl7DR8heMikygcJqv+ed2yx7iLyCUJ10g12I48+aEj1aLe
+dP7lna32iY8/Z0SHQLNH6PXO9SlPcq2aFUgKqE75A/0FMk7CunzU1OWr2ZtTLNO1
+WT/13LfOhhuEq9jTyTosn0WxBjJKq18lnhzCXlaw6EAtbA7CUwsD3CTPR56aAXFK
+3I7KXOVAqggrvMe5Tpdg5drfYpI8hZovL5aAgb+7Y5ta10TcJdUhS5K3kFAWe/td
+U0cmWUMDP1UMSQ5Jg6JIQVWhSwARAQABiQIfBBgBCAAJBQJQvfnZAhsMAAoJEC9F
+Xigk0Y3fRwsP/i0ElYCyxeLpWJTwo1iCLkMKz2yX1lFVa9nT1BVTPOQwr/IAc5OX
+NdtbJ14fUsKL5pWgW8OmrXtwZm1y4euI1RPWWubG01ouzwnGzv26UcuHeqC5orZj
+cOnKtL40y8VGMm8LoicVkRJH8blPORCnaLjdOtmA3rx/v2EXrJpSa3AhOy0ZSRXk
+ZSrK68AVNwamHRoBSYyo0AtaXnkPX4+tmO8X8BPfj125IljubvwZPIW9VWR9UqCE
+VPfDR1XKegVb6VStIywF7kmrknM1C5qUY28rdZYWgKorw01hBGV4jTW0cqde3N51
+XT1jnIAa+NoXUM9uQoGYMiwrL7vNsLlyyiW5ayDyV92H/rIuiqhFgbJsHTlsm7I8
+oGheR784BagAA1NIKD1qEO9T6Kz9lzlDaeWS5AUKeXrb7ZJLI1TTCIZx5/DxjLqM
+Tt/RFBpVo9geZQrvLUqLAMwdaUvDXC2c6DaCPXTh65oCZj/hqzlJHH+RoTWWzKI+
+BjXxgUWF9EmZUBrg68DSmI+9wuDFsjZ51BcqvJwxyfxtTaWhdoYqH/UQS+D1FP3/
+diZHHlzwVwPICzM9ooNTgbrcDzyxRkIVqsVwBq7EtzcvgYUyX53yG25Giy6YQaQ2
+ZtQ/VymwFL3XdUWV6B/hU4PVAFvO3qlOtdJ6TpE+nEWgcWjCv5g7RjXX
+=MuOY
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+PRIVATE_KEY = """
+-----BEGIN PGP PRIVATE KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+lQcYBFC9+dkBEADNRfwV23TWEoGc/x0wWH1P7PlXt8MnC2Z1kKaKKmfnglVrpOiz
+iLWoiU58sfZ0L5vHkzXHXCBf6Eiy/EtUIvdiWAn+yASJ1mk5jZTBKO/WMAHD8wTO
+zpMsFmWyg3xc4DkmFa9KQ5EVU0o/nqPeyQxNMQN7px5pPwrJtJFmPxnxm+aDkPYx
+irDmz/4DeDNqXliazGJKw7efqBdlwTHkl9Akw2gwy178pmsKwHHEMOBOFFvX61AT
+huKqHYmlCGSliwbrJppTG7jc1/ls3itrK+CWTg4txREkSpEVmfcASvw/ZqLbjgfs
+d/INMwXnR9U81O8+7LT6yw/ca4ppcFoJD7/XJbkRiML6+bJ4Dakiy6i727BzV17g
+wI1zqNvm5rAhtALKfACha6YO43aJzairO4II1wxVHvRDHZn2IuKDDephQ3Ii7/vb
+hUOf6XCSmchkAcpKXUOvbxm1yfB1LRa64mMc2RcZxf4mW7KQkulBsdV5QG2276lv
+U2UUy2IutXcGP5nXC+f6sJJGJeEToKJ57yiO/VWJFjKN8SvP+7AYsQSqINUuEf6H
+T5gCPCraGMkTUTPXrREvu7NOohU78q6zZNaL3GW8ai7eSeANSuQ8Vzffx7Wd8Y7i
+Pw9sYj0SMFs1UgjbuL6pO5ueHh+qyumbtAq2K0Bci0kqOcU4E9fNtdiovQARAQAB
+AA/+JHtlL39G1wsH9R6UEfUQJGXR9MiIiwZoKcnRB2o8+DS+OLjg0JOh8XehtuCs
+E/8oGQKtQqa5bEIstX7IZoYmYFiUQi9LOzIblmp2vxOm+HKkxa4JszWci2/ZmC3t
+KtaA4adl9XVnshoQ7pijuCMUKB3naBEOAxd8s9d/JeReGIYkJErdrnVfNk5N71Ds
+FmH5Ll3XtEDvgBUQP3nkA6QFjpsaB94FHjL3gDwum/cxzj6pCglcvHOzEhfY0Ddb
+J967FozQTaf2JW3O+w3LOqtcKWpq87B7+O61tVidQPSSuzPjCtFF0D2LC9R/Hpky
+KTMQ6CaKja4MPhjwywd4QPcHGYSqjMpflvJqi+kYIt8psUK/YswWjnr3r4fbuqVY
+VhtiHvnBHQjz135lUqWvEz4hM3Xpnxydx7aRlv5NlevK8+YIO5oFbWbGNTWsPZI5
+jpoFBpSsnR1Q5tnvtNHauvoWV+XN2qAOBTG+/nEbDYH6Ak3aaE9jrpTdYh0CotYF
+q7csANsDy3JvkAzeU6WnYpsHHaAjqOGyiZGsLej1UcXPFMosE/aUo4WQhiS8Zx2c
+zOVKOi/X5vQ2GdNT9Qolz8AriwzsvFR+bxPzyd8V6ALwDsoXvwEYinYBKK8j0OPv
+OOihSR6HVsuP9NUZNU9ewiGzte/+/r6pNXHvR7wTQ8EWLcEIAN6Zyrb0bHZTIlxt
+VWur/Ht2mIZrBaO50qmM5RD3T5oXzWXi/pjLrIpBMfeZR9DWfwQwjYzwqi7pxtYx
+nJvbMuY505rfnMoYxb4J+cpRXV8MS7Dr1vjjLVUC9KiwSbM3gg6emfd2yuA93ihv
+Pe3mffzLIiQa4mRE3wtGcioC43nWuV2K2e1KjxeFg07JhrezA/1Cak505ab/tmvP
+4YmjR5c44+yL/YcQ3HdFgs4mV+nVbptRXvRcPpolJsgxPccGNdvHhsoR4gwXMS3F
+RRPD2z6x8xeN73Q4KH3bm01swQdwFBZbWVfmUGLxvN7leCdfs9+iFJyqHiCIB6Iv
+mQfp8F0IAOwSo8JhWN+V1dwML4EkIrM8wUb4yecNLkyR6TpPH/qXx4PxVMC+vy6x
+sCtjeHIwKE+9vqnlhd5zOYh7qYXEJtYwdeDDmDbL8oks1LFfd+FyAuZXY33DLwn0
+cRYsr2OEZmaajqUB3NVmj3H4uJBN9+paFHyFSXrH68K1Fk2o3n+RSf2EiX+eICwI
+L6rqoF5sSVUghBWdNegV7qfy4anwTQwrIMGjgU5S6PKW0Dr/3iO5z3qQpGPAj5OW
+ATqPWkDICLbObPxD5cJlyyNE2wCA9VVc6/1d6w4EVwSq9h3/WTpATEreXXxTGptd
+LNiTA1nmakBYNO2Iyo3djhaqBdWjk+EIAKtVEnJH9FAVwWOvaj1RoZMA5DnDMo7e
+SnhrCXl8AL7Z1WInEaybasTJXn1uQ8xY52Ua4b8cbuEKRKzw/70NesFRoMLYoHTO
+dyeszvhoDHberpGRTciVmpMu7Hyi33rM31K9epA4ib6QbbCHnxkWOZB+Bhgj1hJ8
+xb4RBYWiWpAYcg0+DAC3w9gfxQhtUlZPIbmbrBmrVkO2GVGUj8kH6k4UV6kUHEGY
+HQWQR0HcbKcXW81ZXCCD0l7ROuEWQtTe5Jw7dJ4/QFuqZnPutXVRNOZqpl6eRShw
+7X2/a29VXBpmHA95a88rSQsL+qm7Fb3prqRmuMCtrUZgFz7HLSTuUMR867QcTGVh
+cCBUZXN0IEtleSA8bGVhcEBsZWFwLnNlPokCNwQTAQgAIQUCUL352QIbAwULCQgH
+AwUVCgkICwUWAgMBAAIeAQIXgAAKCRAvRV4oJNGN30+xEACh9yLkZ4jqW0/wwyIM
+MI896MQf1tAwzMj16MJYUjrjNK4Bn57QaQW926HsxF8C/OjT0MTRhq7heYZJnnEo
+rj0rzpkJapUveTRkKeoTRtGGigqJYfkOTU7KRVwgJBXIfaKlI3tC3cX0j0H1fVKX
+hLxsj5pNSPRCVf2A5mePg44HtXe6oVWSJ8+EcdTa0shf03NhAtFaY0BbFGPSm9mA
+QUe4rxugwXPLctIyV4uweFo5BXFBCb4kKTBdnQi3aJwnoWLNT6rDdTe4/nhY0Hfo
+alTCYGLkhio77gBHwpTOjEMO/hZhcDMi4CvxMPw7bRxAwq4u+0j0pDhkiLcQs4U4
+Ou/fH+pia+1nF5h19cNVXIm+RX2fL0wxVYc/14AIAK3YT6PVev9XYEkogSj0P7Kb
+HKOruYpnToXJBERNJZwGL1U+ihPNUyroRf29t7u8flnXsOpCtBEIWAO8Muy5pWjV
+3O6zAUCfWetAieCQ7WrQVmdJDa7dlX3Qx1XagUzqZdAq2jVI1hOWDA2rKytnReSF
+/A97rmLaWZ8aoNCs8i4NLcy9Lbzi9QtornYGVCEmTTym0tM9L/mn7gAJ8dqUwt7n
+s24dibfElky4ZZeItD+D7OZGeh0FDuejvv2dXFqL1/pkHpGBZhEckg0fZ95NbgMC
+4pSZkZnqRpr2GwfB5aFfB6sIIJ0HGARQvfnZARAAtCP8Z9bm6KzIA7wbXx9LBIcJ
+1wQvOPf99s4nOrnQev9xH5PZ820qS9xUjrlyE2bGYAhz5Cmq56ENs7THErIdZHtQ
+uYEBprO+VFZjP50vtmCOL2PDl/xgv6J9r1Mp3KnR/m0esR+YceDW1qX07IkB5s+Q
+us80v5LmmxnWcikWmR7dt1kOyV/+M6Y6mwvfQ4x3D/QUpO7SfMCOG5DGA7hVUHU/
+Tuh8MihmMFFOLAEEQI+1wkxr1W09HaYCcB+EZqxLSaBwMeFoYPJie9dBNBgps39o
+6pDbjsO+or4JNuyoHvh8NNI5iY0IR2NMlW4mqCcHEazys2koxFTYK6YD95Vz0RkA
+K4BErCDk3lVR0PH4StmLU3gmPayIjvi9Dl9saPRyu4Xx2WVi+q6spl3ckn4c4f3+
+iD8hxVp74+wa5ew0fIXjIpMoHCar/nndsse4i8glCddINdiOPPmhI9Wi3nT+5Z2t
+9omPP2dEh0CzR+j1zvUpT3KtmhVICqhO+QP9BTJOwrp81NTlq9mbUyzTtVk/9dy3
+zoYbhKvY08k6LJ9FsQYySqtfJZ4cwl5WsOhALWwOwlMLA9wkz0eemgFxStyOylzl
+QKoIK7zHuU6XYOXa32KSPIWaLy+WgIG/u2ObWtdE3CXVIUuSt5BQFnv7XVNHJllD
+Az9VDEkOSYOiSEFVoUsAEQEAAQAP/1AagnZQZyzHDEgw4QELAspYHCWLXE5aZInX
+wTUJhK31IgIXNn9bJ0hFiSpQR2xeMs9oYtRuPOu0P8oOFMn4/z374fkjZy8QVY3e
+PlL+3EUeqYtkMwlGNmVw5a/NbNuNfm5Darb7pEfbYd1gPcni4MAYw7R2SG/57GbC
+9gucvspHIfOSfBNLBthDzmK8xEKe1yD2eimfc2T7IRYb6hmkYfeds5GsqvGI6mwI
+85h4uUHWRc5JOlhVM6yX8hSWx0L60Z3DZLChmc8maWnFXd7C8eQ6P1azJJbW71Ih
+7CoK0XW4LE82vlQurSRFgTwfl7wFYszW2bOzCuhHDDtYnwH86Nsu0DC78ZVRnvxn
+E8Ke/AJgrdhIOo4UAyR+aZD2+2mKd7/waOUTUrUtTzc7i8N3YXGi/EIaNReBXaq+
+ZNOp24BlFzRp+FCF/pptDW9HjPdiV09x0DgICmeZS4Gq/4vFFIahWctg52NGebT0
+Idxngjj+xDtLaZlLQoOz0n5ByjO/Wi0ANmMv1sMKCHhGvdaSws2/PbMR2r4caj8m
+KXpIgdinM/wUzHJ5pZyF2U/qejsRj8Kw8KH/tfX4JCLhiaP/mgeTuWGDHeZQERAT
+xPmRFHaLP9/ZhvGNh6okIYtrKjWTLGoXvKLHcrKNisBLSq+P2WeFrlme1vjvJMo/
+jPwLT5o9CADQmcbKZ+QQ1ZM9v99iDZol7SAMZX43JC019sx6GK0u6xouJBcLfeB4
+OXacTgmSYdTa9RM9fbfVpti01tJ84LV2SyL/VJq/enJF4XQPSynT/tFTn1PAor6o
+tEAAd8fjKdJ6LnD5wb92SPHfQfXqI84rFEO8rUNIE/1ErT6DYifDzVCbfD2KZdoF
+cOSp7TpD77sY1bs74ocBX5ejKtd+aH99D78bJSMM4pSDZsIEwnomkBHTziubPwJb
+OwnATy0LmSMAWOw5rKbsh5nfwCiUTM20xp0t5JeXd+wPVWbpWqI2EnkCEN+RJr9i
+7dp/ymDQ+Yt5wrsN3NwoyiexPOG91WQVCADdErHsnglVZZq9Z8Wx7KwecGCUurJ2
+H6lKudv5YOxPnAzqZS5HbpZd/nRTMZh2rdXCr5m2YOuewyYjvM757AkmUpM09zJX
+MQ1S67/UX2y8/74TcRF97Ncx9HeELs92innBRXoFitnNguvcO6Esx4BTe1OdU6qR
+ER3zAmVf22Le9ciXbu24DN4mleOH+OmBx7X2PqJSYW9GAMTsRB081R6EWKH7romQ
+waxFrZ4DJzZ9ltyosEJn5F32StyLrFxpcrdLUoEaclZCv2qka7sZvi0EvovDVEBU
+e10jOx9AOwf8Gj2ufhquQ6qgVYCzbP+YrodtkFrXRS3IsljIchj1M2ffB/0bfoUs
+rtER9pLvYzCjBPg8IfGLw0o754Qbhh/ReplCRTusP/fQMybvCvfxreS3oyEriu/G
+GufRomjewZ8EMHDIgUsLcYo2UHZsfF7tcazgxMGmMvazp4r8vpgrvW/8fIN/6Adu
+tF+WjWDTvJLFJCe6O+BFJOWrssNrrra1zGtLC1s8s+Wfpe+bGPL5zpHeebGTwH1U
+22eqgJArlEKxrfarz7W5+uHZJHSjF/K9ZvunLGD0n9GOPMpji3UO3zeM8IYoWn7E
+/EWK1XbjnssNemeeTZ+sDh+qrD7BOi+vCX1IyBxbfqnQfJZvmcPWpruy1UsO+aIC
+0GY8Jr3OL69dDQ21jueJAh8EGAEIAAkFAlC9+dkCGwwACgkQL0VeKCTRjd9HCw/+
+LQSVgLLF4ulYlPCjWIIuQwrPbJfWUVVr2dPUFVM85DCv8gBzk5c121snXh9Swovm
+laBbw6ate3BmbXLh64jVE9Za5sbTWi7PCcbO/bpRy4d6oLmitmNw6cq0vjTLxUYy
+bwuiJxWREkfxuU85EKdouN062YDevH+/YResmlJrcCE7LRlJFeRlKsrrwBU3BqYd
+GgFJjKjQC1peeQ9fj62Y7xfwE9+PXbkiWO5u/Bk8hb1VZH1SoIRU98NHVcp6BVvp
+VK0jLAXuSauSczULmpRjbyt1lhaAqivDTWEEZXiNNbRyp17c3nVdPWOcgBr42hdQ
+z25CgZgyLCsvu82wuXLKJblrIPJX3Yf+si6KqEWBsmwdOWybsjygaF5HvzgFqAAD
+U0goPWoQ71PorP2XOUNp5ZLkBQp5etvtkksjVNMIhnHn8PGMuoxO39EUGlWj2B5l
+Cu8tSosAzB1pS8NcLZzoNoI9dOHrmgJmP+GrOUkcf5GhNZbMoj4GNfGBRYX0SZlQ
+GuDrwNKYj73C4MWyNnnUFyq8nDHJ/G1NpaF2hiof9RBL4PUU/f92JkceXPBXA8gL
+Mz2ig1OButwPPLFGQhWqxXAGrsS3Ny+BhTJfnfIbbkaLLphBpDZm1D9XKbAUvdd1
+RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc=
+=JTFu
+-----END PGP PRIVATE KEY BLOCK-----
+"""
+
+# key 7FEE575A: public key "anotheruser <anotheruser@leap.se>"
+PUBLIC_KEY_2 = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+mI0EUYwJXgEEAMbTKHuPJ5/Gk34l9Z06f+0WCXTDXdte1UBoDtZ1erAbudgC4MOR
+gquKqoj3Hhw0/ILqJ88GcOJmKK/bEoIAuKaqlzDF7UAYpOsPZZYmtRfPC2pTCnXq
+Z1vdeqLwTbUspqXflkCkFtfhGKMq5rH8GV5a3tXZkRWZhdNwhVXZagC3ABEBAAG0
+IWFub3RoZXJ1c2VyIDxhbm90aGVydXNlckBsZWFwLnNlPoi4BBMBAgAiBQJRjAle
+AhsDBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRB/nfpof+5XWotuA/4tLN4E
+gUr7IfLy2HkHAxzw7A4rqfMN92DIM9mZrDGaWRrOn3aVF7VU1UG7MDkHfPvp/cFw
+ezoCw4s4IoHVc/pVlOkcHSyt4/Rfh248tYEJmFCJXGHpkK83VIKYJAithNccJ6Q4
+JE/o06Mtf4uh/cA1HUL4a4ceqUhtpLJULLeKo7iNBFGMCV4BBADsyQI7GR0wSAxz
+VayLjuPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQt
+Z/hwcLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63
+yuRe94WenT1RJd6xU1aaUff4rKizuQARAQABiJ8EGAECAAkFAlGMCV4CGwwACgkQ
+f536aH/uV1rPZQQAqCzRysOlu8ez7PuiBD4SebgRqWlxa1TF1ujzfLmuPivROZ2X
+Kw5aQstxgGSjoB7tac49s0huh4X8XK+BtJBfU84JS8Jc2satlfwoyZ35LH6sDZck
+I+RS/3we6zpMfHs3vvp9xgca6ZupQxivGtxlJs294TpJorx+mFFqbV17AzQ=
+=Thdu
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+
+PRIVATE_KEY_2 = """
+-----BEGIN PGP PRIVATE KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+lQHYBFGMCV4BBADG0yh7jyefxpN+JfWdOn/tFgl0w13bXtVAaA7WdXqwG7nYAuDD
+kYKriqqI9x4cNPyC6ifPBnDiZiiv2xKCALimqpcwxe1AGKTrD2WWJrUXzwtqUwp1
+6mdb3Xqi8E21LKal35ZApBbX4RijKuax/BleWt7V2ZEVmYXTcIVV2WoAtwARAQAB
+AAP7BLuSAx7tOohnimEs74ks8l/L6dOcsFQZj2bqs4AoY3jFe7bV0tHr4llypb/8
+H3/DYvpf6DWnCjyUS1tTnXSW8JXtx01BUKaAufSmMNg9blKV6GGHlT/Whe9uVyks
+7XHk/+9mebVMNJ/kNlqq2k+uWqJohzC8WWLRK+d1tBeqDsECANZmzltPaqUsGV5X
+C3zszE3tUBgptV/mKnBtopKi+VH+t7K6fudGcG+bAcZDUoH/QVde52mIIjjIdLje
+uajJuHUCAO1mqh+vPoGv4eBLV7iBo3XrunyGXiys4a39eomhxTy3YktQanjjx+ty
+GltAGCs5PbWGO6/IRjjvd46wh53kzvsCAO0J97gsWhzLuFnkxFAJSPk7RRlyl7lI
+1XS/x0Og6j9XHCyY1OYkfBm0to3UlCfkgirzCYlTYObCofzdKFIPDmSqHbQhYW5v
+dGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iLgEEwECACIFAlGMCV4CGwMG
+CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEH+d+mh/7ldai24D/i0s3gSBSvsh
+8vLYeQcDHPDsDiup8w33YMgz2ZmsMZpZGs6fdpUXtVTVQbswOQd8++n9wXB7OgLD
+izgigdVz+lWU6RwdLK3j9F+Hbjy1gQmYUIlcYemQrzdUgpgkCK2E1xwnpDgkT+jT
+oy1/i6H9wDUdQvhrhx6pSG2kslQst4qjnQHYBFGMCV4BBADsyQI7GR0wSAxzVayL
+juPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQtZ/hw
+cLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63yuRe
+94WenT1RJd6xU1aaUff4rKizuQARAQABAAP9EyElqJ3dq3EErXwwT4mMnbd1SrVC
+rUJrNWQZL59mm5oigS00uIyR0SvusOr+UzTtd8ysRuwHy5d/LAZsbjQStaOMBILx
+77TJveOel0a1QK0YSMF2ywZMCKvquvjli4hAtWYz/EwfuzQN3t23jc5ny+GqmqD2
+3FUxLJosFUfLNmECAO9KhVmJi+L9dswIs+2Dkjd1eiRQzNOEVffvYkGYZyKxNiXF
+UA5kvyZcB4iAN9sWCybE4WHZ9jd4myGB0MPDGxkCAP1RsXJbbuD6zS7BXe5gwunO
+2q4q7ptdSl/sJYQuTe1KNP5d/uGsvlcFfsYjpsopasPjFBIncc/2QThMKlhoEaEB
+/0mVAxpT6SrEvUbJ18z7kna24SgMPr3OnPMxPGfvNLJY/Xv/A17YfoqjmByCvsKE
+JCDjopXtmbcrZyoEZbEht9mko4ifBBgBAgAJBQJRjAleAhsMAAoJEH+d+mh/7lda
+z2UEAKgs0crDpbvHs+z7ogQ+Enm4EalpcWtUxdbo83y5rj4r0TmdlysOWkLLcYBk
+o6Ae7WnOPbNIboeF/FyvgbSQX1POCUvCXNrGrZX8KMmd+Sx+rA2XJCPkUv98Hus6
+THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0
+=a5gs
+-----END PGP PRIVATE KEY BLOCK-----
+"""
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index e2558e4..a12cac0 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -1,4 +1,4 @@
-## -*- coding: utf-8 -*-
+# -*- coding: utf-8 -*-
# test_keymanager.py
# Copyright (C) 2013 LEAP
#
@@ -21,34 +21,38 @@ Tests for the Key Manager.
"""
+from datetime import datetime
from mock import Mock
-from leap.common.testing.basetest import BaseLeapTest
-from leap.soledad.client import Soledad
+from twisted.internet.defer import inlineCallbacks
+from twisted.trial import unittest
+
from leap.keymanager import (
- KeyManager,
- openpgp,
KeyNotFound,
- errors,
+ KeyAddressMismatch,
+ errors
)
from leap.keymanager.openpgp import OpenPGPKey
from leap.keymanager.keys import (
is_address,
build_key_from_dict,
)
+from leap.keymanager.validation import ValidationLevels
+from leap.keymanager.tests import (
+ KeyManagerWithSoledadTestCase,
+ ADDRESS,
+ ADDRESS_2,
+ KEY_FINGERPRINT,
+ PUBLIC_KEY,
+ PUBLIC_KEY_2,
+ PRIVATE_KEY,
+ PRIVATE_KEY_2,
+)
-ADDRESS = 'leap@leap.se'
-ADDRESS_2 = 'anotheruser@leap.se'
-GPG_BINARY_PATH = '/usr/bin/gpg'
-
-
-class KeyManagerUtilTestCase(BaseLeapTest):
+NICKSERVER_URI = "http://leap.se/"
- def setUp(self):
- pass
- def tearDown(self):
- pass
+class KeyManagerUtilTestCase(unittest.TestCase):
def test_is_address(self):
self.assertTrue(
@@ -66,18 +70,20 @@ class KeyManagerUtilTestCase(BaseLeapTest):
def test_build_key_from_dict(self):
kdict = {
- 'address': ADDRESS,
- 'key_id': 'key_id',
- 'fingerprint': 'fingerprint',
- 'key_data': 'key_data',
- 'private': 'private',
- 'length': 'length',
- 'expiry_date': 'expiry_date',
- 'first_seen_at': 'first_seen_at',
- 'last_audited_at': 'last_audited_at',
- 'validation': 'validation',
+ 'address': [ADDRESS],
+ 'key_id': KEY_FINGERPRINT[-16:],
+ 'fingerprint': KEY_FINGERPRINT,
+ 'key_data': PUBLIC_KEY,
+ 'private': False,
+ 'length': 4096,
+ 'expiry_date': 0,
+ 'last_audited_at': 0,
+ 'refreshed_at': 1311239602,
+ 'validation': str(ValidationLevels.Weak_Chain),
+ 'encr_used': False,
+ 'sign_used': True,
}
- key = build_key_from_dict(OpenPGPKey, ADDRESS, kdict)
+ key = build_key_from_dict(OpenPGPKey, kdict)
self.assertEqual(
kdict['address'], key.address,
'Wrong data in key.')
@@ -97,298 +103,81 @@ class KeyManagerUtilTestCase(BaseLeapTest):
kdict['length'], key.length,
'Wrong data in key.')
self.assertEqual(
- kdict['expiry_date'], key.expiry_date,
+ None, key.expiry_date,
'Wrong data in key.')
self.assertEqual(
- kdict['first_seen_at'], key.first_seen_at,
+ None, key.last_audited_at,
'Wrong data in key.')
self.assertEqual(
- kdict['last_audited_at'], key.last_audited_at,
+ datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
'Wrong data in key.')
self.assertEqual(
- kdict['validation'], key.validation,
+ ValidationLevels.get(kdict['validation']), key.validation,
'Wrong data in key.')
-
-
-class KeyManagerWithSoledadTestCase(BaseLeapTest):
-
- def setUp(self):
- # mock key fetching and storing so Soledad doesn't fail when trying to
- # reach the server.
- Soledad._get_secrets_from_shared_db = Mock(return_value=None)
- Soledad._put_secrets_in_shared_db = Mock(return_value=None)
-
- class MockSharedDB(object):
-
- get_doc = Mock(return_value=None)
- put_doc = Mock()
- lock = Mock(return_value=('atoken', 300))
- unlock = Mock(return_value=True)
-
- def __call__(self):
- return self
-
- Soledad._shared_db = MockSharedDB()
-
- self._soledad = Soledad(
- u"leap@leap.se",
- u"123456",
- secrets_path=self.tempdir + "/secret.gpg",
- local_db_path=self.tempdir + "/soledad.u1db",
- server_url='',
- cert_file=None,
- auth_token=None,
- )
-
- def tearDown(self):
- km = self._key_manager()
- for key in km.get_all_keys_in_local_db():
- km._wrapper_map[key.__class__].delete_key(key)
- for key in km.get_all_keys_in_local_db(private=True):
- km._wrapper_map[key.__class__].delete_key(key)
-
- def _key_manager(self, user=ADDRESS, url=''):
- return KeyManager(user, url, self._soledad,
- gpgbinary=GPG_BINARY_PATH)
-
-
-class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
-
- def _test_openpgp_gen_key(self):
- pgp = openpgp.OpenPGPScheme(self._soledad)
- self.assertRaises(KeyNotFound, pgp.get_key, 'user@leap.se')
- key = pgp.gen_key('user@leap.se')
- self.assertIsInstance(key, openpgp.OpenPGPKey)
- self.assertEqual(
- 'user@leap.se', key.address, 'Wrong address bound to key.')
- self.assertEqual(
- '4096', key.length, 'Wrong key length.')
-
- def test_openpgp_put_delete_key(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- pgp.put_ascii_key(PUBLIC_KEY)
- key = pgp.get_key(ADDRESS, private=False)
- pgp.delete_key(key)
- self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
-
- def test_openpgp_put_ascii_key(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- pgp.put_ascii_key(PUBLIC_KEY)
- key = pgp.get_key(ADDRESS, private=False)
- self.assertIsInstance(key, openpgp.OpenPGPKey)
self.assertEqual(
- ADDRESS, key.address, 'Wrong address bound to key.')
+ kdict['encr_used'], key.encr_used,
+ 'Wrong data in key.')
self.assertEqual(
- '4096', key.length, 'Wrong key length.')
- pgp.delete_key(key)
- self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
-
- def test_get_public_key(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
- pgp.put_ascii_key(PUBLIC_KEY)
- self.assertRaises(
- KeyNotFound, pgp.get_key, ADDRESS, private=True)
- key = pgp.get_key(ADDRESS, private=False)
- self.assertEqual(ADDRESS, key.address)
- self.assertFalse(key.private)
- self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
- pgp.delete_key(key)
- self.assertRaises(KeyNotFound, pgp.get_key, ADDRESS)
-
- def test_openpgp_encrypt_decrypt(self):
- # encrypt
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PUBLIC_KEY)
- pubkey = pgp.get_key(ADDRESS, private=False)
- cyphertext = pgp.encrypt('data', pubkey)
- # assert
- self.assertTrue(cyphertext is not None)
- self.assertTrue(cyphertext != '')
- self.assertTrue(cyphertext != 'data')
- self.assertTrue(pgp.is_encrypted(cyphertext))
- self.assertTrue(pgp.is_encrypted(cyphertext))
- # decrypt
- self.assertRaises(
- KeyNotFound, pgp.get_key, ADDRESS, private=True)
- pgp.put_ascii_key(PRIVATE_KEY)
- privkey = pgp.get_key(ADDRESS, private=True)
- plaintext = pgp.decrypt(cyphertext, privkey)
- pgp.delete_key(pubkey)
- pgp.delete_key(privkey)
- self.assertRaises(
- KeyNotFound, pgp.get_key, ADDRESS, private=False)
- self.assertRaises(
- KeyNotFound, pgp.get_key, ADDRESS, private=True)
-
- def test_verify_with_private_raises(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- data = 'data'
- privkey = pgp.get_key(ADDRESS, private=True)
- signed = pgp.sign(data, privkey)
- self.assertRaises(
- AssertionError,
- pgp.verify, signed, privkey)
-
- def test_sign_with_public_raises(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PUBLIC_KEY)
- data = 'data'
- pubkey = pgp.get_key(ADDRESS, private=False)
- self.assertRaises(
- AssertionError,
- pgp.sign, data, pubkey)
-
- def test_verify_with_wrong_key_raises(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- data = 'data'
- privkey = pgp.get_key(ADDRESS, private=True)
- signed = pgp.sign(data, privkey)
- pgp.put_ascii_key(PUBLIC_KEY_2)
- wrongkey = pgp.get_key(ADDRESS_2)
- self.assertRaises(
- errors.InvalidSignature,
- pgp.verify, signed, wrongkey)
-
- def test_encrypt_sign_with_public_raises(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- data = 'data'
- privkey = pgp.get_key(ADDRESS, private=True)
- pubkey = pgp.get_key(ADDRESS, private=False)
- self.assertRaises(
- AssertionError,
- pgp.encrypt, data, privkey, sign=pubkey)
-
- def test_decrypt_verify_with_private_raises(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- data = 'data'
- privkey = pgp.get_key(ADDRESS, private=True)
- pubkey = pgp.get_key(ADDRESS, private=False)
- encrypted_and_signed = pgp.encrypt(
- data, pubkey, sign=privkey)
- self.assertRaises(
- AssertionError,
- pgp.decrypt,
- encrypted_and_signed, privkey, verify=privkey)
-
- def test_decrypt_verify_with_wrong_key_raises(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- data = 'data'
- privkey = pgp.get_key(ADDRESS, private=True)
- pubkey = pgp.get_key(ADDRESS, private=False)
- encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)
- pgp.put_ascii_key(PUBLIC_KEY_2)
- wrongkey = pgp.get_key(ADDRESS_2)
- self.assertRaises(
- errors.InvalidSignature,
- pgp.verify, encrypted_and_signed, wrongkey)
-
- def test_sign_verify(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- data = 'data'
- privkey = pgp.get_key(ADDRESS, private=True)
- signed = pgp.sign(data, privkey, detach=False)
- pubkey = pgp.get_key(ADDRESS, private=False)
- self.assertTrue(pgp.verify(signed, pubkey))
-
- def test_encrypt_sign_decrypt_verify(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- pubkey = pgp.get_key(ADDRESS, private=False)
- privkey = pgp.get_key(ADDRESS, private=True)
- pgp.put_ascii_key(PRIVATE_KEY_2)
- pubkey2 = pgp.get_key(ADDRESS_2, private=False)
- privkey2 = pgp.get_key(ADDRESS_2, private=True)
- data = 'data'
- encrypted_and_signed = pgp.encrypt(
- data, pubkey2, sign=privkey)
- res = pgp.decrypt(
- encrypted_and_signed, privkey2, verify=pubkey)
- self.assertTrue(data, res)
-
- def test_sign_verify_detached_sig(self):
- pgp = openpgp.OpenPGPScheme(
- self._soledad, gpgbinary=GPG_BINARY_PATH)
- pgp.put_ascii_key(PRIVATE_KEY)
- data = 'data'
- privkey = pgp.get_key(ADDRESS, private=True)
- signature = pgp.sign(data, privkey, detach=True)
- pubkey = pgp.get_key(ADDRESS, private=False)
- self.assertTrue(pgp.verify(data, pubkey, detached_sig=signature))
+ kdict['sign_used'], key.sign_used,
+ 'Wrong data in key.')
class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
+ @inlineCallbacks
def test_get_all_keys_in_db(self):
km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get public keys
- keys = km.get_all_keys_in_local_db(False)
+ keys = yield km.get_all_keys(False)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
- self.assertEqual(ADDRESS, keys[0].address)
+ self.assertTrue(ADDRESS in keys[0].address)
self.assertFalse(keys[0].private)
# get private keys
- keys = km.get_all_keys_in_local_db(True)
+ keys = yield km.get_all_keys(True)
self.assertEqual(len(keys), 1, 'Wrong number of keys')
- self.assertEqual(ADDRESS, keys[0].address)
+ self.assertTrue(ADDRESS in keys[0].address)
self.assertTrue(keys[0].private)
+ @inlineCallbacks
def test_get_public_key(self):
km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get the key
- key = km.get_key(ADDRESS, OpenPGPKey, private=False,
- fetch_remote=False)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
+ fetch_remote=False)
self.assertTrue(key is not None)
- self.assertEqual(key.address, ADDRESS)
+ self.assertTrue(ADDRESS in key.address)
self.assertEqual(
key.fingerprint.lower(), KEY_FINGERPRINT.lower())
self.assertFalse(key.private)
+ @inlineCallbacks
def test_get_private_key(self):
km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
# get the key
- key = km.get_key(ADDRESS, OpenPGPKey, private=True,
- fetch_remote=False)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, private=True,
+ fetch_remote=False)
self.assertTrue(key is not None)
- self.assertEqual(key.address, ADDRESS)
+ self.assertTrue(ADDRESS in key.address)
self.assertEqual(
key.fingerprint.lower(), KEY_FINGERPRINT.lower())
self.assertTrue(key.private)
def test_send_key_raises_key_not_found(self):
km = self._key_manager()
- self.assertRaises(
- KeyNotFound,
- km.send_key, OpenPGPKey)
+ d = km.send_key(OpenPGPKey)
+ return self.assertFailure(d, KeyNotFound)
+ @inlineCallbacks
def test_send_key(self):
"""
Test that request is well formed when sending keys to server.
"""
- km = self._key_manager()
- km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY)
+ token = "mytoken"
+ km = self._key_manager(token=token)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY, ADDRESS)
km._fetcher.put = Mock()
# the following data will be used on the send
km.ca_cert_path = 'capath'
@@ -396,354 +185,245 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
km.uid = 'myuid'
km.api_uri = 'apiuri'
km.api_version = 'apiver'
- km.send_key(OpenPGPKey)
+ yield km.send_key(OpenPGPKey)
# setup expected args
+ pubkey = yield km.get_key(km._address, OpenPGPKey)
data = {
- km.PUBKEY_KEY: km.get_key(km._address, OpenPGPKey).key_data,
+ km.PUBKEY_KEY: pubkey.key_data,
}
url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
km._fetcher.put.assert_called_once_with(
url, data=data, verify='capath',
- cookies={'_session_id': 'sessionid'},
+ headers={'Authorization': 'Token token=%s' % token},
)
- def test__fetch_keys_from_server(self):
+ def test_fetch_keys_from_server(self):
"""
Test that the request is well formed when fetching keys from server.
"""
- km = self._key_manager(url='http://nickserver.domain')
+ km = self._key_manager(url=NICKSERVER_URI)
+
+ def verify_the_call(_):
+ km._fetcher.get.assert_called_once_with(
+ NICKSERVER_URI,
+ data={'address': ADDRESS_2},
+ verify='cacertpath',
+ )
+
+ d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
+ d.addCallback(verify_the_call)
+ return d
+
+ @inlineCallbacks
+ def test_get_key_fetches_from_server(self):
+ """
+ Test that getting a key successfuly fetches from server.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+
+ key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(ADDRESS in key.address)
+ self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
+
+ @inlineCallbacks
+ def test_get_key_fetches_other_domain(self):
+ """
+ Test that getting a key successfuly fetches from server.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(ADDRESS_OTHER in key.address)
+ self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
+
+ def _fetch_key(self, km, address, key):
+ """
+ :returns: a Deferred that will fire with the OpenPGPKey
+ """
class Response(object):
status_code = 200
headers = {'content-type': 'application/json'}
def json(self):
- return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2}
+ return {'address': address, 'openpgp': key}
def raise_for_status(self):
pass
# mock the fetcher so it returns the key for ADDRESS_2
- km._fetcher.get = Mock(
- return_value=Response())
+ km._fetcher.get = Mock(return_value=Response())
km.ca_cert_path = 'cacertpath'
- # do the fetch
- km._fetch_keys_from_server(ADDRESS_2)
- # and verify the call
- km._fetcher.get.assert_called_once_with(
- 'http://nickserver.domain',
- data={'address': ADDRESS_2},
- verify='cacertpath',
- )
+ # try to key get without fetching from server
+ d_fail = km.get_key(address, OpenPGPKey, fetch_remote=False)
+ d = self.assertFailure(d_fail, KeyNotFound)
+ # try to get key fetching from server.
+ d.addCallback(lambda _: km.get_key(address, OpenPGPKey))
+ return d
- def test_refresh_keys_does_not_refresh_own_key(self):
+ @inlineCallbacks
+ def test_put_key_ascii(self):
"""
- Test that refreshing keys will not attempt to refresh our own key.
+ Test that putting ascii key works
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+
+ yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
+ key = yield km.get_key(ADDRESS, OpenPGPKey)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(ADDRESS in key.address)
+
+ @inlineCallbacks
+ def test_fetch_uri_ascii_key(self):
+ """
+ Test that fetch key downloads the ascii key and gets included in
+ the local storage
"""
km = self._key_manager()
- # we add 2 keys but we expect it to only refresh the second one.
- km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY)
- km._wrapper_map[OpenPGPKey].put_ascii_key(PUBLIC_KEY_2)
- # mock the key fetching
- km._fetch_keys_from_server = Mock(return_value=[])
- km.ca_cert_path = '' # some bogus path so the km does not complain.
- # do the refreshing
- km.refresh_keys()
- km._fetch_keys_from_server.assert_called_once_with(
- ADDRESS_2
- )
- def test_get_key_fetches_from_server(self):
+ class Response(object):
+ ok = True
+ content = PUBLIC_KEY
+
+ km._fetcher.get = Mock(return_value=Response())
+ km.ca_cert_path = 'cacertpath'
+
+ yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
+ key = yield km.get_key(ADDRESS, OpenPGPKey)
+ self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
+
+ def test_fetch_uri_empty_key(self):
"""
- Test that getting a key successfuly fetches from server.
+ Test that fetch key raises KeyNotFound if no key in the url
"""
- km = self._key_manager(url='http://nickserver.domain')
+ km = self._key_manager()
class Response(object):
- status_code = 200
- headers = {'content-type': 'application/json'}
+ ok = True
+ content = ""
- def json(self):
- return {'address': ADDRESS_2, 'openpgp': PUBLIC_KEY_2}
+ km._fetcher.get = Mock(return_value=Response())
+ km.ca_cert_path = 'cacertpath'
+ d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
+ return self.assertFailure(d, KeyNotFound)
- def raise_for_status(self):
- pass
+ def test_fetch_uri_address_differ(self):
+ """
+ Test that fetch key raises KeyAttributesDiffer if the address
+ don't match
+ """
+ km = self._key_manager()
+
+ class Response(object):
+ ok = True
+ content = PUBLIC_KEY
- # mock the fetcher so it returns the key for ADDRESS_2
km._fetcher.get = Mock(return_value=Response())
km.ca_cert_path = 'cacertpath'
- # try to key get without fetching from server
- self.assertRaises(
- KeyNotFound, km.get_key, ADDRESS_2, OpenPGPKey,
- fetch_remote=False
- )
- # try to get key fetching from server.
- key = km.get_key(ADDRESS_2, OpenPGPKey)
- self.assertIsInstance(key, OpenPGPKey)
- self.assertEqual(ADDRESS_2, key.address)
+ d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
+ return self.assertFailure(d, KeyAddressMismatch)
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
RAW_DATA = 'data'
+ @inlineCallbacks
def test_keymanager_openpgp_encrypt_decrypt(self):
km = self._key_manager()
# put raw private key
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
- # get public key
- pubkey = km.get_key(
- ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(
+ PRIVATE_KEY_2, ADDRESS_2)
# encrypt
- encdata = km.encrypt(self.RAW_DATA, pubkey)
+ encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
+ sign=ADDRESS_2, fetch_remote=False)
self.assertNotEqual(self.RAW_DATA, encdata)
- # get private key
- privkey = km.get_key(
- ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
# decrypt
- rawdata = km.decrypt(encdata, privkey)
+ rawdata, signingkey = yield km.decrypt(
+ encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
self.assertEqual(self.RAW_DATA, rawdata)
+ key = yield km.get_key(ADDRESS_2, OpenPGPKey, private=False,
+ fetch_remote=False)
+ self.assertEqual(signingkey.fingerprint, key.fingerprint)
+ @inlineCallbacks
+ def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
+ km = self._key_manager()
+ # put raw keys
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(
+ PRIVATE_KEY_2, ADDRESS_2)
+ # encrypt
+ encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, OpenPGPKey,
+ sign=ADDRESS_2, fetch_remote=False)
+ self.assertNotEqual(self.RAW_DATA, encdata)
+ # verify
+ rawdata, signingkey = yield km.decrypt(
+ encdata, ADDRESS, OpenPGPKey, verify=ADDRESS, fetch_remote=False)
+ self.assertEqual(self.RAW_DATA, rawdata)
+ self.assertTrue(isinstance(signingkey, errors.InvalidSignature))
+
+ @inlineCallbacks
def test_keymanager_openpgp_sign_verify(self):
km = self._key_manager()
# put raw private keys
- km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY)
- # get private key for signing
- privkey = km.get_key(
- ADDRESS, OpenPGPKey, private=True, fetch_remote=False)
- # encrypt
- signdata = km.sign(self.RAW_DATA, privkey, detach=False)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
+ signdata = yield km.sign(self.RAW_DATA, ADDRESS, OpenPGPKey,
+ detach=False)
self.assertNotEqual(self.RAW_DATA, signdata)
- # get public key for verifying
- pubkey = km.get_key(
- ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
- # decrypt
- self.assertTrue(km.verify(signdata, pubkey))
-
-
-# Key material for testing
+ # verify
+ signingkey = yield km.verify(signdata, ADDRESS, OpenPGPKey,
+ fetch_remote=False)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, private=False,
+ fetch_remote=False)
+ self.assertEqual(signingkey.fingerprint, key.fingerprint)
+
+ def test_keymanager_encrypt_key_not_found(self):
+ km = self._key_manager()
+ d = km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
+ d.addCallback(
+ lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, OpenPGPKey,
+ sign=ADDRESS, fetch_remote=False))
+ return self.assertFailure(d, KeyNotFound)
-# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"
-KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
-PUBLIC_KEY = """
------BEGIN PGP PUBLIC KEY BLOCK-----
-Version: GnuPG v1.4.10 (GNU/Linux)
-
-mQINBFC9+dkBEADNRfwV23TWEoGc/x0wWH1P7PlXt8MnC2Z1kKaKKmfnglVrpOiz
-iLWoiU58sfZ0L5vHkzXHXCBf6Eiy/EtUIvdiWAn+yASJ1mk5jZTBKO/WMAHD8wTO
-zpMsFmWyg3xc4DkmFa9KQ5EVU0o/nqPeyQxNMQN7px5pPwrJtJFmPxnxm+aDkPYx
-irDmz/4DeDNqXliazGJKw7efqBdlwTHkl9Akw2gwy178pmsKwHHEMOBOFFvX61AT
-huKqHYmlCGSliwbrJppTG7jc1/ls3itrK+CWTg4txREkSpEVmfcASvw/ZqLbjgfs
-d/INMwXnR9U81O8+7LT6yw/ca4ppcFoJD7/XJbkRiML6+bJ4Dakiy6i727BzV17g
-wI1zqNvm5rAhtALKfACha6YO43aJzairO4II1wxVHvRDHZn2IuKDDephQ3Ii7/vb
-hUOf6XCSmchkAcpKXUOvbxm1yfB1LRa64mMc2RcZxf4mW7KQkulBsdV5QG2276lv
-U2UUy2IutXcGP5nXC+f6sJJGJeEToKJ57yiO/VWJFjKN8SvP+7AYsQSqINUuEf6H
-T5gCPCraGMkTUTPXrREvu7NOohU78q6zZNaL3GW8ai7eSeANSuQ8Vzffx7Wd8Y7i
-Pw9sYj0SMFs1UgjbuL6pO5ueHh+qyumbtAq2K0Bci0kqOcU4E9fNtdiovQARAQAB
-tBxMZWFwIFRlc3QgS2V5IDxsZWFwQGxlYXAuc2U+iQI3BBMBCAAhBQJQvfnZAhsD
-BQsJCAcDBRUKCQgLBRYCAwEAAh4BAheAAAoJEC9FXigk0Y3fT7EQAKH3IuRniOpb
-T/DDIgwwjz3oxB/W0DDMyPXowlhSOuM0rgGfntBpBb3boezEXwL86NPQxNGGruF5
-hkmecSiuPSvOmQlqlS95NGQp6hNG0YaKColh+Q5NTspFXCAkFch9oqUje0LdxfSP
-QfV9UpeEvGyPmk1I9EJV/YDmZ4+Djge1d7qhVZInz4Rx1NrSyF/Tc2EC0VpjQFsU
-Y9Kb2YBBR7ivG6DBc8ty0jJXi7B4WjkFcUEJviQpMF2dCLdonCehYs1PqsN1N7j+
-eFjQd+hqVMJgYuSGKjvuAEfClM6MQw7+FmFwMyLgK/Ew/DttHEDCri77SPSkOGSI
-txCzhTg6798f6mJr7WcXmHX1w1Vcib5FfZ8vTDFVhz/XgAgArdhPo9V6/1dgSSiB
-KPQ/spsco6u5imdOhckERE0lnAYvVT6KE81TKuhF/b23u7x+Wdew6kK0EQhYA7wy
-7LmlaNXc7rMBQJ9Z60CJ4JDtatBWZ0kNrt2VfdDHVdqBTOpl0CraNUjWE5YMDasr
-K2dF5IX8D3uuYtpZnxqg0KzyLg0tzL0tvOL1C2iudgZUISZNPKbS0z0v+afuAAnx
-2pTC3uezbh2Jt8SWTLhll4i0P4Ps5kZ6HQUO56O+/Z1cWovX+mQekYFmERySDR9n
-3k1uAwLilJmRmepGmvYbB8HloV8HqwgguQINBFC9+dkBEAC0I/xn1uborMgDvBtf
-H0sEhwnXBC849/32zic6udB6/3Efk9nzbSpL3FSOuXITZsZgCHPkKarnoQ2ztMcS
-sh1ke1C5gQGms75UVmM/nS+2YI4vY8OX/GC/on2vUyncqdH+bR6xH5hx4NbWpfTs
-iQHmz5C6zzS/kuabGdZyKRaZHt23WQ7JX/4zpjqbC99DjHcP9BSk7tJ8wI4bkMYD
-uFVQdT9O6HwyKGYwUU4sAQRAj7XCTGvVbT0dpgJwH4RmrEtJoHAx4Whg8mJ710E0
-GCmzf2jqkNuOw76ivgk27Kge+Hw00jmJjQhHY0yVbiaoJwcRrPKzaSjEVNgrpgP3
-lXPRGQArgESsIOTeVVHQ8fhK2YtTeCY9rIiO+L0OX2xo9HK7hfHZZWL6rqymXdyS
-fhzh/f6IPyHFWnvj7Brl7DR8heMikygcJqv+ed2yx7iLyCUJ10g12I48+aEj1aLe
-dP7lna32iY8/Z0SHQLNH6PXO9SlPcq2aFUgKqE75A/0FMk7CunzU1OWr2ZtTLNO1
-WT/13LfOhhuEq9jTyTosn0WxBjJKq18lnhzCXlaw6EAtbA7CUwsD3CTPR56aAXFK
-3I7KXOVAqggrvMe5Tpdg5drfYpI8hZovL5aAgb+7Y5ta10TcJdUhS5K3kFAWe/td
-U0cmWUMDP1UMSQ5Jg6JIQVWhSwARAQABiQIfBBgBCAAJBQJQvfnZAhsMAAoJEC9F
-Xigk0Y3fRwsP/i0ElYCyxeLpWJTwo1iCLkMKz2yX1lFVa9nT1BVTPOQwr/IAc5OX
-NdtbJ14fUsKL5pWgW8OmrXtwZm1y4euI1RPWWubG01ouzwnGzv26UcuHeqC5orZj
-cOnKtL40y8VGMm8LoicVkRJH8blPORCnaLjdOtmA3rx/v2EXrJpSa3AhOy0ZSRXk
-ZSrK68AVNwamHRoBSYyo0AtaXnkPX4+tmO8X8BPfj125IljubvwZPIW9VWR9UqCE
-VPfDR1XKegVb6VStIywF7kmrknM1C5qUY28rdZYWgKorw01hBGV4jTW0cqde3N51
-XT1jnIAa+NoXUM9uQoGYMiwrL7vNsLlyyiW5ayDyV92H/rIuiqhFgbJsHTlsm7I8
-oGheR784BagAA1NIKD1qEO9T6Kz9lzlDaeWS5AUKeXrb7ZJLI1TTCIZx5/DxjLqM
-Tt/RFBpVo9geZQrvLUqLAMwdaUvDXC2c6DaCPXTh65oCZj/hqzlJHH+RoTWWzKI+
-BjXxgUWF9EmZUBrg68DSmI+9wuDFsjZ51BcqvJwxyfxtTaWhdoYqH/UQS+D1FP3/
-diZHHlzwVwPICzM9ooNTgbrcDzyxRkIVqsVwBq7EtzcvgYUyX53yG25Giy6YQaQ2
-ZtQ/VymwFL3XdUWV6B/hU4PVAFvO3qlOtdJ6TpE+nEWgcWjCv5g7RjXX
-=MuOY
------END PGP PUBLIC KEY BLOCK-----
-"""
-PRIVATE_KEY = """
------BEGIN PGP PRIVATE KEY BLOCK-----
-Version: GnuPG v1.4.10 (GNU/Linux)
-
-lQcYBFC9+dkBEADNRfwV23TWEoGc/x0wWH1P7PlXt8MnC2Z1kKaKKmfnglVrpOiz
-iLWoiU58sfZ0L5vHkzXHXCBf6Eiy/EtUIvdiWAn+yASJ1mk5jZTBKO/WMAHD8wTO
-zpMsFmWyg3xc4DkmFa9KQ5EVU0o/nqPeyQxNMQN7px5pPwrJtJFmPxnxm+aDkPYx
-irDmz/4DeDNqXliazGJKw7efqBdlwTHkl9Akw2gwy178pmsKwHHEMOBOFFvX61AT
-huKqHYmlCGSliwbrJppTG7jc1/ls3itrK+CWTg4txREkSpEVmfcASvw/ZqLbjgfs
-d/INMwXnR9U81O8+7LT6yw/ca4ppcFoJD7/XJbkRiML6+bJ4Dakiy6i727BzV17g
-wI1zqNvm5rAhtALKfACha6YO43aJzairO4II1wxVHvRDHZn2IuKDDephQ3Ii7/vb
-hUOf6XCSmchkAcpKXUOvbxm1yfB1LRa64mMc2RcZxf4mW7KQkulBsdV5QG2276lv
-U2UUy2IutXcGP5nXC+f6sJJGJeEToKJ57yiO/VWJFjKN8SvP+7AYsQSqINUuEf6H
-T5gCPCraGMkTUTPXrREvu7NOohU78q6zZNaL3GW8ai7eSeANSuQ8Vzffx7Wd8Y7i
-Pw9sYj0SMFs1UgjbuL6pO5ueHh+qyumbtAq2K0Bci0kqOcU4E9fNtdiovQARAQAB
-AA/+JHtlL39G1wsH9R6UEfUQJGXR9MiIiwZoKcnRB2o8+DS+OLjg0JOh8XehtuCs
-E/8oGQKtQqa5bEIstX7IZoYmYFiUQi9LOzIblmp2vxOm+HKkxa4JszWci2/ZmC3t
-KtaA4adl9XVnshoQ7pijuCMUKB3naBEOAxd8s9d/JeReGIYkJErdrnVfNk5N71Ds
-FmH5Ll3XtEDvgBUQP3nkA6QFjpsaB94FHjL3gDwum/cxzj6pCglcvHOzEhfY0Ddb
-J967FozQTaf2JW3O+w3LOqtcKWpq87B7+O61tVidQPSSuzPjCtFF0D2LC9R/Hpky
-KTMQ6CaKja4MPhjwywd4QPcHGYSqjMpflvJqi+kYIt8psUK/YswWjnr3r4fbuqVY
-VhtiHvnBHQjz135lUqWvEz4hM3Xpnxydx7aRlv5NlevK8+YIO5oFbWbGNTWsPZI5
-jpoFBpSsnR1Q5tnvtNHauvoWV+XN2qAOBTG+/nEbDYH6Ak3aaE9jrpTdYh0CotYF
-q7csANsDy3JvkAzeU6WnYpsHHaAjqOGyiZGsLej1UcXPFMosE/aUo4WQhiS8Zx2c
-zOVKOi/X5vQ2GdNT9Qolz8AriwzsvFR+bxPzyd8V6ALwDsoXvwEYinYBKK8j0OPv
-OOihSR6HVsuP9NUZNU9ewiGzte/+/r6pNXHvR7wTQ8EWLcEIAN6Zyrb0bHZTIlxt
-VWur/Ht2mIZrBaO50qmM5RD3T5oXzWXi/pjLrIpBMfeZR9DWfwQwjYzwqi7pxtYx
-nJvbMuY505rfnMoYxb4J+cpRXV8MS7Dr1vjjLVUC9KiwSbM3gg6emfd2yuA93ihv
-Pe3mffzLIiQa4mRE3wtGcioC43nWuV2K2e1KjxeFg07JhrezA/1Cak505ab/tmvP
-4YmjR5c44+yL/YcQ3HdFgs4mV+nVbptRXvRcPpolJsgxPccGNdvHhsoR4gwXMS3F
-RRPD2z6x8xeN73Q4KH3bm01swQdwFBZbWVfmUGLxvN7leCdfs9+iFJyqHiCIB6Iv
-mQfp8F0IAOwSo8JhWN+V1dwML4EkIrM8wUb4yecNLkyR6TpPH/qXx4PxVMC+vy6x
-sCtjeHIwKE+9vqnlhd5zOYh7qYXEJtYwdeDDmDbL8oks1LFfd+FyAuZXY33DLwn0
-cRYsr2OEZmaajqUB3NVmj3H4uJBN9+paFHyFSXrH68K1Fk2o3n+RSf2EiX+eICwI
-L6rqoF5sSVUghBWdNegV7qfy4anwTQwrIMGjgU5S6PKW0Dr/3iO5z3qQpGPAj5OW
-ATqPWkDICLbObPxD5cJlyyNE2wCA9VVc6/1d6w4EVwSq9h3/WTpATEreXXxTGptd
-LNiTA1nmakBYNO2Iyo3djhaqBdWjk+EIAKtVEnJH9FAVwWOvaj1RoZMA5DnDMo7e
-SnhrCXl8AL7Z1WInEaybasTJXn1uQ8xY52Ua4b8cbuEKRKzw/70NesFRoMLYoHTO
-dyeszvhoDHberpGRTciVmpMu7Hyi33rM31K9epA4ib6QbbCHnxkWOZB+Bhgj1hJ8
-xb4RBYWiWpAYcg0+DAC3w9gfxQhtUlZPIbmbrBmrVkO2GVGUj8kH6k4UV6kUHEGY
-HQWQR0HcbKcXW81ZXCCD0l7ROuEWQtTe5Jw7dJ4/QFuqZnPutXVRNOZqpl6eRShw
-7X2/a29VXBpmHA95a88rSQsL+qm7Fb3prqRmuMCtrUZgFz7HLSTuUMR867QcTGVh
-cCBUZXN0IEtleSA8bGVhcEBsZWFwLnNlPokCNwQTAQgAIQUCUL352QIbAwULCQgH
-AwUVCgkICwUWAgMBAAIeAQIXgAAKCRAvRV4oJNGN30+xEACh9yLkZ4jqW0/wwyIM
-MI896MQf1tAwzMj16MJYUjrjNK4Bn57QaQW926HsxF8C/OjT0MTRhq7heYZJnnEo
-rj0rzpkJapUveTRkKeoTRtGGigqJYfkOTU7KRVwgJBXIfaKlI3tC3cX0j0H1fVKX
-hLxsj5pNSPRCVf2A5mePg44HtXe6oVWSJ8+EcdTa0shf03NhAtFaY0BbFGPSm9mA
-QUe4rxugwXPLctIyV4uweFo5BXFBCb4kKTBdnQi3aJwnoWLNT6rDdTe4/nhY0Hfo
-alTCYGLkhio77gBHwpTOjEMO/hZhcDMi4CvxMPw7bRxAwq4u+0j0pDhkiLcQs4U4
-Ou/fH+pia+1nF5h19cNVXIm+RX2fL0wxVYc/14AIAK3YT6PVev9XYEkogSj0P7Kb
-HKOruYpnToXJBERNJZwGL1U+ihPNUyroRf29t7u8flnXsOpCtBEIWAO8Muy5pWjV
-3O6zAUCfWetAieCQ7WrQVmdJDa7dlX3Qx1XagUzqZdAq2jVI1hOWDA2rKytnReSF
-/A97rmLaWZ8aoNCs8i4NLcy9Lbzi9QtornYGVCEmTTym0tM9L/mn7gAJ8dqUwt7n
-s24dibfElky4ZZeItD+D7OZGeh0FDuejvv2dXFqL1/pkHpGBZhEckg0fZ95NbgMC
-4pSZkZnqRpr2GwfB5aFfB6sIIJ0HGARQvfnZARAAtCP8Z9bm6KzIA7wbXx9LBIcJ
-1wQvOPf99s4nOrnQev9xH5PZ820qS9xUjrlyE2bGYAhz5Cmq56ENs7THErIdZHtQ
-uYEBprO+VFZjP50vtmCOL2PDl/xgv6J9r1Mp3KnR/m0esR+YceDW1qX07IkB5s+Q
-us80v5LmmxnWcikWmR7dt1kOyV/+M6Y6mwvfQ4x3D/QUpO7SfMCOG5DGA7hVUHU/
-Tuh8MihmMFFOLAEEQI+1wkxr1W09HaYCcB+EZqxLSaBwMeFoYPJie9dBNBgps39o
-6pDbjsO+or4JNuyoHvh8NNI5iY0IR2NMlW4mqCcHEazys2koxFTYK6YD95Vz0RkA
-K4BErCDk3lVR0PH4StmLU3gmPayIjvi9Dl9saPRyu4Xx2WVi+q6spl3ckn4c4f3+
-iD8hxVp74+wa5ew0fIXjIpMoHCar/nndsse4i8glCddINdiOPPmhI9Wi3nT+5Z2t
-9omPP2dEh0CzR+j1zvUpT3KtmhVICqhO+QP9BTJOwrp81NTlq9mbUyzTtVk/9dy3
-zoYbhKvY08k6LJ9FsQYySqtfJZ4cwl5WsOhALWwOwlMLA9wkz0eemgFxStyOylzl
-QKoIK7zHuU6XYOXa32KSPIWaLy+WgIG/u2ObWtdE3CXVIUuSt5BQFnv7XVNHJllD
-Az9VDEkOSYOiSEFVoUsAEQEAAQAP/1AagnZQZyzHDEgw4QELAspYHCWLXE5aZInX
-wTUJhK31IgIXNn9bJ0hFiSpQR2xeMs9oYtRuPOu0P8oOFMn4/z374fkjZy8QVY3e
-PlL+3EUeqYtkMwlGNmVw5a/NbNuNfm5Darb7pEfbYd1gPcni4MAYw7R2SG/57GbC
-9gucvspHIfOSfBNLBthDzmK8xEKe1yD2eimfc2T7IRYb6hmkYfeds5GsqvGI6mwI
-85h4uUHWRc5JOlhVM6yX8hSWx0L60Z3DZLChmc8maWnFXd7C8eQ6P1azJJbW71Ih
-7CoK0XW4LE82vlQurSRFgTwfl7wFYszW2bOzCuhHDDtYnwH86Nsu0DC78ZVRnvxn
-E8Ke/AJgrdhIOo4UAyR+aZD2+2mKd7/waOUTUrUtTzc7i8N3YXGi/EIaNReBXaq+
-ZNOp24BlFzRp+FCF/pptDW9HjPdiV09x0DgICmeZS4Gq/4vFFIahWctg52NGebT0
-Idxngjj+xDtLaZlLQoOz0n5ByjO/Wi0ANmMv1sMKCHhGvdaSws2/PbMR2r4caj8m
-KXpIgdinM/wUzHJ5pZyF2U/qejsRj8Kw8KH/tfX4JCLhiaP/mgeTuWGDHeZQERAT
-xPmRFHaLP9/ZhvGNh6okIYtrKjWTLGoXvKLHcrKNisBLSq+P2WeFrlme1vjvJMo/
-jPwLT5o9CADQmcbKZ+QQ1ZM9v99iDZol7SAMZX43JC019sx6GK0u6xouJBcLfeB4
-OXacTgmSYdTa9RM9fbfVpti01tJ84LV2SyL/VJq/enJF4XQPSynT/tFTn1PAor6o
-tEAAd8fjKdJ6LnD5wb92SPHfQfXqI84rFEO8rUNIE/1ErT6DYifDzVCbfD2KZdoF
-cOSp7TpD77sY1bs74ocBX5ejKtd+aH99D78bJSMM4pSDZsIEwnomkBHTziubPwJb
-OwnATy0LmSMAWOw5rKbsh5nfwCiUTM20xp0t5JeXd+wPVWbpWqI2EnkCEN+RJr9i
-7dp/ymDQ+Yt5wrsN3NwoyiexPOG91WQVCADdErHsnglVZZq9Z8Wx7KwecGCUurJ2
-H6lKudv5YOxPnAzqZS5HbpZd/nRTMZh2rdXCr5m2YOuewyYjvM757AkmUpM09zJX
-MQ1S67/UX2y8/74TcRF97Ncx9HeELs92innBRXoFitnNguvcO6Esx4BTe1OdU6qR
-ER3zAmVf22Le9ciXbu24DN4mleOH+OmBx7X2PqJSYW9GAMTsRB081R6EWKH7romQ
-waxFrZ4DJzZ9ltyosEJn5F32StyLrFxpcrdLUoEaclZCv2qka7sZvi0EvovDVEBU
-e10jOx9AOwf8Gj2ufhquQ6qgVYCzbP+YrodtkFrXRS3IsljIchj1M2ffB/0bfoUs
-rtER9pLvYzCjBPg8IfGLw0o754Qbhh/ReplCRTusP/fQMybvCvfxreS3oyEriu/G
-GufRomjewZ8EMHDIgUsLcYo2UHZsfF7tcazgxMGmMvazp4r8vpgrvW/8fIN/6Adu
-tF+WjWDTvJLFJCe6O+BFJOWrssNrrra1zGtLC1s8s+Wfpe+bGPL5zpHeebGTwH1U
-22eqgJArlEKxrfarz7W5+uHZJHSjF/K9ZvunLGD0n9GOPMpji3UO3zeM8IYoWn7E
-/EWK1XbjnssNemeeTZ+sDh+qrD7BOi+vCX1IyBxbfqnQfJZvmcPWpruy1UsO+aIC
-0GY8Jr3OL69dDQ21jueJAh8EGAEIAAkFAlC9+dkCGwwACgkQL0VeKCTRjd9HCw/+
-LQSVgLLF4ulYlPCjWIIuQwrPbJfWUVVr2dPUFVM85DCv8gBzk5c121snXh9Swovm
-laBbw6ate3BmbXLh64jVE9Za5sbTWi7PCcbO/bpRy4d6oLmitmNw6cq0vjTLxUYy
-bwuiJxWREkfxuU85EKdouN062YDevH+/YResmlJrcCE7LRlJFeRlKsrrwBU3BqYd
-GgFJjKjQC1peeQ9fj62Y7xfwE9+PXbkiWO5u/Bk8hb1VZH1SoIRU98NHVcp6BVvp
-VK0jLAXuSauSczULmpRjbyt1lhaAqivDTWEEZXiNNbRyp17c3nVdPWOcgBr42hdQ
-z25CgZgyLCsvu82wuXLKJblrIPJX3Yf+si6KqEWBsmwdOWybsjygaF5HvzgFqAAD
-U0goPWoQ71PorP2XOUNp5ZLkBQp5etvtkksjVNMIhnHn8PGMuoxO39EUGlWj2B5l
-Cu8tSosAzB1pS8NcLZzoNoI9dOHrmgJmP+GrOUkcf5GhNZbMoj4GNfGBRYX0SZlQ
-GuDrwNKYj73C4MWyNnnUFyq8nDHJ/G1NpaF2hiof9RBL4PUU/f92JkceXPBXA8gL
-Mz2ig1OButwPPLFGQhWqxXAGrsS3Ny+BhTJfnfIbbkaLLphBpDZm1D9XKbAUvdd1
-RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc=
-=JTFu
------END PGP PRIVATE KEY BLOCK-----
-"""
+if __name__ == "__main__":
+ import unittest
+ unittest.main()
-# key 7FEE575A: public key "anotheruser <anotheruser@leap.se>"
-PUBLIC_KEY_2 = """
+# key 0F91B402: someone@somedomain.org
+# 9420 EC7B 6DCB 867F 5592 E6D1 7504 C974 0F91 B402
+ADDRESS_OTHER = "someone@somedomain.org"
+PUBLIC_KEY_OTHER = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
-Version: GnuPG v1.4.10 (GNU/Linux)
-
-mI0EUYwJXgEEAMbTKHuPJ5/Gk34l9Z06f+0WCXTDXdte1UBoDtZ1erAbudgC4MOR
-gquKqoj3Hhw0/ILqJ88GcOJmKK/bEoIAuKaqlzDF7UAYpOsPZZYmtRfPC2pTCnXq
-Z1vdeqLwTbUspqXflkCkFtfhGKMq5rH8GV5a3tXZkRWZhdNwhVXZagC3ABEBAAG0
-IWFub3RoZXJ1c2VyIDxhbm90aGVydXNlckBsZWFwLnNlPoi4BBMBAgAiBQJRjAle
-AhsDBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRB/nfpof+5XWotuA/4tLN4E
-gUr7IfLy2HkHAxzw7A4rqfMN92DIM9mZrDGaWRrOn3aVF7VU1UG7MDkHfPvp/cFw
-ezoCw4s4IoHVc/pVlOkcHSyt4/Rfh248tYEJmFCJXGHpkK83VIKYJAithNccJ6Q4
-JE/o06Mtf4uh/cA1HUL4a4ceqUhtpLJULLeKo7iNBFGMCV4BBADsyQI7GR0wSAxz
-VayLjuPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQt
-Z/hwcLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63
-yuRe94WenT1RJd6xU1aaUff4rKizuQARAQABiJ8EGAECAAkFAlGMCV4CGwwACgkQ
-f536aH/uV1rPZQQAqCzRysOlu8ez7PuiBD4SebgRqWlxa1TF1ujzfLmuPivROZ2X
-Kw5aQstxgGSjoB7tac49s0huh4X8XK+BtJBfU84JS8Jc2satlfwoyZ35LH6sDZck
-I+RS/3we6zpMfHs3vvp9xgca6ZupQxivGtxlJs294TpJorx+mFFqbV17AzQ=
-=Thdu
+Version: GnuPG v1
+
+mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL
+kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM
+jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr
+wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt
+QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU
+AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv
+bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ
+CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi
+9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ
+cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj
+TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R
+Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7
+hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN
+BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv
+u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq
+MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq
+2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP
+SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC
+1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA
+CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6
+DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy
+6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56
+U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw
+XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg
+QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg==
+=gDzy
-----END PGP PUBLIC KEY BLOCK-----
"""
-
-PRIVATE_KEY_2 = """
------BEGIN PGP PRIVATE KEY BLOCK-----
-Version: GnuPG v1.4.10 (GNU/Linux)
-
-lQHYBFGMCV4BBADG0yh7jyefxpN+JfWdOn/tFgl0w13bXtVAaA7WdXqwG7nYAuDD
-kYKriqqI9x4cNPyC6ifPBnDiZiiv2xKCALimqpcwxe1AGKTrD2WWJrUXzwtqUwp1
-6mdb3Xqi8E21LKal35ZApBbX4RijKuax/BleWt7V2ZEVmYXTcIVV2WoAtwARAQAB
-AAP7BLuSAx7tOohnimEs74ks8l/L6dOcsFQZj2bqs4AoY3jFe7bV0tHr4llypb/8
-H3/DYvpf6DWnCjyUS1tTnXSW8JXtx01BUKaAufSmMNg9blKV6GGHlT/Whe9uVyks
-7XHk/+9mebVMNJ/kNlqq2k+uWqJohzC8WWLRK+d1tBeqDsECANZmzltPaqUsGV5X
-C3zszE3tUBgptV/mKnBtopKi+VH+t7K6fudGcG+bAcZDUoH/QVde52mIIjjIdLje
-uajJuHUCAO1mqh+vPoGv4eBLV7iBo3XrunyGXiys4a39eomhxTy3YktQanjjx+ty
-GltAGCs5PbWGO6/IRjjvd46wh53kzvsCAO0J97gsWhzLuFnkxFAJSPk7RRlyl7lI
-1XS/x0Og6j9XHCyY1OYkfBm0to3UlCfkgirzCYlTYObCofzdKFIPDmSqHbQhYW5v
-dGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iLgEEwECACIFAlGMCV4CGwMG
-CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEH+d+mh/7ldai24D/i0s3gSBSvsh
-8vLYeQcDHPDsDiup8w33YMgz2ZmsMZpZGs6fdpUXtVTVQbswOQd8++n9wXB7OgLD
-izgigdVz+lWU6RwdLK3j9F+Hbjy1gQmYUIlcYemQrzdUgpgkCK2E1xwnpDgkT+jT
-oy1/i6H9wDUdQvhrhx6pSG2kslQst4qjnQHYBFGMCV4BBADsyQI7GR0wSAxzVayL
-juPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQtZ/hw
-cLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63yuRe
-94WenT1RJd6xU1aaUff4rKizuQARAQABAAP9EyElqJ3dq3EErXwwT4mMnbd1SrVC
-rUJrNWQZL59mm5oigS00uIyR0SvusOr+UzTtd8ysRuwHy5d/LAZsbjQStaOMBILx
-77TJveOel0a1QK0YSMF2ywZMCKvquvjli4hAtWYz/EwfuzQN3t23jc5ny+GqmqD2
-3FUxLJosFUfLNmECAO9KhVmJi+L9dswIs+2Dkjd1eiRQzNOEVffvYkGYZyKxNiXF
-UA5kvyZcB4iAN9sWCybE4WHZ9jd4myGB0MPDGxkCAP1RsXJbbuD6zS7BXe5gwunO
-2q4q7ptdSl/sJYQuTe1KNP5d/uGsvlcFfsYjpsopasPjFBIncc/2QThMKlhoEaEB
-/0mVAxpT6SrEvUbJ18z7kna24SgMPr3OnPMxPGfvNLJY/Xv/A17YfoqjmByCvsKE
-JCDjopXtmbcrZyoEZbEht9mko4ifBBgBAgAJBQJRjAleAhsMAAoJEH+d+mh/7lda
-z2UEAKgs0crDpbvHs+z7ogQ+Enm4EalpcWtUxdbo83y5rj4r0TmdlysOWkLLcYBk
-o6Ae7WnOPbNIboeF/FyvgbSQX1POCUvCXNrGrZX8KMmd+Sx+rA2XJCPkUv98Hus6
-THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0
-=a5gs
------END PGP PRIVATE KEY BLOCK-----
-"""
-import unittest
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py
new file mode 100644
index 0000000..5f85c74
--- /dev/null
+++ b/src/leap/keymanager/tests/test_openpgp.py
@@ -0,0 +1,252 @@
+# -*- coding: utf-8 -*-
+# test_keymanager.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Tests for the OpenPGP support on Key Manager.
+"""
+
+
+from twisted.internet.defer import inlineCallbacks
+
+from leap.keymanager import (
+ KeyNotFound,
+ openpgp,
+)
+from leap.keymanager.openpgp import OpenPGPKey
+from leap.keymanager.tests import (
+ KeyManagerWithSoledadTestCase,
+ ADDRESS,
+ ADDRESS_2,
+ KEY_FINGERPRINT,
+ PUBLIC_KEY,
+ PUBLIC_KEY_2,
+ PRIVATE_KEY,
+ PRIVATE_KEY_2,
+)
+
+
+class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
+
+ # set the trial timeout to 20min, needed by the key generation test
+ timeout = 1200
+
+ @inlineCallbacks
+ def _test_openpgp_gen_key(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield self._assert_key_not_found(pgp, 'user@leap.se')
+ key = yield pgp.gen_key('user@leap.se')
+ self.assertIsInstance(key, openpgp.OpenPGPKey)
+ self.assertEqual(
+ ['user@leap.se'], key.address, 'Wrong address bound to key.')
+ self.assertEqual(
+ 4096, key.length, 'Wrong key length.')
+
+ @inlineCallbacks
+ def test_openpgp_put_delete_key(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+ key = yield pgp.get_key(ADDRESS, private=False)
+ yield pgp.delete_key(key)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+
+ @inlineCallbacks
+ def test_openpgp_put_ascii_key(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+ key = yield pgp.get_key(ADDRESS, private=False)
+ self.assertIsInstance(key, openpgp.OpenPGPKey)
+ self.assertTrue(
+ ADDRESS in key.address, 'Wrong address bound to key.')
+ self.assertEqual(
+ 4096, key.length, 'Wrong key length.')
+ yield pgp.delete_key(key)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+
+ @inlineCallbacks
+ def test_get_public_key(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+ yield self._assert_key_not_found(pgp, ADDRESS, private=True)
+ key = yield pgp.get_key(ADDRESS, private=False)
+ self.assertTrue(ADDRESS in key.address)
+ self.assertFalse(key.private)
+ self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
+ yield pgp.delete_key(key)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+
+ @inlineCallbacks
+ def test_openpgp_encrypt_decrypt(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+
+ # encrypt
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ cyphertext = pgp.encrypt(data, pubkey)
+
+ self.assertTrue(cyphertext is not None)
+ self.assertTrue(cyphertext != '')
+ self.assertTrue(cyphertext != data)
+ self.assertTrue(pgp.is_encrypted(cyphertext))
+ self.assertTrue(pgp.is_encrypted(cyphertext))
+
+ # decrypt
+ yield self._assert_key_not_found(pgp, ADDRESS, private=True)
+ yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ decrypted, _ = pgp.decrypt(cyphertext, privkey)
+ self.assertEqual(decrypted, data)
+
+ yield pgp.delete_key(pubkey)
+ yield pgp.delete_key(privkey)
+ yield self._assert_key_not_found(pgp, ADDRESS, private=False)
+ yield self._assert_key_not_found(pgp, ADDRESS, private=True)
+
+ @inlineCallbacks
+ def test_verify_with_private_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ signed = pgp.sign(data, privkey)
+ self.assertRaises(
+ AssertionError,
+ pgp.verify, signed, privkey)
+
+ @inlineCallbacks
+ def test_sign_with_public_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+ self.assertRaises(
+ AssertionError,
+ pgp.sign, data, ADDRESS, OpenPGPKey)
+
+ @inlineCallbacks
+ def test_verify_with_wrong_key_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ signed = pgp.sign(data, privkey)
+ yield pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)
+ wrongkey = yield pgp.get_key(ADDRESS_2)
+ self.assertFalse(pgp.verify(signed, wrongkey))
+
+ @inlineCallbacks
+ def test_encrypt_sign_with_public_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ self.assertRaises(
+ AssertionError,
+ pgp.encrypt, data, privkey, sign=pubkey)
+
+ @inlineCallbacks
+ def test_decrypt_verify_with_private_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ encrypted_and_signed = pgp.encrypt(
+ data, pubkey, sign=privkey)
+ self.assertRaises(
+ AssertionError,
+ pgp.decrypt,
+ encrypted_and_signed, privkey, verify=privkey)
+
+ @inlineCallbacks
+ def test_decrypt_verify_with_wrong_key(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ encrypted_and_signed = pgp.encrypt(data, pubkey, sign=privkey)
+ yield pgp.put_ascii_key(PUBLIC_KEY_2, ADDRESS_2)
+ wrongkey = yield pgp.get_key(ADDRESS_2)
+ decrypted, validsign = pgp.decrypt(encrypted_and_signed, privkey,
+ verify=wrongkey)
+ self.assertEqual(decrypted, data)
+ self.assertFalse(validsign)
+
+ @inlineCallbacks
+ def test_sign_verify(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ signed = pgp.sign(data, privkey, detach=False)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ validsign = pgp.verify(signed, pubkey)
+ self.assertTrue(validsign)
+
+ @inlineCallbacks
+ def test_encrypt_sign_decrypt_verify(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+
+ yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+
+ yield pgp.put_ascii_key(PRIVATE_KEY_2, ADDRESS_2)
+ pubkey2 = yield pgp.get_key(ADDRESS_2, private=False)
+ privkey2 = yield pgp.get_key(ADDRESS_2, private=True)
+
+ data = 'data'
+ encrypted_and_signed = pgp.encrypt(
+ data, pubkey2, sign=privkey)
+ res, validsign = pgp.decrypt(
+ encrypted_and_signed, privkey2, verify=pubkey)
+ self.assertEqual(data, res)
+ self.assertTrue(validsign)
+
+ @inlineCallbacks
+ def test_sign_verify_detached_sig(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ signature = yield pgp.sign(data, privkey, detach=True)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ validsign = pgp.verify(data, pubkey, detached_sig=signature)
+ self.assertTrue(validsign)
+
+ def _assert_key_not_found(self, pgp, address, private=False):
+ d = pgp.get_key(address, private=private)
+ return self.assertFailure(d, KeyNotFound)
diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py
new file mode 100644
index 0000000..ddf1170
--- /dev/null
+++ b/src/leap/keymanager/tests/test_validation.py
@@ -0,0 +1,345 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the Validation Levels
+"""
+
+import unittest
+from datetime import datetime
+from twisted.internet.defer import inlineCallbacks
+
+from leap.keymanager.openpgp import OpenPGPKey
+from leap.keymanager.errors import (
+ KeyNotValidUpgrade
+)
+from leap.keymanager.tests import (
+ KeyManagerWithSoledadTestCase,
+ ADDRESS,
+ PUBLIC_KEY,
+ KEY_FINGERPRINT
+)
+from leap.keymanager.validation import ValidationLevels
+
+
+class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
+
+ @inlineCallbacks
+ def test_none_old_key(self):
+ km = self._key_manager()
+ yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ self.assertEqual(key.fingerprint, KEY_FINGERPRINT)
+
+ @inlineCallbacks
+ def test_cant_upgrade(self):
+ km = self._key_manager()
+ yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS,
+ validation=ValidationLevels.Provider_Trust)
+ d = km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+
+ @inlineCallbacks
+ def test_fingerprint_level(self):
+ km = self._key_manager()
+ yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
+ yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
+ validation=ValidationLevels.Fingerprint)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
+
+ @inlineCallbacks
+ def test_expired_key(self):
+ km = self._key_manager()
+ yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
+ yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
+
+ @inlineCallbacks
+ def test_expired_fail_lower_level(self):
+ km = self._key_manager()
+ yield km.put_raw_key(
+ EXPIRED_KEY, OpenPGPKey, ADDRESS,
+ validation=ValidationLevels.Third_Party_Endorsement)
+ d = km.put_raw_key(
+ UNRELATED_KEY,
+ OpenPGPKey,
+ ADDRESS,
+ validation=ValidationLevels.Provider_Trust)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+
+ @inlineCallbacks
+ def test_roll_back(self):
+ km = self._key_manager()
+ yield km.put_raw_key(EXPIRED_KEY_UPDATED, OpenPGPKey, ADDRESS)
+ yield km.put_raw_key(EXPIRED_KEY, OpenPGPKey, ADDRESS)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE)
+
+ @inlineCallbacks
+ def test_not_used(self):
+ km = self._key_manager()
+ yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS,
+ validation=ValidationLevels.Provider_Trust)
+ yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
+ validation=ValidationLevels.Provider_Endorsement)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
+
+ @inlineCallbacks
+ def test_used(self):
+ TEXT = "some text"
+
+ km = self._key_manager()
+ yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS)
+ yield km.encrypt(TEXT, ADDRESS, OpenPGPKey)
+
+ km2 = self._key_manager()
+ yield km2.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS)
+ signature = yield km2.sign(TEXT, ADDRESS, OpenPGPKey)
+
+ yield km.verify(TEXT, ADDRESS, OpenPGPKey, detached_sig=signature)
+ d = km.put_raw_key(
+ UNRELATED_KEY, OpenPGPKey, ADDRESS,
+ validation=ValidationLevels.Provider_Endorsement)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+
+ @inlineCallbacks
+ def test_signed_key(self):
+ km = self._key_manager()
+ yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
+ yield km.put_raw_key(SIGNED_KEY, OpenPGPKey, ADDRESS)
+ key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
+ self.assertEqual(key.fingerprint, SIGNED_FINGERPRINT)
+
+
+# Key material for testing
+
+# key 901FBCA5: public key "Leap Test Key <leap@leap.se>"
+UNRELATED_FINGERPRINT = "ABCCD9C8270B6A8D5633FAC9D04DB2E4901FBCA5"
+UNRELATED_KEY = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1
+
+mQENBFQ9VDoBCACbKflcEhUXZULOT4Fwc2ifRUllJpusd2uX5oeDlZdZ15uLY2eF
+LcxnAdIWkI/PsXimh0ev/Pf4oCynfmt02I3c2d9F0N6JXWnRiP+p098oPOcqeEqL
+N3CrkH1RVnEXNeJ/Fu7tkD61SBXl1MytMfcHyhN5arg8OcVAjcmghX53+92jFhC9
+8ss87H/qEe5vEX/ahP3tiL5ULvaS4GIX+XB0O3yCVdRoRG9lqMIBP/ZqCkKrNll8
+dT12a6ByG/rWharZUeUETiM4Y+JjDUUaEC2YhNF9k52JNGanLH9LTTtlKy5WTT+E
+C6T6VMAtkwcBDpkXr5sBB/N+Y1z0Fp359lIXABEBAAG0HExlYXAgVGVzdCBLZXkg
+PGxlYXBAbGVhcC5zZT6JATgEEwECACIFAlQ9VDoCGwMGCwkIBwMCBhUIAgkKCwQW
+AgMBAh4BAheAAAoJENBNsuSQH7ylsSUIAIxUFbkeTdHbCF/LVA2U+ktnR1iVikAY
+vFK+U+Bto11/AO4Kew2eWniDch/sqLQOoSydtP42z2z3/Al3u7LhQ8bElQHPDY78
+t49qweyJi00V3vCKCdWwPJnPM5eJOIrZHCbwIgeXCsXxVNJVyziVqMuum+px1h2d
+1YJZXYejT8rzwa3yBPAsGWRAWETeTvUuyjPMFa59scbnaDuY+bwQ2r/qG9m7UyHU
+h2kAHC5sf1rixVOY6rLhw75gQHE/L2BZJRfVsDQqIpEMh2OgMfNbL928jncjwQvc
+/IXXwSUx7y50ll+uNh+TVLf0MlUjKdHmHqnGBMlIIWojWJuKxYmOOoO5AQ0EVD1U
+OgEIAM/TlhWVSI+tl5XBUAcf60RxjpHQkmdfq1i1jgwUgu/638EKzBfLcnRYX8Rn
+DO9CWnHcql/4hp226fIWZN/SyReE81n7UkLDMAglhHgiezHMSH1GYVu4IlfpLVXn
+brLVo83KioH5MPFWmZv5tigpU/G8dTx9yVGv1//YW2qqRYYqeIKJfapBaY/bNqyD
+vYRfZo1K2brtHx4bToY6mALRF4ruV5SVZGS69e4Sh692C2pXSVbCpRhQ/2WnvkZH
+leFIdmNmQN61MC1k26A620Rm+pAsXX71dln0u96xbrCgEVbi6ccfXzbFKtVmThVB
+w11CLvVTviOm99TmcgpmDS4cf08AEQEAAYkBHwQYAQIACQUCVD1UOgIbDAAKCRDQ
+TbLkkB+8pR+fB/0SeTcRr1duN7VYWdtng1+jO0ornIBtUraglN01dEEmiwN83DTi
+J37i+nll+4is7BtiXqhumRptKh1v8UUMyFX/rjjoojCJBg5NExsiOYl3O4le68oF
+3+XC+n7yrlyNmI15+3dcQmC9F6HN8EBZgrn5YPKGIOMHTGatB5PryMKg2IKiN5GZ
+E0hmrOQgmcGrkeqysKACQYUHTasSk2IY1l1G5YQglqCaBh4+UC82Dmg5fTBbHjxP
+YhhojkP4aD/0YW7dgql3nzYqvPCAjBH1Cf6rA9HvAJwUP9Ig/okcrrPEKm638+mG
++vNIuLqIkA4oFLBAAIrgMiQZ+NZz9uD6DJE7
+=FO7G
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+
+# key A1885A7C: public key "Leap Test Key <leap@leap.se>"
+EXPIRED_FINGERPRINT = "7C1F68B0E14157B09B5F4ADE6F15F004A1885A7C"
+EXPIRED_KEY = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1.4.12 (GNU/Linux)
+
+mQENBBvrfd0BCADGNpspaNhsbhSjKioCWrE2MTTYC+Sdpes22RabdhQyOCWvlSbj
+b8p0y3kmnMOtVBT+c22/w7eu2YBfIpS4RswgE5ypr/1kZLFQueVe/cp29GjPvLwJ
+82A3EOHcmXs8rSJ76h2bnkySvbJawz9rwCcaXhpdAwC+sjWvbqiwZYEL+90I4Xp3
+acDh9vNtPxDCg5RdI0bfdIEBGgHTfsda3kWGvo1wH5SgrTRq0+EcTI7aJgkMmM/A
+IhnpACE52NvGdG9eB3x7xyQFsQqK8F0XvEev2UJH4SR7vb+Z7FNTJKCy6likYbSV
+wGGFuowFSESnzXuUI6PcjyuO6FUbMgeM5euFABEBAAG0HExlYXAgVGVzdCBLZXkg
+PGxlYXBAbGVhcC5zZT6JAT4EEwECACgFAhvrfd0CGwMFCQABUYAGCwkIBwMCBhUI
+AgkKCwQWAgMBAh4BAheAAAoJEG8V8AShiFp8VNkH/iCQcXkTfMOVlL2rQRyZtJEO
+Lr5uTyyY8O6ubeNCHqZzlIopiPAsv4hIYjjMDvOfZ9R53YgmbacUm0rvh1B4MSUf
+k+sa9/tequ3y44LUKp7AB6NyyLgVOU5ngl2w+bi7CgXAep3oP4joYKcU0mmSAc2S
+2Gj85DVqP0kdzNs47esvyj7g1TOfdBwmLsTx/219H+w3dNBeyCQWkYCYNh7MX/Ba
+SZ+P0xr4FetcOVPM3wAzUtDG7hKsgccoIXt0FWhG/nn8cETfGH+o3W/ky7Jktatx
+DGDHoZJvAaG2B2ey1pAQlezr8p/O+ZVABiigHk1S+myBHyhlXzUcjhQnEG7aHZ65
+AQ0EG+t93QEIAKqRq/2sBDW4g3FU+11LhixT+GosrfVvnitz3S9k2tBXok/wYpI1
+XeA+kTHiF0LaqoaciDRvkA9DvhDbSrNM1yeuYRyZiHlTmoPZ/Fkl60oA2cyLd1L5
+sXbuipY3TEiakugdSU4rzgi0hFycm6Go6yq2G6eC6UALvD9CTMdZHw40TadG9xpm
+4thYPuJ1kPH8/bkbTi9sLHoApYgL+7ssje8w4epr0qD4IGxeKwJPf/tbTRpnd8w3
+leldixHHKAutNt49p0pkXlORAHRpUmp+KMZhFvCvIPwe9o5mYtMR7sDRxjY61ZEQ
+KLyKoh5wsJsaPXBjdG7cf6G/cBcwvnQVUHcAEQEAAYkBJQQYAQIADwUCG+t93QIb
+DAUJAAFRgAAKCRBvFfAEoYhafOPgB/9z4YCyT/N0262HtegHykhsyykuqEeNb1LV
+D9INcP+RbCX/0IjFgP4DTMPP7qqF1OBwR276maALT321Gqxc5HN5YrwxGdmoyBLm
+unaQJJlD+7B1C+jnO6r4m44obvJ/NMERxVyzkXap3J2VgRIO1wNLI9I0sH6Kj5/j
+Mgy06OwXDcqIc+jB4sIJ3Tnm8LZ3phJzNEm9mI8Ak0oJ7IEcMndR6DzmRt1rJQcq
+K/D7hOG02zvyRhxF27U1qR1MxeU/gNnOx8q4dnVyWB+EiV1sFl4iTOyYHEsoyd7W
+Osuse7+NkyUHgMXMVW7cz+nU7iO+ht2rkBtv+Z5LGlzgHTeFjKci
+=WhX+
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+# updated expiration date
+EXPIRED_KEY_NEW_EXPIRY_DATE = datetime.fromtimestamp(2049717872)
+EXPIRED_KEY_UPDATED = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1.4.12 (GNU/Linux)
+
+mQENBBvrfd0BCADGNpspaNhsbhSjKioCWrE2MTTYC+Sdpes22RabdhQyOCWvlSbj
+b8p0y3kmnMOtVBT+c22/w7eu2YBfIpS4RswgE5ypr/1kZLFQueVe/cp29GjPvLwJ
+82A3EOHcmXs8rSJ76h2bnkySvbJawz9rwCcaXhpdAwC+sjWvbqiwZYEL+90I4Xp3
+acDh9vNtPxDCg5RdI0bfdIEBGgHTfsda3kWGvo1wH5SgrTRq0+EcTI7aJgkMmM/A
+IhnpACE52NvGdG9eB3x7xyQFsQqK8F0XvEev2UJH4SR7vb+Z7FNTJKCy6likYbSV
+wGGFuowFSESnzXuUI6PcjyuO6FUbMgeM5euFABEBAAG0HExlYXAgVGVzdCBLZXkg
+PGxlYXBAbGVhcC5zZT6JAT4EEwECACgCGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4B
+AheABQJUlDCSBQleQLiTAAoJEG8V8AShiFp8t3QH/1eqkVIScXmqaCVeno3VSKiH
+HqnxiHcEgtpNRfUlP6tLD4H6QPEpvoUI9S/8HSYi3nbDGXEX8ycKlnwxjdIqWSOW
+xj91/7uQAo+dP9QaVJ6xgaAiqzN1x3JzX3Js1wTodmNV0TfmGjxwnC5up/xK7/pd
+KuDP3woDsRlwy8Lgj67mkn49xfAFHo6hI6SD36UBDAC/ELq6kZaba4Kk0fEVHCEz
+HX0B09ZIY9fmf305cEB3dNh6SMQgKtH0wKozaqI2UM2B+cs3z08bC+YuUUh7UJTH
+yr+hI7vF4/WEeJB3fuhP3xsumLhV8P47DaJ7oivmtsDEbAJFKqvigEqNES73Xpy5
+AQ0EG+t93QEIAKqRq/2sBDW4g3FU+11LhixT+GosrfVvnitz3S9k2tBXok/wYpI1
+XeA+kTHiF0LaqoaciDRvkA9DvhDbSrNM1yeuYRyZiHlTmoPZ/Fkl60oA2cyLd1L5
+sXbuipY3TEiakugdSU4rzgi0hFycm6Go6yq2G6eC6UALvD9CTMdZHw40TadG9xpm
+4thYPuJ1kPH8/bkbTi9sLHoApYgL+7ssje8w4epr0qD4IGxeKwJPf/tbTRpnd8w3
+leldixHHKAutNt49p0pkXlORAHRpUmp+KMZhFvCvIPwe9o5mYtMR7sDRxjY61ZEQ
+KLyKoh5wsJsaPXBjdG7cf6G/cBcwvnQVUHcAEQEAAYkBJQQYAQIADwIbDAUCVJQw
+3QUJXkC4/QAKCRBvFfAEoYhafEtiB/9hMfSFNMxtlIJDJArG4JwR7sBOatYUT858
+qZnTgGETZN8wXpeEpXWKdDdmCX9aeE9jsDNgSQ5WWpqU21bGMXh1IGjAzmqTqq3/
+ik1vALuaVfr6OqjTzrJVQujT61CGed26xpP3Zh8hLKyKa+dXnX/VpgZS42wZLPx2
+wcODfANmTfE2AhMap/RyDy21q4nau+z2hMEOKdtF8dpP+pEvzoN5ZexYP1hfT+Av
+oFPyVB5YtEMfxTEshDKRPjbdgNmw4faKXd5Cbelo4YxxpO16FHb6gzIdjOX15vQ+
+KwcVXzg9xk4D3cr1mnTCops/iv6TXvcw4Wbo70rrKXwkjl8LKjOP
+=sHoe
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+UNEXPIRED_KEY = EXPIRED_KEY_UPDATED
+UNEXPIRED_PRIVATE = """
+-----BEGIN PGP PRIVATE KEY BLOCK-----
+Version: GnuPG v1.4.12 (GNU/Linux)
+
+lQOYBBvrfd0BCADGNpspaNhsbhSjKioCWrE2MTTYC+Sdpes22RabdhQyOCWvlSbj
+b8p0y3kmnMOtVBT+c22/w7eu2YBfIpS4RswgE5ypr/1kZLFQueVe/cp29GjPvLwJ
+82A3EOHcmXs8rSJ76h2bnkySvbJawz9rwCcaXhpdAwC+sjWvbqiwZYEL+90I4Xp3
+acDh9vNtPxDCg5RdI0bfdIEBGgHTfsda3kWGvo1wH5SgrTRq0+EcTI7aJgkMmM/A
+IhnpACE52NvGdG9eB3x7xyQFsQqK8F0XvEev2UJH4SR7vb+Z7FNTJKCy6likYbSV
+wGGFuowFSESnzXuUI6PcjyuO6FUbMgeM5euFABEBAAEAB/0cwelrGEdmG+Z/RxZx
+4anvpzNNMRSJ0Xu508SVk4vElCQrlaPfFZC1t0ZW1XcHsQ5Gsy/gxaA4YbK1RXV2
+8uvvWh5oTsdLByzj/cSLLp5u+cYxyuaBOb/jiAiCPVEFnEec23pQ4fumwpebgX5f
+FLGCVYAqWc2EMqOFVgnAEJ9TbIWRnCkN04r1WSc7eLcUlH+vTp4HUPd6PQj56zSr
+J5beeviHgYB76M6mcM/BRzLmcl4M7bgx5olp8A0Wz7ub+hXICmNQyqpE8qZeyGjq
+v4T/6BSpsp5yEGDMkahFyO7OwB7UI6SZGkdnWKGeXOWG48so6cFdZ8dxRGx49gFL
+1rP1BADfYjQDfmBpB6tC1MyATb1MUK/1CN7wC5w7fXCtPbYNiqc9s27W9NXQReHD
+GOU04weU+ZJsV6Fwlt3oRD2j05vNdhbqKseLdsm27/kg2GWZvjamriHqZ94sw6yk
+fg3MqPb4JdFzBZVHqD50AHASx2rMshBeMVo27LhcADCWM9P8bwQA4yeRonbIAUls
+yAwWIRCMel2JY1u/zmJrg8FFAG2LYx+pYaxkRxjSJNlQQV7o6aYiU3Yw+nXvj5Pz
+IdOdimWfFb8eZ3U6tbognJxjwU8vV3ili40O7SENgloeM/nzg+nQjIaS9utfE8Et
+juV7f9OWi8Fo+xzSOvUGwoL/zW5t+UsD/0bm+5ch53Sm1ITCn7yfMrp0YaT+YC3Y
+mNNfrfbFpEd20ky4K9COIFDFCJmMyKLx/jSajcf4JqrxB/mOmHHAF9CeL7LUy/XV
+O8Ec5lkovicDIDT1b+pQYEYvh5UBJmoq1R5nbNLo70gFtGP6b4+t27Gxks5VLhF/
+BVvxK7xjmkBETnq0HExlYXAgVGVzdCBLZXkgPGxlYXBAbGVhcC5zZT6JAT4EEwEC
+ACgCGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4BAheABQJUURIXBQld/ZovAAoJEG8V
+8AShiFp8xUcIALcAHZbaxvyhHRGOrwDddbH0fFDK0AqKTsIT7y4D/HLFCP5zG3Ck
+7qGPZdkHXZfzq8rIb+zUjW3oJIVI1IucHxG2T5kppa8RFCBAFlRWYf6R3isX3YL0
+d3QSragjoxRNPcHNU8ALHcvfSonFHBoi4fH44rvgksAiT68SsdPaoXDlabx5T15e
+vu/7T5e/DGMQVPMxiaSuSQhbOKuMk2wcFdmLtBYHLZPa54hHPNhEDyxLgtKKph0g
+Obk9ojKfH9kPvLveIcpS5CqTJfN/kqBz7CJWwEeAi2iG3H1OEB25aCUdTxXSRNlG
+qEgcWPaWxtc1RzlARu7LB64OUZuRy4puiAGdA5gEG+t93QEIAKqRq/2sBDW4g3FU
++11LhixT+GosrfVvnitz3S9k2tBXok/wYpI1XeA+kTHiF0LaqoaciDRvkA9DvhDb
+SrNM1yeuYRyZiHlTmoPZ/Fkl60oA2cyLd1L5sXbuipY3TEiakugdSU4rzgi0hFyc
+m6Go6yq2G6eC6UALvD9CTMdZHw40TadG9xpm4thYPuJ1kPH8/bkbTi9sLHoApYgL
++7ssje8w4epr0qD4IGxeKwJPf/tbTRpnd8w3leldixHHKAutNt49p0pkXlORAHRp
+Ump+KMZhFvCvIPwe9o5mYtMR7sDRxjY61ZEQKLyKoh5wsJsaPXBjdG7cf6G/cBcw
+vnQVUHcAEQEAAQAH/A0TCHNz3Yi+oXis8m2WzeyU7Sw6S4VOLnoXMgOhf/JLXVoy
+S2P4qj73nMqNkYni2AJkej5GtOyunSGOpZ2zzKQyhigajq76HRRxP5oXwX7VLNy0
+bguSrys2IrJb/8Fq88rN/+H5kpvxNlog+P79wzTta5Y9/yIVJDNXIip/ptVARhA7
+CrdDyE4EaPjcWCS3/9a4R8JDZl19PlTE23DD5ffZv5wNEX38oZkDCK4Si+kqhvm7
+g0Upr49hnvqRPXoi46OBAoUh9yVTKaNDMsRWblvno7k3+MF0CCnix5p5JR74bGnZ
+8kS14qXXkAa58uMaAIcv86+mHNovXhaxcog+8k0EAM8wWyWPjdO2xbwwB0hS2E9i
+IO/X26uhLY3/tozbOekvqXKvwIy/xdWNVHr7eukAS+oIY10iczgKkMgquoqvzR4q
+UY5WI0iC0iMLUGV7xdxusPl+aCbGKomtN/H3JR2Wecgje7K/3o5BtUDM6Fr2KPFb
++uf/gqVkoMmp3O/DjhDlBADSwMHuhfStF+eDXwSQ4WJ3uXP8n4M4t9J2zXO366BB
+CAJg8enzwQi62YB+AOhP9NiY5ZrEySk0xGsnVgex2e7V5ilm1wd1z2el3g9ecfVj
+yu9mwqHKT811xsLjqQC84JN+qHM/7t7TSgczY2vD8ho2O8bBZzuoiX+QIPYUXkDy
+KwP8DTeHjnI6vAP2uVRnaY+bO53llyO5DDp4pnpr45yL47geciElq3m3jXFjHwos
+mmkOlYAL07JXeZK+LwbhxmbrwLxXNJB//P7l8ByRsmIrWvPuPzzcKig1KnFqvFO1
+5wGU0Pso2qWZU+idrhCdG+D8LRSQ0uibOFCcjFdM0JOJ7e1RdIkBJQQYAQIADwUC
+G+t93QIbDAUJAAFRgAAKCRBvFfAEoYhafOPgB/9z4YCyT/N0262HtegHykhsyyku
+qEeNb1LVD9INcP+RbCX/0IjFgP4DTMPP7qqF1OBwR276maALT321Gqxc5HN5Yrwx
+GdmoyBLmunaQJJlD+7B1C+jnO6r4m44obvJ/NMERxVyzkXap3J2VgRIO1wNLI9I0
+sH6Kj5/jMgy06OwXDcqIc+jB4sIJ3Tnm8LZ3phJzNEm9mI8Ak0oJ7IEcMndR6Dzm
+Rt1rJQcqK/D7hOG02zvyRhxF27U1qR1MxeU/gNnOx8q4dnVyWB+EiV1sFl4iTOyY
+HEsoyd7WOsuse7+NkyUHgMXMVW7cz+nU7iO+ht2rkBtv+Z5LGlzgHTeFjKci
+=dZE8
+-----END PGP PRIVATE KEY BLOCK-----
+"""
+
+# key CA1AD31E: public key "Leap Test Key <leap@leap.se>"
+# signed by E36E738D69173C13D709E44F2F455E2824D18DDF
+SIGNED_FINGERPRINT = "6704CF3362087DA23E3D2DF8ED81DFD1CA1AD31E"
+SIGNED_KEY = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1.4.12 (GNU/Linux)
+
+mQENBFQ9DHMBCADJXyNVzTQ+NnmSDbR6q8jjDsnqk/IgKrMBkpjNxUa/0HQ4o0Yh
+pklzR1hIc/jsdgq42A0++pqdfQFeRc2NVw/NnE/9uzW73YuaWg5XnWGjuAP3UeRI
+3xjL/cscEFmGfGkuGvFpIVa7GBPqz1SMBXWULJbkCE1pnHfgqh0R7oc5u0omnsln
+0zIrmLX1ufpDRSUedjSgIfd6VqbkPm3NJuZE4NVn6spHG3zTxqcaPCG0xLfHw7eS
+qgUdz0BFaxqtQiXffBpA3KvGJW0792VjDh4M6kDvpeYpKRmB9oEYlT3n3KvQrdPE
+B3N5KrzJj1QIL990q4NQdqjg+jUE5zCJsTdzABEBAAG0HExlYXAgVGVzdCBLZXkg
+PGxlYXBAbGVhcC5zZT6JATgEEwECACIFAlQ9DHMCGwMGCwkIBwMCBhUIAgkKCwQW
+AgMBAh4BAheAAAoJEO2B39HKGtMeI/4H/0/OG1OqtQEoYscvJ+BZ3ZrM2pEk7KDd
+7AEEf6QIGSd38GFyn/pve24cpRLv7phKNy9dX9VJhTDobpKvK0ZT/yQO3FVlySAN
+NVpu93/jrLnrW51J3p/GP952NtUAEP5l1uyYIKZ1W3RLWws72Lh34HTaHAWC94oF
+vnS42IYdTn4y6lfizL+wYD6CnfrIpHm8v3NABEQZ8e/jllrRK0pnOxAdFv/TpWEl
+8AnTZXcejSBgCG6UmDtrRKfgoQlGJEIH61QSqHpRIwkepQVYexUwgcLFAZPI9Hvw
+N5pZQ5Z+XcsYTGtLNEpF7YW6ykLDRTAv6LiBQPkBX8TDKhkh95Cs3sKJAhwEEAEC
+AAYFAlQ9DgIACgkQL0VeKCTRjd/pABAAsNPbiGwuZ469NxwTgf3+EMoWZNHf5ZRa
+ZbzKKesLFEElMdX3Q/MkVc1T8Rsy9Fdn1tf/H6glwuKyqWeXNkxa86VT6gck9WV6
+bslFIl/vJpb3dcmiCCM1tSCYpX0yE0fqebMihcfvNqDw3GdZBUo7R0pWN6BEh4iM
+YYWTAtuPCrbsv+2bSid1ZLIO6FIF5kskg60h/IbSr+A+DSBgyyjf9fbUv6MoyMw8
+08GtCAx6VGJhTTC/RkWIA+N3n83W5XQFszOOg/PAAg0JMUXUBGvjfYJ5fcB8cfuw
+1XZe9uWsDmYpwfVEtDajrLbatkXAu22pjIJnB4cVqiD+4hHbBCFkeZIfdRsPEINO
+UacsjVZV5/EPDN9OpkvZbkrLJ6eaQnmQZnFclquNHUCqFI0QYUml0BXXaZq+aEJ9
+N9x00kdYV1xW6zkL+MGgxdViC5n6dwJcU3MANrykV8Cc5/x+wmwY8AXbHzU7MxvY
+nGlAYsAZHhf4ZlEdAO6C329VotMxBLFd5DJZZoN+ysaOpsUNRl0JO41+6bbI141l
+DCmzWUG4iTI70zxsgzZGgEt0HlMDoIxElPcy/jDKi1IfEDmveK+QR9WphM40Ayvx
+VTeA6g9WagmoHopQs/D/Kbi3Q8izFDfXTwA52DUxTjyUEFn0jEOiG9BFmnIkQ6LE
+3WkIJFd3D0+5AQ0EVD0McwEIALRXukBsOrcA/rNJ4SV4I64cGdN8q9Gl5RpLl8cS
+L5+SGHp6KoCL4daBqpbxdhE/Ylb3QmPt2SBZbTkwJ2yuczELOyhH6R13OWRWCgkd
+dYLZsm/sEzu7dVoFQ4peKTGDzI8mQ/s157wRkz/8iSUYjJjeM3g0NI55FVcefibN
+pOOFRaYGUh8itofRFOu7ipZ9F8zRJdBwqISe1gemNBR+O3G3Srm34PYu6sZRsdLU
+Nf+81+/ynQWQseVpbz8X93sx/onIYIY0w+kxIE0oR/gBBjdsMOp7EfcvtbGTgplQ
++Zxln/pV2bVFkGkjKniFEEfi4eCPknCj0+67qKRt/Fc9n98AEQEAAYkBHwQYAQIA
+CQUCVD0McwIbDAAKCRDtgd/RyhrTHmmcCACpTbjOYjvvr8/rD60bDyhnfcKvpavO
+1/gZtMeEmw5gysp10mOXwhc2XuC3R1A6wVbVgGuOp5RxlxI4w8xwwxMFSp6u2rS5
+jm5slXBKB2i3KE6Sao9uZKP2K4nS8Qc+DhathfgafI39vPtBmsb1SJd5W1njNnYY
+hARRpViUcVdfvW3VRpDACZ79PBs4ZQQ022NsNAPwm/AJvAw2S42Y9tjTnaLVRLfH
+lzdErcGTBnfEIi0mQF/11k/THBJxx7vaFt8YXiDlWLUkg5XW3xK9mkETbaTv+/tB
+X2+l7IOSt+31KQCBFN/VmhTySJOVQC1d2A56lSH2c/DWVClji+x3suzn
+=xprj
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+
+if __name__ == "__main__":
+ import unittest
+ unittest.main()
diff --git a/src/leap/keymanager/validation.py b/src/leap/keymanager/validation.py
new file mode 100644
index 0000000..734cfce
--- /dev/null
+++ b/src/leap/keymanager/validation.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Validation levels implementation for key managment.
+
+See:
+ https://leap.se/en/docs/design/transitional-key-validation
+"""
+
+
+from datetime import datetime
+
+
+class ValidationLevel(object):
+ """
+ A validation level
+
+ Meant to be used to compare levels or get its string representation.
+ """
+ def __init__(self, name, value):
+ self.name = name
+ self.value = value
+
+ def __cmp__(self, other):
+ return cmp(self.value, other.value)
+
+ def __str__(self):
+ return self.name
+
+ def __repr__(self):
+ return "<ValidationLevel: %s (%d)>" % (self.name, self.value)
+
+
+class _ValidationLevels(object):
+ """
+ Handler class to manage validation levels. It should have only one global
+ instance 'ValidationLevels'.
+
+ The levels are attributes of the instance and can be used like:
+ ValidationLevels.Weak_Chain
+ ValidationLevels.get("Weak_Chain")
+ """
+ _level_names = ("Weak_Chain",
+ "Provider_Trust",
+ "Provider_Endorsement",
+ "Third_Party_Endorsement",
+ "Third_Party_Consensus",
+ "Historically_Auditing",
+ "Known_Key",
+ "Fingerprint")
+
+ def __init__(self):
+ for name in self._level_names:
+ setattr(self, name,
+ ValidationLevel(name, self._level_names.index(name)))
+
+ def get(self, name):
+ """
+ Get the ValidationLevel of a name
+
+ :param name: name of the level
+ :type name: str
+ :rtype: ValidationLevel
+ """
+ return getattr(self, name)
+
+
+ValidationLevels = _ValidationLevels()
+
+
+def can_upgrade(new_key, old_key):
+ """
+ :type new_key: EncryptionKey
+ :type old_key: EncryptionKey
+ :rtype: bool
+ """
+ # First contact
+ if old_key is None:
+ return True
+
+ # An update of the same key
+ if new_key.fingerprint == old_key.fingerprint:
+ return True
+
+ # Manually verified fingerprint
+ if new_key.validation == ValidationLevels.Fingerprint:
+ return True
+
+ # Expired key and higher validation level
+ if (old_key.expiry_date is not None and
+ old_key.expiry_date < datetime.now() and
+ new_key.validation >= old_key.validation):
+ return True
+
+ # No expiration date and higher validation level
+ if (old_key.expiry_date is None and
+ new_key.validation > old_key.validation):
+ return True
+
+ # Not successfully used and strict high validation level
+ if (not (old_key.sign_used and old_key.encr_used) and
+ new_key.validation > old_key.validation):
+ return True
+
+ # New key signed by the old key
+ if old_key.key_id in new_key.signatures:
+ return True
+
+ return False