summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMicah Anderson <micah@riseup.net>2015-11-10 17:46:56 -0500
committerMicah Anderson <micah@riseup.net>2015-11-10 17:46:56 -0500
commit3b20aa2e8e869556cd0a97c1e6d3404aadfd4526 (patch)
treed76f1253dc45926b19b5cbc8740ae82b59923256
parent8713603ad4ac71c45726a2f76e3cf86313530af1 (diff)
parent4e5e21a8d008fd31307d3581fb5f791b0c2783e8 (diff)
Merge branch 'debian/experimental' into debian/platform-0.8
-rw-r--r--AUTHORS6
-rw-r--r--CHANGELOG18
-rw-r--r--README.rst22
-rw-r--r--debian/changelog14
-rw-r--r--debian/control10
-rw-r--r--debian/copyright2
-rw-r--r--docs/leap-commit-template7
-rw-r--r--docs/leap-commit-template.README47
-rwxr-xr-xpkg/generate_wheels.sh13
-rwxr-xr-xpkg/pip_install_requirements.sh84
-rw-r--r--pkg/requirements-latest.pip8
-rw-r--r--pkg/requirements-leap.pip1
-rw-r--r--pkg/requirements-testing.pip10
-rw-r--r--pkg/requirements.pip6
-rwxr-xr-xpkg/tools/get_authors.sh2
-rw-r--r--pkg/utils.py29
-rw-r--r--setup.cfg8
-rw-r--r--setup.py27
-rw-r--r--src/leap/keymanager/__init__.py180
-rw-r--r--src/leap/keymanager/_version.py4
-rw-r--r--src/leap/keymanager/keys.py10
-rw-r--r--src/leap/keymanager/openpgp.py273
-rw-r--r--src/leap/keymanager/tests/__init__.py14
-rw-r--r--src/leap/keymanager/tests/test_keymanager.py142
-rw-r--r--src/leap/keymanager/tests/test_openpgp.py104
-rw-r--r--src/leap/keymanager/tests/test_validation.py48
-rw-r--r--src/leap/keymanager/validation.py73
27 files changed, 908 insertions, 254 deletions
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..5e64248
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,6 @@
+Tomás Touceda <chiiph@leap.se>
+Ruben Pollan <meskio@sindominio.net>
+Ivan Alejandro <ivanalejandro0@gmail.com>
+drebs <drebs@leap.se>
+Kali Kaneko <kali@leap.se>
+Parménides GV <parmegv@sdf.org>
diff --git a/CHANGELOG b/CHANGELOG
index 381e4e7..0a9221d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,21 @@
+0.4.3 Oct 28, 2015:
+ o self-repair the keyring if keys get duplicated. Closes: #7498
+ o catch request exceptions on key fetching. Closes #7410.
+ o Don't repush a public key with different addres
+ o use async events api. Closes #7224.
+ o Use ca_bundle when fetching keys by url.
+ o add logging to fetch_key. Related: #7410.
+ o more verbosity in get_key wrong address log.
+ o don't repush a public key with different address. Related #7420.
+
+0.4.2 Aug 26, 2015:
+ o Style changes.
+ o Tests updates.
+ o Packaging improvements.
+
+0.4.1 Jul 10, 2015:
+ o Remove the dependency on enum34. Closes: #7188.
+
0.4.0 Jun 8, 2015:
o Adapt to new events api on leap.common. Related to #5359.
o Add 'fetch_key' method to fetch keys from a URI. Closes #5932.
diff --git a/README.rst b/README.rst
index acf2335..1d4b53e 100644
--- a/README.rst
+++ b/README.rst
@@ -1,11 +1,16 @@
LEAP's Key Manager
==================
-.. image:: https://pypip.in/v/leap.keymanager/badge.png
- :target: https://crate.io/packages/leap.keymanager
+.. image:: https://badge.fury.io/py/leap.keymanager.svg
+ :target: http://badge.fury.io/py/leap.keymanager
+.. image:: https://img.shields.io/pypi/dm/leap.keymanager.svg
+ :target: http://badge.fury.io/py/leap.keymanager
-The Key Manager is a Nicknym agent for the LEAP project:
+The Key Manager is a `Nicknym`_ agent for the `LEAP`_ project, written in python using the `twisted`_ async framework.
+
+.. _`Nicknym`: https://leap.se/nicknym
+.. _`LEAP`: https://leap.se/docs/
+.. _`twisted`: https://twistedmatrix.com/trac/
- https://leap.se/pt/docs/design/nicknym
running tests
-------------
@@ -13,3 +18,12 @@ running tests
Use trial to run the test suite::
trial leap.keymanager
+
+License
+=======
+
+.. image:: https://raw.github.com/leapcode/bitmask_client/develop/docs/user/gpl.png
+
+leap.keymanager is released under the terms of the `GNU GPL version 3`_ or later.
+
+.. _`GNU GPL version 3`: http://www.gnu.org/licenses/gpl.txt
diff --git a/debian/changelog b/debian/changelog
index 2a2511d..534696c 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,17 @@
+leap-keymanager (0.4.3) unstable; urgency=medium
+
+ * Update to 0.4.3 release.
+
+ -- Ben Carrillo <ben@futeisha.org> Thu, 29 Oct 2015 12:02:54 -0400
+
+leap-keymanager (0.4.2) unstable; urgency=medium
+
+ * Update to 0.4.2 release.
+ * Fix copyright symlink to point to correct GPL version
+ * Update package Description to be more informative
+
+ -- Micah Anderson <micah@leap.se> Tue, 01 Sep 2015 15:44:14 -0400
+
leap-keymanager (0.4.0) unstable; urgency=medium
* Update to 0.4.0 release
diff --git a/debian/control b/debian/control
index 05b0c86..763b603 100644
--- a/debian/control
+++ b/debian/control
@@ -7,7 +7,11 @@ Standards-Version: 3.9.6
Package: leap-keymanager
Architecture: all
-Depends: ${misc:Depends}, ${python:Depends}
+Depends: ${misc:Depends}, ${python:Depends}, gnupg
+Enhances: gnupg
Description: LEAP's Key Manager
- The Key Manager handles all types of keys to allow for point-to-point
- encryption between parties communicating through LEAP infrastructure.
+ The Key Manager is a trusted user agent that is responsible for storing a
+ database of all the keys for the user, updating these keys, and auditing the
+ endorsements of the user’s own keys. Typically, the key manager will run on the
+ user’s device, but might be running on any device the user chooses to trust.
+
diff --git a/debian/copyright b/debian/copyright
index 85071d3..fe27ccb 100644
--- a/debian/copyright
+++ b/debian/copyright
@@ -13,4 +13,4 @@ License: GPL-3+
License: GPL-3+
On Debian systems, the complete text of the GNU General
- Public License can be found in `/usr/share/common-licenses/GPL'.
+ Public License can be found in `/usr/share/common-licenses/GPL-3'.
diff --git a/docs/leap-commit-template b/docs/leap-commit-template
new file mode 100644
index 0000000..8a5c7cd
--- /dev/null
+++ b/docs/leap-commit-template
@@ -0,0 +1,7 @@
+[bug|feat|docs|style|refactor|test|pkg|i18n] ...
+...
+
+- Resolves: #XYZ
+- Related: #XYZ
+- Documentation: #XYZ
+- Releases: XYZ
diff --git a/docs/leap-commit-template.README b/docs/leap-commit-template.README
new file mode 100644
index 0000000..ce8809e
--- /dev/null
+++ b/docs/leap-commit-template.README
@@ -0,0 +1,47 @@
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+HOW TO USE THIS TEMPLATE:
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Run `git config commit.template docs/leap-commit-template` or
+edit the .git/config for this project and add
+`template = docs/leap-commit-template`
+under the [commit] block
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+COMMIT TEMPLATE FORMAT EXPLAINED
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+[type] <subject>
+
+<body>
+<footer>
+
+Type should be one of the following:
+- bug (bug fix)
+- feat (new feature)
+- docs (changes to documentation)
+- style (formatting, pep8 violations, etc; no code change)
+- refactor (refactoring production code)
+- test (adding missing tests, refactoring tests; no production code change)
+- pkg (packaging related changes; no production code change)
+- i18n translation related changes
+
+Subject should use imperative tone and say what you did.
+For example, use 'change', NOT 'changed' or 'changes'.
+
+The body should go into detail about changes made.
+
+The footer should contain any issue references or actions.
+You can use one or several of the following:
+
+- Resolves: #XYZ
+- Related: #XYZ
+- Documentation: #XYZ
+- Releases: XYZ
+
+The Documentation field should be included in every new feature commit, and it
+should link to an issue in the bug tracker where the new feature is analyzed
+and documented.
+
+For a full example of how to write a good commit message, check out
+https://github.com/sparkbox/how_to/tree/master/style/git
diff --git a/pkg/generate_wheels.sh b/pkg/generate_wheels.sh
new file mode 100755
index 0000000..a13e2c7
--- /dev/null
+++ b/pkg/generate_wheels.sh
@@ -0,0 +1,13 @@
+#!/bin/sh
+# Generate wheels for dependencies
+# Use at your own risk.
+
+if [ "$WHEELHOUSE" = "" ]; then
+ WHEELHOUSE=$HOME/wheelhouse
+fi
+
+pip wheel --wheel-dir $WHEELHOUSE pip
+pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements.pip
+if [ -f pkg/requirements-testing.pip ]; then
+ pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements-testing.pip
+fi
diff --git a/pkg/pip_install_requirements.sh b/pkg/pip_install_requirements.sh
new file mode 100755
index 0000000..8ca0956
--- /dev/null
+++ b/pkg/pip_install_requirements.sh
@@ -0,0 +1,84 @@
+#!/bin/bash
+# Update pip and install LEAP base/testing requirements.
+# For convenience, $insecure_packages are allowed with insecure flags enabled.
+# Use at your own risk.
+# See $usage for help
+
+insecure_packages=""
+leap_wheelhouse=https://lizard.leap.se/wheels
+
+show_help() {
+ usage="Usage: $0 [--testing] [--use-leap-wheels]\n --testing\t\tInstall dependencies from requirements-testing.pip\n
+\t\t\tOtherwise, it will install requirements.pip\n
+--use-leap-wheels\tUse wheels from leap.se"
+ echo -e $usage
+
+ exit 1
+}
+
+process_arguments() {
+ testing=false
+ while [ "$#" -gt 0 ]; do
+ # From http://stackoverflow.com/a/31443098
+ case "$1" in
+ --help) show_help;;
+ --testing) testing=true; shift 1;;
+ --use-leap-wheels) use_leap_wheels=true; shift 1;;
+
+ -h) show_help;;
+ -*) echo "unknown option: $1" >&2; exit 1;;
+ esac
+ done
+}
+
+return_wheelhouse() {
+ if $use_leap_wheels ; then
+ WHEELHOUSE=$leap_wheelhouse
+ elif [ "$WHEELHOUSE" = "" ]; then
+ WHEELHOUSE=$HOME/wheelhouse
+ fi
+
+ # Tested with bash and zsh
+ if [[ $WHEELHOUSE != http* && ! -d "$WHEELHOUSE" ]]; then
+ mkdir $WHEELHOUSE
+ fi
+
+ echo "$WHEELHOUSE"
+}
+
+return_install_options() {
+ wheelhouse=`return_wheelhouse`
+ install_options="-U --find-links=$wheelhouse"
+ if $use_leap_wheels ; then
+ install_options="$install_options --trusted-host lizard.leap.se"
+ fi
+
+ echo $install_options
+}
+
+return_insecure_flags() {
+ for insecure_package in $insecure_packages; do
+ flags="$flags --allow-external $insecure_package --allow-unverified $insecure_package"
+ done
+
+ echo $flags
+}
+
+return_packages() {
+ if $testing ; then
+ packages="-r pkg/requirements-testing.pip"
+ else
+ packages="-r pkg/requirements.pip"
+ fi
+
+ echo $packages
+}
+
+process_arguments $@
+install_options=`return_install_options`
+insecure_flags=`return_insecure_flags`
+packages=`return_packages`
+
+pip install -U wheel
+pip install $install_options pip
+pip install $install_options $insecure_flags $packages
diff --git a/pkg/requirements-latest.pip b/pkg/requirements-latest.pip
new file mode 100644
index 0000000..148d42b
--- /dev/null
+++ b/pkg/requirements-latest.pip
@@ -0,0 +1,8 @@
+--index-url https://pypi.python.org/simple/
+
+--allow-external u1db --allow-unverified u1db
+--allow-external dirspec --allow-unverified dirspec
+-e 'git+https://github.com/pixelated-project/leap_pycommon.git@develop#egg=leap.common'
+-e 'git+https://github.com/pixelated-project/soledad.git@develop#egg=leap.soledad.common&subdirectory=common/'
+-e 'git+https://github.com/pixelated-project/soledad.git@develop#egg=leap.soledad.client&subdirectory=client/'
+-e .
diff --git a/pkg/requirements-leap.pip b/pkg/requirements-leap.pip
new file mode 100644
index 0000000..4ba1d81
--- /dev/null
+++ b/pkg/requirements-leap.pip
@@ -0,0 +1 @@
+leap.common>=0.4.3
diff --git a/pkg/requirements-testing.pip b/pkg/requirements-testing.pip
index 14a4820..addda19 100644
--- a/pkg/requirements-testing.pip
+++ b/pkg/requirements-testing.pip
@@ -1,3 +1,11 @@
mock
-leap.soledad.client
setuptools-trial
+pep8
+
+#-----------------------------------------------------------------------------
+#Although it's not a package dependency, tests also depend on having
+#soledad client installed. Commenting to avoid versioning problem, you should
+#know what you are testing against :)
+#-----------------------------------------------------------------------------
+
+#leap.soledad.client
diff --git a/pkg/requirements.pip b/pkg/requirements.pip
index c81c1a1..c4cb09a 100644
--- a/pkg/requirements.pip
+++ b/pkg/requirements.pip
@@ -1,7 +1,5 @@
-leap.common>=0.4.0
-simplejson
-requests
# if we bump the gnupg version, bump also the sanity check
# in keymanager.__init__
gnupg>=1.4.0
-enum34
+simplejson
+requests
diff --git a/pkg/tools/get_authors.sh b/pkg/tools/get_authors.sh
new file mode 100755
index 0000000..0169bb1
--- /dev/null
+++ b/pkg/tools/get_authors.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+git log --format='%aN <%aE>' | awk '{arr[$0]++} END{for (i in arr){print arr[i], i;}}' | sort -rn | cut -d' ' -f2-
diff --git a/pkg/utils.py b/pkg/utils.py
index deace14..9c9211b 100644
--- a/pkg/utils.py
+++ b/pkg/utils.py
@@ -14,20 +14,34 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
"""
Utils to help in the setup process
"""
-
import os
import re
import sys
+def is_develop_mode():
+ """
+ Returns True if we're calling the setup script using the argument for
+ setuptools development mode.
+
+ This avoids messing up with dependency pinning and order, the
+ responsibility of installing the leap dependencies is left to the
+ developer.
+ """
+ args = sys.argv
+ devflags = "setup.py", "develop"
+ if (args[0], args[1]) == devflags:
+ return True
+ return False
+
+
def get_reqs_from_files(reqfiles):
"""
Returns the contents of the top requirement file listed as a
- string list with the lines
+ string list with the lines.
@param reqfiles: requirement files to parse
@type reqfiles: list of str
@@ -43,6 +57,9 @@ def parse_requirements(reqfiles=['requirements.txt',
"""
Parses the requirement files provided.
+ The passed reqfiles list is a list of possible locations to try, the
+ function will return the contents of the first path found.
+
Checks the value of LEAP_VENV_SKIP_PYSIDE to see if it should
return PySide as a dep or not. Don't set, or set to 0 if you want
to install it through pip.
@@ -58,9 +75,9 @@ def parse_requirements(reqfiles=['requirements.txt',
if re.match(r'\s*-e\s+', line):
pass
# do not try to do anything with externals on vcs
- #requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
- #line))
- # http://foo.bar/baz/foobar/zipball/master#egg=foobar
+ # requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
+ # line))
+ # http://foo.bar/baz/foobar/zipball/master#egg=foobar
elif re.match(r'\s*https?:', line):
requirements.append(re.sub(r'\s*https?:.*#egg=(.*)$', r'\1',
line))
diff --git a/setup.cfg b/setup.cfg
index c71bffa..51070c6 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,2 +1,10 @@
[aliases]
test = trial
+
+[pep8]
+exclude = versioneer.py,_version.py,*.egg,build,docs
+ignore = E731
+
+[flake8]
+exclude = versioneer.py,_version.py,*.egg,build,docs
+ignore = E731
diff --git a/setup.py b/setup.py
index 778909d..26840c6 100644
--- a/setup.py
+++ b/setup.py
@@ -20,6 +20,10 @@ setup file for leap.keymanager
import re
from setuptools import setup
from setuptools import find_packages
+from setuptools import Command
+
+from pkg import utils
+
import versioneer
versioneer.versionfile_source = 'src/leap/keymanager/_version.py'
@@ -27,8 +31,6 @@ versioneer.versionfile_build = 'leap/keymanager/_version.py'
versioneer.tag_prefix = '' # tags are like 1.2.0
versioneer.parentdir_prefix = 'leap.keymanager-'
-from pkg import utils
-
trove_classifiers = [
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
@@ -59,9 +61,6 @@ if len(_version_short) > 0:
cmdclass = versioneer.get_cmdclass()
-from setuptools import Command
-
-
class freeze_debianver(Command):
"""
Freezes the version in a debian branch.
@@ -107,6 +106,22 @@ cmdclass["freeze_debianver"] = freeze_debianver
# XXX add ref to docs
+requirements = utils.parse_requirements()
+
+if utils.is_develop_mode():
+ print
+ print ("[WARNING] Skipping leap-specific dependencies "
+ "because development mode is detected.")
+ print ("[WARNING] You can install "
+ "the latest published versions with "
+ "'pip install -r pkg/requirements-leap.pip'")
+ print ("[WARNING] Or you can instead do 'python setup.py develop' "
+ "from the parent folder of each one of them.")
+ print
+else:
+ requirements += utils.parse_requirements(
+ reqfiles=["pkg/requirements-leap.pip"])
+
setup(
name='leap.keymanager',
version=VERSION,
@@ -129,7 +144,7 @@ setup(
packages=find_packages('src', exclude=['leap.keymanager.tests']),
package_dir={'': 'src'},
test_suite='leap.keymanager.tests',
- install_requires=utils.parse_requirements(),
+ install_requires=requirements,
tests_require=utils.parse_requirements(
reqfiles=['pkg/requirements-testing.pip']),
)
diff --git a/src/leap/keymanager/__init__.py b/src/leap/keymanager/__init__.py
index 47f479b..c7886e0 100644
--- a/src/leap/keymanager/__init__.py
+++ b/src/leap/keymanager/__init__.py
@@ -18,19 +18,33 @@
Key Manager is a Nicknym agent for LEAP client.
"""
# let's do a little sanity check to see if we're using the wrong gnupg
+import fileinput
+import os
import sys
+import tempfile
+
+from leap.common import ca_bundle
+
+from ._version import get_versions
try:
from gnupg.gnupg import GPGUtilities
assert(GPGUtilities) # pyflakes happy
from gnupg import __version__ as _gnupg_version
+ if '-' in _gnupg_version:
+ # avoid Parsing it as LegacyVersion, get just
+ # the release numbers:
+ _gnupg_version = _gnupg_version.split('-')[0]
from pkg_resources import parse_version
+ # We need to make sure that we're not colliding with the infamous
+ # python-gnupg
assert(parse_version(_gnupg_version) >= parse_version('1.4.0'))
except (ImportError, AssertionError):
print "*******"
print "Ooops! It looks like there is a conflict in the installed version "
print "of gnupg."
+ print "GNUPG_VERSION:", _gnupg_version
print
print "Disclaimer: Ideally, we would need to work a patch and propose the "
print "merge to upstream. But until then do: "
@@ -47,7 +61,7 @@ from twisted.internet import defer
from urlparse import urlparse
from leap.common.check import leap_assert
-from leap.common.events import emit, catalog
+from leap.common.events import emit_async, catalog
from leap.common.decorators import memoized_method
from leap.keymanager.errors import (
@@ -57,17 +71,17 @@ from leap.keymanager.errors import (
UnsupportedKeyTypeError,
InvalidSignature
)
-from leap.keymanager.validation import ValidationLevel, can_upgrade
+from leap.keymanager.validation import ValidationLevels, can_upgrade
from leap.keymanager.keys import (
build_key_from_dict,
KEYMANAGER_KEY_TAG,
TAGS_PRIVATE_INDEX,
)
-from leap.keymanager.openpgp import (
- OpenPGPKey,
- OpenPGPScheme,
-)
+from leap.keymanager.openpgp import OpenPGPKey, OpenPGPScheme
+
+__version__ = get_versions()['version']
+del get_versions
logger = logging.getLogger(__name__)
@@ -126,12 +140,43 @@ class KeyManager(object):
}
# the following are used to perform https requests
self._fetcher = requests
- self._session = self._fetcher.session()
+ self._combined_ca_bundle = self._create_combined_bundle_file()
+
+ #
+ # destructor
+ #
+
+ def __del__(self):
+ try:
+ created_tmp_combined_ca_bundle = self._combined_ca_bundle not in \
+ [ca_bundle.where(), self._ca_cert_path]
+ if created_tmp_combined_ca_bundle:
+ os.remove(self._combined_ca_bundle)
+ except OSError:
+ pass
#
# utilities
#
+ def _create_combined_bundle_file(self):
+ leap_ca_bundle = ca_bundle.where()
+
+ if self._ca_cert_path == leap_ca_bundle:
+ return self._ca_cert_path # don't merge file with itself
+ elif not self._ca_cert_path:
+ return leap_ca_bundle
+
+ tmp_file = tempfile.NamedTemporaryFile(delete=False)
+
+ with open(tmp_file.name, 'w') as fout:
+ fin = fileinput.input(files=(leap_ca_bundle, self._ca_cert_path))
+ for line in fin:
+ fout.write(line)
+ fin.close()
+
+ return tmp_file.name
+
def _key_class_from_type(self, ktype):
"""
Return key class from string representation of key type.
@@ -168,6 +213,24 @@ class KeyManager(object):
# 'Content-type is not JSON.')
return res
+ def _get_with_combined_ca_bundle(self, uri, data=None):
+ """
+ Send a GET request to C{uri} containing C{data}.
+
+ Instead of using the ca_cert provided on construction time, this
+ version also uses the default certificates shipped with leap.common
+
+ :param uri: The URI of the request.
+ :type uri: str
+ :param data: The body of the request.
+ :type data: dict, str or file
+
+ :return: The response to the request.
+ :rtype: requests.Response
+ """
+ return self._fetcher.get(
+ uri, data=data, verify=self._combined_ca_bundle)
+
def _put(self, uri, data=None):
"""
Send a PUT request to C{uri} containing C{data}.
@@ -224,10 +287,10 @@ class KeyManager(object):
if self.OPENPGP_KEY in server_keys:
# nicknym server is authoritative for its own domain,
# for other domains the key might come from key servers.
- validation_level = ValidationLevel.Weak_Chain
+ validation_level = ValidationLevels.Weak_Chain
_, domain = _split_email(address)
if (domain == _get_domain(self._nickserver_uri)):
- validation_level = ValidationLevel.Provider_Trust
+ validation_level = ValidationLevels.Provider_Trust
d = self.put_raw_key(
server_keys['openpgp'],
@@ -277,7 +340,7 @@ class KeyManager(object):
self._api_version,
self._uid)
self._put(uri, data)
- emit(catalog.KEYMANAGER_DONE_UPLOADING_KEYS, self._address)
+ emit_async(catalog.KEYMANAGER_DONE_UPLOADING_KEYS, self._address)
d = self.get_key(
self._address, ktype, private=False, fetch_remote=False)
@@ -313,33 +376,34 @@ class KeyManager(object):
leap_assert(
ktype in self._wrapper_map,
'Unkown key type: %s.' % str(ktype))
- emit(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
+ _keys = self._wrapper_map[ktype]
+
+ emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
def key_found(key):
- emit(catalog.KEYMANAGER_KEY_FOUND, address)
+ emit_async(catalog.KEYMANAGER_KEY_FOUND, address)
return key
def key_not_found(failure):
if not failure.check(KeyNotFound):
return failure
- emit(catalog.KEYMANAGER_KEY_NOT_FOUND, address)
+ emit_async(catalog.KEYMANAGER_KEY_NOT_FOUND, address)
# we will only try to fetch a key from nickserver if fetch_remote
# is True and the key is not private.
if fetch_remote is False or private is True:
return failure
- emit(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
+ emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
d = self._fetch_keys_from_server(address)
d.addCallback(
- lambda _:
- self._wrapper_map[ktype].get_key(address, private=False))
+ lambda _: _keys.get_key(address, private=False))
d.addCallback(key_found)
return d
# return key if it exists in local database
- d = self._wrapper_map[ktype].get_key(address, private=private)
+ d = _keys.get_key(address, private=private)
d.addCallbacks(key_found, key_not_found)
return d
@@ -385,13 +449,16 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
self._assert_supported_key_type(ktype)
+ _keys = self._wrapper_map[ktype]
def signal_finished(key):
- emit(catalog.KEYMANAGER_FINISHED_KEY_GENERATION, self._address)
+ emit_async(
+ catalog.KEYMANAGER_FINISHED_KEY_GENERATION, self._address)
return key
- emit(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
- d = self._wrapper_map[ktype].gen_key(self._address)
+ emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
+
+ d = _keys.gen_key(self._address)
d.addCallback(signal_finished)
return d
@@ -481,14 +548,15 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
self._assert_supported_key_type(ktype)
+ _keys = self._wrapper_map[ktype]
def encrypt(keys):
pubkey, signkey = keys
- encrypted = self._wrapper_map[ktype].encrypt(
+ encrypted = _keys.encrypt(
data, pubkey, passphrase, sign=signkey,
cipher_algo=cipher_algo)
pubkey.encr_used = True
- d = self._wrapper_map[ktype].put_key(pubkey, address)
+ d = _keys.put_key(pubkey, address)
d.addCallback(lambda _: encrypted)
return d
@@ -533,18 +601,21 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
self._assert_supported_key_type(ktype)
+ _keys = self._wrapper_map[ktype]
def decrypt(keys):
pubkey, privkey = keys
- decrypted, signed = self._wrapper_map[ktype].decrypt(
+ decrypted, signed = _keys.decrypt(
data, privkey, passphrase=passphrase, verify=pubkey)
if pubkey is None:
signature = KeyNotFound(verify)
elif signed:
- pubkey.sign_used = True
- d = self._wrapper_map[ktype].put_key(pubkey, address)
- d.addCallback(lambda _: (decrypted, pubkey))
- return d
+ signature = pubkey
+ if not pubkey.sign_used:
+ pubkey.sign_used = True
+ d = _keys.put_key(pubkey, verify)
+ d.addCallback(lambda _: (decrypted, signature))
+ return d
else:
signature = InvalidSignature(
'Failed to verify signature with key %s' %
@@ -593,9 +664,10 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
self._assert_supported_key_type(ktype)
+ _keys = self._wrapper_map[ktype]
def sign(privkey):
- return self._wrapper_map[ktype].sign(
+ return _keys.sign(
data, privkey, digest_algo=digest_algo, clearsign=clearsign,
detach=detach, binary=binary)
@@ -631,15 +703,18 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
self._assert_supported_key_type(ktype)
+ _keys = self._wrapper_map[ktype]
def verify(pubkey):
- signed = self._wrapper_map[ktype].verify(
+ signed = _keys.verify(
data, pubkey, detached_sig=detached_sig)
if signed:
- pubkey.sign_used = True
- d = self._wrapper_map[ktype].put_key(pubkey, address)
- d.addCallback(lambda _: pubkey)
- return d
+ if not pubkey.sign_used:
+ pubkey.sign_used = True
+ d = _keys.put_key(pubkey, address)
+ d.addCallback(lambda _: pubkey)
+ return d
+ return pubkey
else:
raise InvalidSignature(
'Failed to verify signature with key %s' %
@@ -664,7 +739,8 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
self._assert_supported_key_type(type(key))
- return self._wrapper_map[type(key)].delete_key(key)
+ _keys = self._wrapper_map[type(key)]
+ return _keys.delete_key(key)
def put_key(self, key, address):
"""
@@ -684,7 +760,9 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
- self._assert_supported_key_type(type(key))
+ ktype = type(key)
+ self._assert_supported_key_type(ktype)
+ _keys = self._wrapper_map[ktype]
if address not in key.address:
return defer.fail(
@@ -699,20 +777,19 @@ class KeyManager(object):
def check_upgrade(old_key):
if key.private or can_upgrade(key, old_key):
- return self._wrapper_map[type(key)].put_key(key, address)
+ return _keys.put_key(key, address)
else:
raise KeyNotValidUpgrade(
"Key %s can not be upgraded by new key %s"
% (old_key.key_id, key.key_id))
- d = self._wrapper_map[type(key)].get_key(address,
- private=key.private)
+ d = _keys.get_key(address, private=key.private)
d.addErrback(old_key_not_found)
d.addCallback(check_upgrade)
return d
def put_raw_key(self, key, ktype, address,
- validation=ValidationLevel.Weak_Chain):
+ validation=ValidationLevels.Weak_Chain):
"""
Put raw key bound to address in local storage.
@@ -724,7 +801,7 @@ class KeyManager(object):
:type address: str
:param validation: validation level for this key
(default: 'Weak_Chain')
- :type validation: ValidationLevel
+ :type validation: ValidationLevels
:return: A Deferred which fires when the key is in the storage, or
which fails with KeyAddressMismatch if address doesn't match
@@ -736,7 +813,9 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
self._assert_supported_key_type(ktype)
- pubkey, privkey = self._wrapper_map[ktype].parse_ascii_key(key)
+ _keys = self._wrapper_map[ktype]
+
+ pubkey, privkey = _keys.parse_ascii_key(key)
pubkey.validation = validation
d = self.put_key(pubkey, address)
if privkey is not None:
@@ -744,7 +823,7 @@ class KeyManager(object):
return d
def fetch_key(self, address, uri, ktype,
- validation=ValidationLevel.Weak_Chain):
+ validation=ValidationLevels.Weak_Chain):
"""
Fetch a public key bound to address from the network and put it in
local storage.
@@ -757,7 +836,7 @@ class KeyManager(object):
:type ktype: subclass of EncryptionKey
:param validation: validation level for this key
(default: 'Weak_Chain')
- :type validation: ValidationLevel
+ :type validation: ValidationLevels
:return: A Deferred which fires when the key is in the storage, or
which fails with KeyNotFound: if not valid key on uri or fails
@@ -769,13 +848,19 @@ class KeyManager(object):
:raise UnsupportedKeyTypeError: if invalid key type
"""
self._assert_supported_key_type(ktype)
+ _keys = self._wrapper_map[ktype]
- res = self._get(uri)
+ logger.info("Fetch key for %s from %s" % (address, uri))
+ try:
+ res = self._get_with_combined_ca_bundle(uri)
+ except Exception as e:
+ logger.warning("There was a problem fetching key: %s" % (e,))
+ return defer.fail(KeyNotFound(uri))
if not res.ok:
return defer.fail(KeyNotFound(uri))
# XXX parse binary keys
- pubkey, _ = self._wrapper_map[ktype].parse_ascii_key(res.content)
+ pubkey, _ = _keys.parse_ascii_key(res.content)
if pubkey is None:
return defer.fail(KeyNotFound(uri))
@@ -821,8 +906,3 @@ def _get_domain(url):
:rtype: str
"""
return urlparse(url).hostname
-
-
-from ._version import get_versions
-__version__ = get_versions()['version']
-del get_versions
diff --git a/src/leap/keymanager/_version.py b/src/leap/keymanager/_version.py
index ffc869a..049099d 100644
--- a/src/leap/keymanager/_version.py
+++ b/src/leap/keymanager/_version.py
@@ -5,8 +5,8 @@
# unpacked source archive. Distribution tarballs contain a pre-generated copy
# of this file.
-version_version = '0.4.0'
-version_full = '11d48a1912cc3d925c9228b021870ee59fe73e0b'
+version_version = '0.4.3'
+version_full = 'f17f8c43e217b65a951e5db357699c2d2af6098f'
def get_versions(default={}, verbose=False):
diff --git a/src/leap/keymanager/keys.py b/src/leap/keymanager/keys.py
index 562c0a9..91559c2 100644
--- a/src/leap/keymanager/keys.py
+++ b/src/leap/keymanager/keys.py
@@ -35,7 +35,7 @@ from datetime import datetime
from leap.common.check import leap_assert
from twisted.internet import defer
-from leap.keymanager.validation import ValidationLevel, toValidationLevel
+from leap.keymanager.validation import ValidationLevels
logger = logging.getLogger(__name__)
@@ -120,11 +120,11 @@ def build_key_from_dict(kClass, kdict):
:rtype: C{kClass}
"""
try:
- validation = toValidationLevel(kdict[KEY_VALIDATION_KEY])
+ validation = ValidationLevels.get(kdict[KEY_VALIDATION_KEY])
except ValueError:
logger.error("Not valid validation level (%s) for key %s",
(kdict[KEY_VALIDATION_KEY], kdict[KEY_ID_KEY]))
- validation = ValidationLevel.Weak_Chain
+ validation = ValidationLevels.Weak_Chain
expiry_date = _to_datetime(kdict[KEY_EXPIRY_DATE_KEY])
last_audited_at = _to_datetime(kdict[KEY_LAST_AUDITED_AT_KEY])
@@ -176,7 +176,7 @@ class EncryptionKey(object):
def __init__(self, address, key_id="", fingerprint="",
key_data="", private=False, length=0, expiry_date=None,
- validation=ValidationLevel.Weak_Chain, last_audited_at=None,
+ validation=ValidationLevels.Weak_Chain, last_audited_at=None,
refreshed_at=None, encr_used=False, sign_used=False):
self.address = address
self.key_id = key_id
@@ -213,7 +213,7 @@ class EncryptionKey(object):
KEY_EXPIRY_DATE_KEY: expiry_date,
KEY_LAST_AUDITED_AT_KEY: last_audited_at,
KEY_REFRESHED_AT_KEY: refreshed_at,
- KEY_VALIDATION_KEY: self.validation.name,
+ KEY_VALIDATION_KEY: str(self.validation),
KEY_ENCR_USED_KEY: self.encr_used,
KEY_SIGN_USED_KEY: self.sign_used,
KEY_TAGS_KEY: [KEYMANAGER_KEY_TAG],
diff --git a/src/leap/keymanager/openpgp.py b/src/leap/keymanager/openpgp.py
index 794a0ec..d648137 100644
--- a/src/leap/keymanager/openpgp.py
+++ b/src/leap/keymanager/openpgp.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# openpgp.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013-2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -22,6 +22,7 @@ import os
import re
import shutil
import tempfile
+import traceback
import io
@@ -41,6 +42,8 @@ from leap.keymanager.keys import (
TYPE_ADDRESS_PRIVATE_INDEX,
KEY_ADDRESS_KEY,
KEY_ID_KEY,
+ KEY_FINGERPRINT_KEY,
+ KEY_REFRESHED_AT_KEY,
KEYMANAGER_ACTIVE_TYPE,
)
@@ -251,7 +254,7 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert(is_address(address), 'Not an user address: %s' % address)
def _gen_key(_):
- with self._temporary_gpgwrapper() as gpg:
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
# TODO: inspect result, or use decorator
params = gpg.gen_key_input(
key_type='RSA',
@@ -321,7 +324,9 @@ class OpenPGPScheme(EncryptionScheme):
raise errors.KeyNotFound(address)
leap_assert(
address in doc.content[KEY_ADDRESS_KEY],
- 'Wrong address in key data.')
+ 'Wrong address in key %s. Expected %s, found %s.'
+ % (doc.content[KEY_ID_KEY], address,
+ doc.content[KEY_ADDRESS_KEY]))
key = build_key_from_dict(OpenPGPKey, doc.content)
key._gpgbinary = self._gpgbinary
return key
@@ -346,37 +351,25 @@ class OpenPGPScheme(EncryptionScheme):
# TODO: add more checks for correct key data.
leap_assert(key_data is not None, 'Data does not represent a key.')
- with self._temporary_gpgwrapper() as gpg:
- # TODO: inspect result, or use decorator
- gpg.import_keys(key_data)
- privkey = None
- pubkey = None
+ priv_info, privkey = process_ascii_key(
+ key_data, self._gpgbinary, secret=True)
+ pub_info, pubkey = process_ascii_key(
+ key_data, self._gpgbinary, secret=False)
- try:
- privkey = gpg.list_keys(secret=True).pop()
- except IndexError:
- pass
- try:
- pubkey = gpg.list_keys(secret=False).pop() # unitary keyring
- except IndexError:
- return (None, None)
-
- openpgp_privkey = None
- if privkey is not None:
- # build private key
- openpgp_privkey = self._build_key_from_gpg(
- privkey,
- gpg.export_keys(privkey['fingerprint'], secret=True))
- leap_check(pubkey['fingerprint'] == privkey['fingerprint'],
- 'Fingerprints for public and private key differ.',
- errors.KeyFingerprintMismatch)
-
- # build public key
- openpgp_pubkey = self._build_key_from_gpg(
- pubkey,
- gpg.export_keys(pubkey['fingerprint'], secret=False))
-
- return (openpgp_pubkey, openpgp_privkey)
+ if not pubkey:
+ return (None, None)
+
+ openpgp_privkey = None
+ if privkey:
+ # build private key
+ openpgp_privkey = self._build_key_from_gpg(priv_info, privkey)
+ leap_check(pub_info['fingerprint'] == priv_info['fingerprint'],
+ 'Fingerprints for public and private key differ.',
+ errors.KeyFingerprintMismatch)
+ # build public key
+ openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey)
+
+ return (openpgp_pubkey, openpgp_privkey)
def put_ascii_key(self, key_data, address):
"""
@@ -432,41 +425,42 @@ class OpenPGPScheme(EncryptionScheme):
:rtype: Deferred
"""
def check_and_put(docs, key):
- if len(docs) == 1:
- doc = docs.pop()
- oldkey = build_key_from_dict(OpenPGPKey, doc.content)
- if key.fingerprint == oldkey.fingerprint:
- # in case of an update of the key merge them with gnupg
- with self._temporary_gpgwrapper() as gpg:
- gpg.import_keys(oldkey.key_data)
- gpg.import_keys(key.key_data)
- gpgkey = gpg.list_keys(secret=key.private).pop()
- mergedkey = self._build_key_from_gpg(
- gpgkey,
- gpg.export_keys(gpgkey['fingerprint'],
- secret=key.private))
- mergedkey.validation = max(
- [key.validation, oldkey.validation])
- mergedkey.last_audited_at = oldkey.last_audited_at
- mergedkey.refreshed_at = key.refreshed_at
- mergedkey.encr_used = key.encr_used or oldkey.encr_used
- mergedkey.sign_used = key.sign_used or oldkey.sign_used
- doc.set_json(mergedkey.get_json())
- d = self._soledad.put_doc(doc)
- else:
- logger.critical(
- "Can't put a key whith the same key_id and different "
- "fingerprint: %s, %s"
- % (key.fingerprint, oldkey.fingerprint))
- d = defer.fail(
- errors.KeyFingerprintMismatch(key.fingerprint))
+ deferred_repair = defer.succeed(None)
+ if len(docs) == 0:
+ return self._soledad.create_doc_from_json(key.get_json())
elif len(docs) > 1:
+ deferred_repair = self._repair_key_docs(docs, key.key_id)
+
+ doc = docs[0]
+ oldkey = build_key_from_dict(OpenPGPKey, doc.content)
+ if key.fingerprint != oldkey.fingerprint:
logger.critical(
- "There is more than one key with the same key_id %s"
- % (key.key_id,))
- d = defer.fail(errors.KeyAttributesDiffer(key.key_id))
- else:
- d = self._soledad.create_doc_from_json(key.get_json())
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (key.fingerprint, oldkey.fingerprint))
+ return defer.fail(
+ errors.KeyFingerprintMismatch(key.fingerprint))
+
+ # in case of an update of the key merge them with gnupg
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
+ gpg.import_keys(oldkey.key_data)
+ gpg.import_keys(key.key_data)
+ gpgkey = gpg.list_keys(secret=key.private).pop()
+ mergedkey = self._build_key_from_gpg(
+ gpgkey,
+ gpg.export_keys(gpgkey['fingerprint'],
+ secret=key.private))
+ mergedkey.validation = max(
+ [key.validation, oldkey.validation])
+ mergedkey.last_audited_at = oldkey.last_audited_at
+ mergedkey.refreshed_at = key.refreshed_at
+ mergedkey.encr_used = key.encr_used or oldkey.encr_used
+ mergedkey.sign_used = key.sign_used or oldkey.sign_used
+ doc.set_json(mergedkey.get_json())
+ deferred_put = self._soledad.put_doc(doc)
+
+ d = defer.gatherResults([deferred_put, deferred_repair])
+ d.addCallback(lambda res: res[0])
return d
d = self._soledad.get_from_index(
@@ -543,14 +537,21 @@ class OpenPGPScheme(EncryptionScheme):
self.KEY_TYPE,
key_id,
'1' if private else '0')
- d.addCallback(get_doc, key_id)
+ d.addCallback(get_doc, key_id, activedoc)
return d
- def get_doc(doclist, key_id):
- leap_assert(
- len(doclist) is 1,
- 'There is %d keys for id %s!' % (len(doclist), key_id))
- return doclist.pop()
+ def get_doc(doclist, key_id, activedoc):
+ if len(doclist) == 0:
+ logger.warning('There is no key for id %s! Self-repairing it.'
+ % (key_id))
+ d = self._soledad.delete_doc(activedoc)
+ d.addCallback(lambda _: None)
+ return d
+ elif len(doclist) > 1:
+ d = self._repair_key_docs(doclist, key_id)
+ d.addCallback(lambda _: doclist[0])
+ return d
+ return doclist[0]
d = self._soledad.get_from_index(
TYPE_ADDRESS_PRIVATE_INDEX,
@@ -575,24 +576,7 @@ class OpenPGPScheme(EncryptionScheme):
:return: An instance of the key.
:rtype: OpenPGPKey
"""
- expiry_date = None
- if key['expires']:
- expiry_date = datetime.fromtimestamp(int(key['expires']))
- address = []
- for uid in key['uids']:
- address.append(_parse_address(uid))
-
- return OpenPGPKey(
- address,
- gpgbinary=self._gpgbinary,
- key_id=key['keyid'],
- fingerprint=key['fingerprint'],
- key_data=key_data,
- private=True if key['type'] == 'sec' else False,
- length=int(key['length']),
- expiry_date=expiry_date,
- refreshed_at=datetime.now(),
- )
+ return build_gpg_key(key, key_data, self._gpgbinary)
def delete_key(self, key):
"""
@@ -625,18 +609,20 @@ class OpenPGPScheme(EncryptionScheme):
def delete_key(docs):
if len(docs) == 0:
raise errors.KeyNotFound(key)
- if len(docs) > 1:
- logger.critical("There is more than one key for key_id %s"
- % key.key_id)
-
- doc = None
- for d in docs:
- if d.content['fingerprint'] == key.fingerprint:
- doc = d
- break
- if doc is None:
+ elif len(docs) > 1:
+ logger.warning("There is more than one key for key_id %s"
+ % key.key_id)
+
+ has_deleted = False
+ deferreds = []
+ for doc in docs:
+ if doc.content['fingerprint'] == key.fingerprint:
+ d = self._soledad.delete_doc(doc)
+ deferreds.append(d)
+ has_deleted = True
+ if not has_deleted:
raise errors.KeyNotFound(key)
- return self._soledad.delete_doc(doc)
+ return defer.gatherResults(deferreds)
d = self._soledad.get_from_index(
TYPE_ID_PRIVATE_INDEX,
@@ -648,24 +634,39 @@ class OpenPGPScheme(EncryptionScheme):
d.addCallback(delete_key)
return d
- #
- # Data encryption, decryption, signing and verifying
- #
+ def _repair_key_docs(self, doclist, key_id):
+ """
+ If there is more than one key for a key id try to self-repair it
- def _temporary_gpgwrapper(self, keys=None):
+ :return: a Deferred that will be fired once all the deletions are
+ completed
+ :rtype: Deferred
"""
- Return a gpg wrapper that implements the context manager protocol and
- contains C{keys}.
+ logger.error("BUG ---------------------------------------------------")
+ logger.error("There is more than one key with the same key_id %s:"
+ % (key_id,))
- :param keys: keys to conform the keyring.
- :type key: list(OpenPGPKey)
+ def log_key_doc(doc):
+ logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
+ doc.content[KEY_FINGERPRINT_KEY]))
- :return: a TempGPGWrapper instance
- :rtype: TempGPGWrapper
- """
- # TODO do here checks on key_data
- return TempGPGWrapper(
- keys=keys, gpgbinary=self._gpgbinary)
+ doclist.sort(key=lambda doc: doc.content[KEY_REFRESHED_AT_KEY],
+ reverse=True)
+ log_key_doc(doclist[0])
+ deferreds = []
+ for doc in doclist[1:]:
+ log_key_doc(doc)
+ d = self._soledad.delete_doc(doc)
+ deferreds.append(d)
+
+ logger.error("")
+ logger.error(traceback.extract_stack())
+ logger.error("BUG (please report above info) ------------------------")
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+ #
+ # Data encryption, decryption, signing and verifying
+ #
@staticmethod
def _assert_gpg_result_ok(result):
@@ -711,7 +712,7 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert_type(sign, OpenPGPKey)
leap_assert(sign.private is True)
keys.append(sign)
- with self._temporary_gpgwrapper(keys) as gpg:
+ with TempGPGWrapper(keys, self._gpgbinary) as gpg:
result = gpg.encrypt(
data, pubkey.fingerprint,
default_key=sign.key_id if sign else None,
@@ -753,7 +754,7 @@ class OpenPGPScheme(EncryptionScheme):
leap_assert_type(verify, OpenPGPKey)
leap_assert(verify.private is False)
keys.append(verify)
- with self._temporary_gpgwrapper(keys) as gpg:
+ with TempGPGWrapper(keys, self._gpgbinary) as gpg:
try:
result = gpg.decrypt(
data, passphrase=passphrase, always_trust=True)
@@ -781,7 +782,7 @@ class OpenPGPScheme(EncryptionScheme):
:return: Whether C{data} was encrypted using this wrapper.
:rtype: bool
"""
- with self._temporary_gpgwrapper() as gpg:
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
gpgutil = GPGUtilities(gpg)
return gpgutil.is_encrypted_asym(data)
@@ -812,7 +813,7 @@ class OpenPGPScheme(EncryptionScheme):
# result.fingerprint - contains the fingerprint of the key used to
# sign.
- with self._temporary_gpgwrapper(privkey) as gpg:
+ with TempGPGWrapper(privkey, self._gpgbinary) as gpg:
result = gpg.sign(data, default_key=privkey.key_id,
digest_algo=digest_algo, clearsign=clearsign,
detach=detach, binary=binary)
@@ -847,7 +848,7 @@ class OpenPGPScheme(EncryptionScheme):
"""
leap_assert_type(pubkey, OpenPGPKey)
leap_assert(pubkey.private is False)
- with self._temporary_gpgwrapper(pubkey) as gpg:
+ with TempGPGWrapper(pubkey, self._gpgbinary) as gpg:
result = None
if detached_sig is None:
result = gpg.verify(data)
@@ -865,3 +866,35 @@ class OpenPGPScheme(EncryptionScheme):
rfprint = result.fingerprint
kfprint = gpgpubkey['fingerprint']
return valid and rfprint == kfprint
+
+
+def process_ascii_key(key_data, gpgbinary, secret=False):
+ with TempGPGWrapper(gpgbinary=gpgbinary) as gpg:
+ try:
+ gpg.import_keys(key_data)
+ info = gpg.list_keys(secret=secret).pop()
+ key = gpg.export_keys(info['fingerprint'], secret=secret)
+ except IndexError:
+ info = {}
+ key = None
+ return info, key
+
+
+def build_gpg_key(key_info, key_data, gpgbinary=None):
+ expiry_date = None
+ if key_info['expires']:
+ expiry_date = datetime.fromtimestamp(int(key_info['expires']))
+ address = []
+ for uid in key_info['uids']:
+ address.append(_parse_address(uid))
+
+ return OpenPGPKey(
+ address,
+ gpgbinary=gpgbinary,
+ key_id=key_info['keyid'],
+ fingerprint=key_info['fingerprint'],
+ key_data=key_data,
+ private=True if key_info['type'] == 'sec' else False,
+ length=int(key_info['length']),
+ expiry_date=expiry_date,
+ refreshed_at=datetime.now())
diff --git a/src/leap/keymanager/tests/__init__.py b/src/leap/keymanager/tests/__init__.py
index 7128d20..cd612c4 100644
--- a/src/leap/keymanager/tests/__init__.py
+++ b/src/leap/keymanager/tests/__init__.py
@@ -66,18 +66,27 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
for private in [True, False]:
d = km.get_all_keys(private=private)
d.addCallback(delete_keys)
+ d.addCallback(check_deleted, private)
deferreds.append(d)
return gatherResults(deferreds)
+ def check_deleted(_, private):
+ d = km.get_all_keys(private=private)
+ d.addCallback(lambda keys: self.assertEqual(keys, []))
+ return d
+
# wait for the indexes to be ready for the tear down
d = km._wrapper_map[OpenPGPKey].deferred_indexes
d.addCallback(get_and_delete_keys)
d.addCallback(lambda _: self.tearDownEnv())
+ d.addCallback(lambda _: self._soledad.close())
return d
- def _key_manager(self, user=ADDRESS, url='', token=None):
+ def _key_manager(self, user=ADDRESS, url='', token=None,
+ ca_cert_path=None):
return KeyManager(user, url, self._soledad, token=token,
- gpgbinary=self.gpg_binary_path)
+ gpgbinary=self.gpg_binary_path,
+ ca_cert_path=ca_cert_path)
def _find_gpg(self):
gpg_path = distutils.spawn.find_executable('gpg')
@@ -88,6 +97,7 @@ class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
# key 24D18DDF: public key "Leap Test Key <leap@leap.se>"
+KEY_ID = "2F455E2824D18DDF"
KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
PUBLIC_KEY = """
-----BEGIN PGP PUBLIC KEY BLOCK-----
diff --git a/src/leap/keymanager/tests/test_keymanager.py b/src/leap/keymanager/tests/test_keymanager.py
index 55f892e..856d6da 100644
--- a/src/leap/keymanager/tests/test_keymanager.py
+++ b/src/leap/keymanager/tests/test_keymanager.py
@@ -20,9 +20,11 @@
Tests for the Key Manager.
"""
-
+from os import path
from datetime import datetime
-from mock import Mock
+import tempfile
+from leap.common import ca_bundle
+from mock import Mock, MagicMock, patch
from twisted.internet.defer import inlineCallbacks
from twisted.trial import unittest
@@ -36,10 +38,7 @@ from leap.keymanager.keys import (
is_address,
build_key_from_dict,
)
-from leap.keymanager.validation import (
- ValidationLevel,
- toValidationLevel
-)
+from leap.keymanager.validation import ValidationLevels
from leap.keymanager.tests import (
KeyManagerWithSoledadTestCase,
ADDRESS,
@@ -53,6 +52,7 @@ from leap.keymanager.tests import (
NICKSERVER_URI = "http://leap.se/"
+REMOTE_KEY_URL = "http://site.domain/key"
class KeyManagerUtilTestCase(unittest.TestCase):
@@ -82,7 +82,7 @@ class KeyManagerUtilTestCase(unittest.TestCase):
'expiry_date': 0,
'last_audited_at': 0,
'refreshed_at': 1311239602,
- 'validation': ValidationLevel.Weak_Chain.name,
+ 'validation': str(ValidationLevels.Weak_Chain),
'encr_used': False,
'sign_used': True,
}
@@ -115,7 +115,7 @@ class KeyManagerUtilTestCase(unittest.TestCase):
datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
'Wrong data in key.')
self.assertEqual(
- toValidationLevel(kdict['validation']), key.validation,
+ ValidationLevels.get(kdict['validation']), key.validation,
'Wrong data in key.')
self.assertEqual(
kdict['encr_used'], key.encr_used,
@@ -227,7 +227,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS in key.address)
- self.assertEqual(key.validation, ValidationLevel.Provider_Trust)
+ self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
@inlineCallbacks
def test_get_key_fetches_other_domain(self):
@@ -239,7 +239,7 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
self.assertIsInstance(key, OpenPGPKey)
self.assertTrue(ADDRESS_OTHER in key.address)
- self.assertEqual(key.validation, ValidationLevel.Weak_Chain)
+ self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
def _fetch_key(self, km, address, key):
"""
@@ -290,7 +290,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = PUBLIC_KEY
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
yield km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
key = yield km.get_key(ADDRESS, OpenPGPKey)
@@ -307,7 +306,6 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = ""
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
d = km.fetch_key(ADDRESS, "http://site.domain/key", OpenPGPKey)
return self.assertFailure(d, KeyNotFound)
@@ -323,10 +321,125 @@ class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
content = PUBLIC_KEY
km._fetcher.get = Mock(return_value=Response())
- km.ca_cert_path = 'cacertpath'
d = km.fetch_key(ADDRESS_2, "http://site.domain/key", OpenPGPKey)
return self.assertFailure(d, KeyAddressMismatch)
+ def _mock_get_response(self, km, body):
+ class Response(object):
+ ok = True
+ content = body
+
+ mock = MagicMock(return_value=Response())
+ km._fetcher.get = mock
+
+ return mock
+
+ @inlineCallbacks
+ def test_fetch_key_uses_ca_bundle_if_none_specified(self):
+ ca_cert_path = None
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
+ verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_key_uses_ca_bundle_if_empty_string_specified(self):
+ ca_cert_path = ''
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
+ verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_key_use_default_ca_bundle_if_set_as_ca_cert_path(self):
+ ca_cert_path = ca_bundle.where()
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
+ verify=ca_bundle.where())
+
+ @inlineCallbacks
+ def test_fetch_uses_combined_ca_bundle_otherwise(self):
+ with tempfile.NamedTemporaryFile() as tmp_input, \
+ tempfile.NamedTemporaryFile(delete=False) as tmp_output:
+ ca_content = 'some\ncontent\n'
+ ca_cert_path = tmp_input.name
+ self._dump_to_file(ca_cert_path, ca_content)
+
+ with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
+ mock.return_value = tmp_output
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL, OpenPGPKey)
+
+ # assert that combined bundle file is passed to get call
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, data=None,
+ verify=tmp_output.name)
+
+ # assert that files got appended
+ expected = self._slurp_file(ca_bundle.where()) + ca_content
+ self.assertEqual(expected, self._slurp_file(tmp_output.name))
+
+ del km # force km out of scope
+ self.assertFalse(path.exists(tmp_output.name))
+
+ def _dump_to_file(self, filename, content):
+ with open(filename, 'w') as out:
+ out.write(content)
+
+ def _slurp_file(self, filename):
+ with open(filename) as f:
+ content = f.read()
+ return content
+
+ @inlineCallbacks
+ def test_decrypt_updates_sign_used_for_signer(self):
+ # given
+ km = self._key_manager()
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(PRIVATE_KEY, ADDRESS)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(
+ PRIVATE_KEY_2, ADDRESS_2)
+ encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
+ sign=ADDRESS_2, fetch_remote=False)
+ yield km.decrypt(
+ encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
+
+ # when
+ key = yield km.get_key(ADDRESS_2, OpenPGPKey, fetch_remote=False)
+
+ # then
+ self.assertEqual(True, key.sign_used)
+
+ @inlineCallbacks
+ def test_decrypt_does_not_update_sign_used_for_recipient(self):
+ # given
+ km = self._key_manager()
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(
+ PRIVATE_KEY, ADDRESS)
+ yield km._wrapper_map[OpenPGPKey].put_ascii_key(
+ PRIVATE_KEY_2, ADDRESS_2)
+ encdata = yield km.encrypt('data', ADDRESS, OpenPGPKey,
+ sign=ADDRESS_2, fetch_remote=False)
+ yield km.decrypt(
+ encdata, ADDRESS, OpenPGPKey, verify=ADDRESS_2, fetch_remote=False)
+
+ # when
+ key = yield km.get_key(
+ ADDRESS, OpenPGPKey, private=False, fetch_remote=False)
+
+ # then
+ self.assertEqual(False, key.sign_used)
+
class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
@@ -391,9 +504,8 @@ class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
sign=ADDRESS, fetch_remote=False))
return self.assertFailure(d, KeyNotFound)
-
-import unittest
if __name__ == "__main__":
+ import unittest
unittest.main()
# key 0F91B402: someone@somedomain.org
diff --git a/src/leap/keymanager/tests/test_openpgp.py b/src/leap/keymanager/tests/test_openpgp.py
index 5f85c74..bae83db 100644
--- a/src/leap/keymanager/tests/test_openpgp.py
+++ b/src/leap/keymanager/tests/test_openpgp.py
@@ -21,12 +21,15 @@ Tests for the OpenPGP support on Key Manager.
"""
-from twisted.internet.defer import inlineCallbacks
+from datetime import datetime
+from mock import Mock
+from twisted.internet.defer import inlineCallbacks, gatherResults, succeed
from leap.keymanager import (
KeyNotFound,
openpgp,
)
+from leap.keymanager.keys import TYPE_ID_PRIVATE_INDEX
from leap.keymanager.openpgp import OpenPGPKey
from leap.keymanager.tests import (
KeyManagerWithSoledadTestCase,
@@ -34,6 +37,7 @@ from leap.keymanager.tests import (
ADDRESS_2,
KEY_FINGERPRINT,
PUBLIC_KEY,
+ KEY_ID,
PUBLIC_KEY_2,
PRIVATE_KEY,
PRIVATE_KEY_2,
@@ -247,6 +251,104 @@ class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
validsign = pgp.verify(data, pubkey, detached_sig=signature)
self.assertTrue(validsign)
+ @inlineCallbacks
+ def test_self_repair_three_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ID_PRIVATE_INDEX and
+ args[2] == KEY_ID):
+ k1 = OpenPGPKey(ADDRESS, key_id="1",
+ refreshed_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, key_id="2",
+ refreshed_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, key_id="3",
+ refreshed_at=datetime(2001, 1, 1))
+ d1 = self._soledad.create_doc_from_json(k1.get_json())
+ d2 = self._soledad.create_doc_from_json(k2.get_json())
+ d3 = self._soledad.create_doc_from_json(k3.get_json())
+ return gatherResults([d1, d2, d3])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ key = yield pgp.get_key(ADDRESS, private=False)
+
+ try:
+ self.assertEqual(key.key_id, "2")
+ self.assertEqual(self._soledad.delete_doc.call_count, 2)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
+ @inlineCallbacks
+ def test_self_repair_no_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ID_PRIVATE_INDEX and
+ args[2] == KEY_ID):
+ return succeed([])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ try:
+ yield self.assertFailure(pgp.get_key(ADDRESS, private=False),
+ KeyNotFound)
+ self.assertEqual(self._soledad.delete_doc.call_count, 1)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
+ @inlineCallbacks
+ def test_self_repair_put_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+
+ def my_get_from_index(*args):
+ if (args[0] == TYPE_ID_PRIVATE_INDEX and
+ args[2] == KEY_ID):
+ k1 = OpenPGPKey(ADDRESS, key_id="1",
+ fingerprint=KEY_FINGERPRINT,
+ refreshed_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, key_id="2",
+ fingerprint=KEY_FINGERPRINT,
+ refreshed_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, key_id="3",
+ fingerprint=KEY_FINGERPRINT,
+ refreshed_at=datetime(2001, 1, 1))
+ d1 = self._soledad.create_doc_from_json(k1.get_json())
+ d2 = self._soledad.create_doc_from_json(k2.get_json())
+ d3 = self._soledad.create_doc_from_json(k3.get_json())
+ return gatherResults([d1, d2, d3])
+ return get_from_index(*args)
+
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+
+ try:
+ yield pgp.put_ascii_key(PUBLIC_KEY, ADDRESS)
+ self.assertEqual(self._soledad.delete_doc.call_count, 2)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+
def _assert_key_not_found(self, pgp, address, private=False):
d = pgp.get_key(address, private=private)
return self.assertFailure(d, KeyNotFound)
diff --git a/src/leap/keymanager/tests/test_validation.py b/src/leap/keymanager/tests/test_validation.py
index 15e7d27..bcf41c4 100644
--- a/src/leap/keymanager/tests/test_validation.py
+++ b/src/leap/keymanager/tests/test_validation.py
@@ -18,6 +18,7 @@
Tests for the Validation Levels
"""
+import unittest
from datetime import datetime
from twisted.internet.defer import inlineCallbacks
@@ -29,12 +30,15 @@ from leap.keymanager.tests import (
KeyManagerWithSoledadTestCase,
ADDRESS,
PUBLIC_KEY,
+ ADDRESS_2,
+ PUBLIC_KEY_2,
+ PRIVATE_KEY_2,
KEY_FINGERPRINT
)
-from leap.keymanager.validation import ValidationLevel
+from leap.keymanager.validation import ValidationLevels
-class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
+class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
@inlineCallbacks
def test_none_old_key(self):
@@ -47,7 +51,7 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
def test_cant_upgrade(self):
km = self._key_manager()
yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Provider_Trust)
+ validation=ValidationLevels.Provider_Trust)
d = km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS)
yield self.assertFailure(d, KeyNotValidUpgrade)
@@ -56,7 +60,7 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager()
yield km.put_raw_key(PUBLIC_KEY, OpenPGPKey, ADDRESS)
yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Fingerprint)
+ validation=ValidationLevels.Fingerprint)
key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
@@ -73,12 +77,12 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
km = self._key_manager()
yield km.put_raw_key(
EXPIRED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Third_Party_Endorsement)
+ validation=ValidationLevels.Third_Party_Endorsement)
d = km.put_raw_key(
UNRELATED_KEY,
OpenPGPKey,
ADDRESS,
- validation=ValidationLevel.Provider_Trust)
+ validation=ValidationLevels.Provider_Trust)
yield self.assertFailure(d, KeyNotValidUpgrade)
@inlineCallbacks
@@ -93,14 +97,14 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
def test_not_used(self):
km = self._key_manager()
yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Provider_Trust)
+ validation=ValidationLevels.Provider_Trust)
yield km.put_raw_key(UNRELATED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Provider_Endorsement)
+ validation=ValidationLevels.Provider_Endorsement)
key = yield km.get_key(ADDRESS, OpenPGPKey, fetch_remote=False)
self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
@inlineCallbacks
- def test_used(self):
+ def test_used_with_verify(self):
TEXT = "some text"
km = self._key_manager()
@@ -114,7 +118,28 @@ class ValidationLevelTestCase(KeyManagerWithSoledadTestCase):
yield km.verify(TEXT, ADDRESS, OpenPGPKey, detached_sig=signature)
d = km.put_raw_key(
UNRELATED_KEY, OpenPGPKey, ADDRESS,
- validation=ValidationLevel.Provider_Endorsement)
+ validation=ValidationLevels.Provider_Endorsement)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+
+ @inlineCallbacks
+ def test_used_with_decrypt(self):
+ TEXT = "some text"
+
+ km = self._key_manager()
+ yield km.put_raw_key(UNEXPIRED_KEY, OpenPGPKey, ADDRESS)
+ yield km.put_raw_key(PRIVATE_KEY_2, OpenPGPKey, ADDRESS_2)
+ yield km.encrypt(TEXT, ADDRESS, OpenPGPKey)
+
+ km2 = self._key_manager()
+ yield km2.put_raw_key(UNEXPIRED_PRIVATE, OpenPGPKey, ADDRESS)
+ yield km2.put_raw_key(PUBLIC_KEY_2, OpenPGPKey, ADDRESS_2)
+ encrypted = yield km2.encrypt(TEXT, ADDRESS_2, OpenPGPKey,
+ sign=ADDRESS)
+
+ yield km.decrypt(encrypted, ADDRESS_2, OpenPGPKey, verify=ADDRESS)
+ d = km.put_raw_key(
+ UNRELATED_KEY, OpenPGPKey, ADDRESS,
+ validation=ValidationLevels.Provider_Endorsement)
yield self.assertFailure(d, KeyNotValidUpgrade)
@inlineCallbacks
@@ -339,7 +364,6 @@ X2+l7IOSt+31KQCBFN/VmhTySJOVQC1d2A56lSH2c/DWVClji+x3suzn
-----END PGP PUBLIC KEY BLOCK-----
"""
-
-import unittest
if __name__ == "__main__":
+ import unittest
unittest.main()
diff --git a/src/leap/keymanager/validation.py b/src/leap/keymanager/validation.py
index dfe6432..734cfce 100644
--- a/src/leap/keymanager/validation.py
+++ b/src/leap/keymanager/validation.py
@@ -24,34 +24,63 @@ See:
from datetime import datetime
-from enum import IntEnum
-ValidationLevel = IntEnum("ValidationLevel",
- "Weak_Chain "
- "Provider_Trust "
- "Provider_Endorsement "
- "Third_Party_Endorsement "
- "Third_Party_Consensus "
- "Historically_Auditing "
- "Known_Key "
- "Fingerprint")
+class ValidationLevel(object):
+ """
+ A validation level
+
+ Meant to be used to compare levels or get its string representation.
+ """
+ def __init__(self, name, value):
+ self.name = name
+ self.value = value
+
+ def __cmp__(self, other):
+ return cmp(self.value, other.value)
+
+ def __str__(self):
+ return self.name
+
+ def __repr__(self):
+ return "<ValidationLevel: %s (%d)>" % (self.name, self.value)
-def toValidationLevel(value):
+class _ValidationLevels(object):
"""
- Convert a string representation of a validation level into
- C{ValidationLevel}
+ Handler class to manage validation levels. It should have only one global
+ instance 'ValidationLevels'.
- :param value: validation level
- :type value: str
- :rtype: ValidationLevel
- :raises ValueError: if C{value} is not a validation level
+ The levels are attributes of the instance and can be used like:
+ ValidationLevels.Weak_Chain
+ ValidationLevels.get("Weak_Chain")
"""
- for level in ValidationLevel:
- if value == level.name:
- return level
- raise ValueError("Not valid validation level: %s" % (value,))
+ _level_names = ("Weak_Chain",
+ "Provider_Trust",
+ "Provider_Endorsement",
+ "Third_Party_Endorsement",
+ "Third_Party_Consensus",
+ "Historically_Auditing",
+ "Known_Key",
+ "Fingerprint")
+
+ def __init__(self):
+ for name in self._level_names:
+ setattr(self, name,
+ ValidationLevel(name, self._level_names.index(name)))
+
+ def get(self, name):
+ """
+ Get the ValidationLevel of a name
+
+ :param name: name of the level
+ :type name: str
+ :rtype: ValidationLevel
+ """
+ return getattr(self, name)
+
+
+ValidationLevels = _ValidationLevels()
def can_upgrade(new_key, old_key):
@@ -69,7 +98,7 @@ def can_upgrade(new_key, old_key):
return True
# Manually verified fingerprint
- if new_key.validation == ValidationLevel.Fingerprint:
+ if new_key.validation == ValidationLevels.Fingerprint:
return True
# Expired key and higher validation level