diff --git a/keymanager/.gitattributes b/keymanager/.gitattributes
new file mode 100644
index 00000000..9cde8271
--- /dev/null
+++ b/keymanager/.gitattributes
@@ -0,0 +1 @@
+src/leap/keymanager/ export-subst
diff --git a/keymanager/.gitignore b/keymanager/.gitignore
new file mode 100644
index 00000000..18bfcd69
--- /dev/null
+++ b/keymanager/.gitignore
@@ -0,0 +1,35 @@
diff --git a/keymanager/.gitlab-ci.yml b/keymanager/.gitlab-ci.yml
new file mode 100644
index 00000000..bc953872
--- /dev/null
+++ b/keymanager/.gitlab-ci.yml
@@ -0,0 +1,2 @@
+ script: tox
diff --git a/keymanager/AUTHORS b/keymanager/AUTHORS
new file mode 100644
index 00000000..5e642489
--- /dev/null
+++ b/keymanager/AUTHORS
@@ -0,0 +1,6 @@
+Tomás Touceda <>
+Ruben Pollan <>
+Ivan Alejandro <>
+drebs <>
+Kali Kaneko <>
+Parménides GV <>
diff --git a/keymanager/CHANGELOG.rst b/keymanager/CHANGELOG.rst
new file mode 100644
index 00000000..3716025e
--- /dev/null
+++ b/keymanager/CHANGELOG.rst
@@ -0,0 +1,29 @@
+0.5.1 - 04 Apr, 2016
+- Create an iterator on EncryptionKey to be able to represent it as a dict
+0.5.0 - 18 Apr, 2016
+- `#7485 <>`_: Move validation, usage and audited date to the active document.
+- `#7713 <>`_: Update soledad documents by adding versioning field.
+- `#7500 <>`_: Use fingerprints instead of key ids.
+- `#7712 <>`_: Document the soledad docs fields.
+- Make EncryptionKey aware of the active address.
+- Defer encrypt, decrypt and gen_key operations from gnupg to external threads,
+ limited by cpu core amount.
+- `#7974 <>`_: Return KeyNotFound Failure if not valid key is given to put_raw_key.
+- This version includes changes in the Soledad Documents and minor modifications to the API.
+- Changelog migrated to rst.
diff --git a/keymanager/HISTORY.rst b/keymanager/HISTORY.rst
new file mode 100644
index 00000000..0a9221dc
--- /dev/null
+++ b/keymanager/HISTORY.rst
@@ -0,0 +1,95 @@
+0.4.3 Oct 28, 2015:
+ o self-repair the keyring if keys get duplicated. Closes: #7498
+ o catch request exceptions on key fetching. Closes #7410.
+ o Don't repush a public key with different addres
+ o use async events api. Closes #7224.
+ o Use ca_bundle when fetching keys by url.
+ o add logging to fetch_key. Related: #7410.
+ o more verbosity in get_key wrong address log.
+ o don't repush a public key with different address. Related #7420.
+0.4.2 Aug 26, 2015:
+ o Style changes.
+ o Tests updates.
+ o Packaging improvements.
+0.4.1 Jul 10, 2015:
+ o Remove the dependency on enum34. Closes: #7188.
+0.4.0 Jun 8, 2015:
+ o Adapt to new events api on leap.common. Related to #5359.
+ o Add 'fetch_key' method to fetch keys from a URI. Closes #5932.
+ o Clean up API.
+ o Fix call to python-gnupg's verify_file() method. Closes #6022.
+ o KeyManager.put_key now accepts also ascii keys.
+ o Multi uid support. Closes #6212.
+ o Port keymanager to the new soledad async API. Closes #6368.
+ o Return always KeyNotFound failure if fetch keys fails on an unknown error.
+ o Upgrade keys if not successfully used and strict high validation level.
+ Closes #6211.
+ o Use addresses instead of keys for encrypt, decrypt, sign & verify.
+ Closes #6346.
+ o Expose info about the signing key. Closes #6366.
+ o Fetched keys from other domain than its provider are set as 'Weak Chain'
+ validation level. Closes #6815.
+ o Keep old key after upgrade. Closes #6262.
+ o New soledad doc struct for encryption-keys. Closes #6299.
+ o Upgrade key when signed by old key. Closes #6240.
+0.3.8 Apr 4, 2014:
+ o Properly raise KeyNotFound exception when looking for keys on
+ nickserver. Fixes #5415.
+ o Do not decode decrypted data, return as str.
+ o Use a better version handler for the gnupg version check.
+ o Memoize call to get_key. Closes #4784.
+ o Update auth to interact with webapp v2. Fixes #5120.
+0.3.7 Dec 6, 2013:
+ o Fix error return values on openpgp backend.
+ o Remove address check when sending email and rely in the email
+ client to verify that is correct. Closes #4491.
+ o Support sending encrypted mails to addresses using the '+' sign.
+ o Improve exception names and handling.
+0.3.6 Nov 15, 2013:
+ o Default encoding to 'utf-8' in case of system encodings not
+ set. Closes #4427.
+ o Add verification of detached signatures. Closes #4375.
+ o New openpgp method: parse_ascii_keys.
+ o Expose openpgp methods in keymanager (parse_ascii_keys, put_key,
+ delete_key).
+0.3.5 Nov 1, 2013:
+ o Return unicode decrypted text to avoid encoding issues. Related to
+ #4000.
+0.3.4 Oct 18, 2013:
+ o Add option to choose cipher and digest algorithms when signing and
+ encrypting. Closes #4030.
+0.3.3 Oct 4, 2013:
+ o Add a sanity check for the correct version of gnupg.
+ o Update code to use gnupg 1.2.2 python module. Closes #2342.
+0.3.2 Sep 6, 2013:
+ o Do not raise exception when a GET request doesn't return 2XX
+ code. Nickserver uses error codes for more verbosity in the
+ result.
+ o Accept unicode ascii keys along with str.
+0.3.1 Aug 23, 2013:
+ o Signal different key related events, like key generation, key
+ upload.
+ o Update to new soledad package scheme (common, client and
+ server). Closes #3487.
+ o Packaging improvements: add versioneer and parse_requirements.
+0.3.0 Aug 9, 2013:
+ o If a nickserver request fails in any way, notify and continue.
+ o Options parameter in gnupg.GPG isn't supported by all versions, so
+ removing it for the time being.
+ o Add support for bundled gpg. Closes #3397.
+ o Refactor API to include encrypt/decrypt/sign/verify in KeyManager.
+0.2.0 Jul 12, 2013:
+ o Move keymanager to its own package
diff --git a/keymanager/LICENSE b/keymanager/LICENSE
new file mode 100644
index 00000000..bc08fe2e
--- /dev/null
+++ b/keymanager/LICENSE
diff --git a/keymanager/ b/keymanager/
new file mode 100644
index 00000000..8a66511d
--- /dev/null
+++ b/keymanager/
@@ -0,0 +1,5 @@
+include pkg/*
+include LICENSE
+include CHANGELOG.rst
+include src/leap/keymanager/
diff --git a/keymanager/README.rst b/keymanager/README.rst
new file mode 100644
index 00000000..1d4b53ea
--- /dev/null
+++ b/keymanager/README.rst
@@ -0,0 +1,29 @@
+LEAP's Key Manager
+.. image::
+ :target:
+.. image::
+ :target:
+The Key Manager is a `Nicknym`_ agent for the `LEAP`_ project, written in python using the `twisted`_ async framework.
+.. _`Nicknym`:
+.. _`LEAP`:
+.. _`twisted`:
+running tests
+Use trial to run the test suite::
+ trial leap.keymanager
+.. image::
+leap.keymanager is released under the terms of the `GNU GPL version 3`_ or later.
+.. _`GNU GPL version 3`:
diff --git a/keymanager/changes/VERSION_COMPAT b/keymanager/changes/VERSION_COMPAT
new file mode 100644
index 00000000..cc00ecf7
--- /dev/null
+++ b/keymanager/changes/VERSION_COMPAT
@@ -0,0 +1,10 @@
+# This file keeps track of the recent changes
+# introduced in internal leap dependencies.
+# Add your changes here so we can properly update
+# requirements.pip during the release process.
+# (leave header when resetting)
+# BEGIN DEPENDENCY LIST -------------------------
diff --git a/keymanager/changes/next-changelog.txt b/keymanager/changes/next-changelog.txt
new file mode 100644
index 00000000..d34022f8
--- /dev/null
+++ b/keymanager/changes/next-changelog.txt
@@ -0,0 +1,34 @@
+0.5.2 - xxx
+Please add lines to this file, they will be moved to the CHANGELOG.rst during
+the next release.
+There are two template lines for each category, use them as reference.
+I've added a new category `Misc` so we can track doc/style/packaging stuff.
+- `#8031 <>`_: Remove support for multiple key types.
+- `#8068 <>`_: make get_all_keys aware of active addresses.
+- `#6658 <>`_: Improve duplicated active documents fixup.
+- `#8165 <>`_: Check key document versions and fail if it's unknown.
+- Make ValidationLevels iterable
+- `#1234 <>`_: Description of the new feature corresponding with issue #1234.
+- New feature without related issue number.
+- `#1235 <>`_: Description for the fixed stuff corresponding with issue #1235.
+- Bugfix without related issue number.
+- `#1236 <>`_: Description of the new feature corresponding with issue #1236.
+- Some change without issue number.
+Known Issues
+- `#1236 <>`_: Description of the known issue corresponding with issue #1236.
diff --git a/keymanager/docs/leap-commit-template b/keymanager/docs/leap-commit-template
new file mode 100644
index 00000000..8a5c7cd0
--- /dev/null
+++ b/keymanager/docs/leap-commit-template
@@ -0,0 +1,7 @@
+[bug|feat|docs|style|refactor|test|pkg|i18n] ...
+- Resolves: #XYZ
+- Related: #XYZ
+- Documentation: #XYZ
+- Releases: XYZ
diff --git a/keymanager/docs/leap-commit-template.README b/keymanager/docs/leap-commit-template.README
new file mode 100644
index 00000000..ce8809e7
--- /dev/null
+++ b/keymanager/docs/leap-commit-template.README
@@ -0,0 +1,47 @@
+Run `git config commit.template docs/leap-commit-template` or
+edit the .git/config for this project and add
+`template = docs/leap-commit-template`
+under the [commit] block
+[type] <subject>
+Type should be one of the following:
+- bug (bug fix)
+- feat (new feature)
+- docs (changes to documentation)
+- style (formatting, pep8 violations, etc; no code change)
+- refactor (refactoring production code)
+- test (adding missing tests, refactoring tests; no production code change)
+- pkg (packaging related changes; no production code change)
+- i18n translation related changes
+Subject should use imperative tone and say what you did.
+For example, use 'change', NOT 'changed' or 'changes'.
+The body should go into detail about changes made.
+The footer should contain any issue references or actions.
+You can use one or several of the following:
+- Resolves: #XYZ
+- Related: #XYZ
+- Documentation: #XYZ
+- Releases: XYZ
+The Documentation field should be included in every new feature commit, and it
+should link to an issue in the bug tracker where the new feature is analyzed
+and documented.
+For a full example of how to write a good commit message, check out
diff --git a/keymanager/docs/soledad-documents.rst b/keymanager/docs/soledad-documents.rst
new file mode 100644
index 00000000..67055b2f
--- /dev/null
+++ b/keymanager/docs/soledad-documents.rst
@@ -0,0 +1,77 @@
+Soledad Documents
+KeyManager uses two types of documents for the keyring:
+* key document, that stores each gpg key.
+* active document, that relates an address to its corresponding key.
+Each key can have 0 or more active documents with a different email address
+ .-------------. .-------------.
+ | | | |
+ '-------------' '-------------'
+ | |
+ | .-----------. |
+ | | | |
+ | | key | |
+ '----->| |<----'
+ | |
+ '-----------'
+Fields in a key document:
+* uids
+* fingerprint
+* key_data
+* private. bool marking if the key is private or public
+* length
+* expiry_date
+* refreshed_at
+* version = 1
+* type = "OpenPGPKey"
+* tags = ["keymanager-key"]
+Fields in an active document:
+* address
+* fingerprint
+* private
+* validation
+* last_audited_at
+* encr_used
+* sign_used
+* version = 1
+* type = "OpenPGPKey-active"
+* tags = ["keymanager-active"]
+The meaning of validation, encr_used and sign_used is related to the `Transitional Key Validation`_
+.. _Transitional Key Validation:
diff --git a/keymanager/pkg/ b/keymanager/pkg/
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/keymanager/pkg/
diff --git a/keymanager/pkg/ b/keymanager/pkg/
new file mode 100755
index 00000000..a13e2c7a
--- /dev/null
+++ b/keymanager/pkg/
@@ -0,0 +1,13 @@
+# Generate wheels for dependencies
+# Use at your own risk.
+if [ "$WHEELHOUSE" = "" ]; then
+ WHEELHOUSE=$HOME/wheelhouse
+pip wheel --wheel-dir $WHEELHOUSE pip
+pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements.pip
+if [ -f pkg/requirements-testing.pip ]; then
+ pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements-testing.pip
diff --git a/keymanager/pkg/ b/keymanager/pkg/
new file mode 100755
index 00000000..8ca0956b
--- /dev/null
+++ b/keymanager/pkg/
@@ -0,0 +1,84 @@
+# Update pip and install LEAP base/testing requirements.
+# For convenience, $insecure_packages are allowed with insecure flags enabled.
+# Use at your own risk.
+# See $usage for help
+show_help() {
+ usage="Usage: $0 [--testing] [--use-leap-wheels]\n --testing\t\tInstall dependencies from requirements-testing.pip\n
+\t\t\tOtherwise, it will install requirements.pip\n
+--use-leap-wheels\tUse wheels from"
+ echo -e $usage
+ exit 1
+process_arguments() {
+ testing=false
+ while [ "$#" -gt 0 ]; do
+ # From
+ case "$1" in
+ --help) show_help;;
+ --testing) testing=true; shift 1;;
+ --use-leap-wheels) use_leap_wheels=true; shift 1;;
+ -h) show_help;;
+ -*) echo "unknown option: $1" >&2; exit 1;;
+ esac
+ done
+return_wheelhouse() {
+ if $use_leap_wheels ; then
+ WHEELHOUSE=$leap_wheelhouse
+ elif [ "$WHEELHOUSE" = "" ]; then
+ WHEELHOUSE=$HOME/wheelhouse
+ fi
+ # Tested with bash and zsh
+ if [[ $WHEELHOUSE != http* && ! -d "$WHEELHOUSE" ]]; then
+ fi
+ echo "$WHEELHOUSE"
+return_install_options() {
+ wheelhouse=`return_wheelhouse`
+ install_options="-U --find-links=$wheelhouse"
+ if $use_leap_wheels ; then
+ install_options="$install_options --trusted-host"
+ fi
+ echo $install_options
+return_insecure_flags() {
+ for insecure_package in $insecure_packages; do
+ flags="$flags --allow-external $insecure_package --allow-unverified $insecure_package"
+ done
+ echo $flags
+return_packages() {
+ if $testing ; then
+ packages="-r pkg/requirements-testing.pip"
+ else
+ packages="-r pkg/requirements.pip"
+ fi
+ echo $packages
+process_arguments $@
+pip install -U wheel
+pip install $install_options pip
+pip install $install_options $insecure_flags $packages
diff --git a/keymanager/pkg/requirements-latest.pip b/keymanager/pkg/requirements-latest.pip
new file mode 100644
index 00000000..148d42be
--- /dev/null
+++ b/keymanager/pkg/requirements-latest.pip
@@ -0,0 +1,8 @@
+--allow-external u1db --allow-unverified u1db
+--allow-external dirspec --allow-unverified dirspec
+-e 'git+'
+-e 'git+'
+-e 'git+'
+-e .
diff --git a/keymanager/pkg/requirements-leap.pip b/keymanager/pkg/requirements-leap.pip
new file mode 100644
index 00000000..4ba1d81d
--- /dev/null
+++ b/keymanager/pkg/requirements-leap.pip
@@ -0,0 +1 @@
diff --git a/keymanager/pkg/requirements-testing.pip b/keymanager/pkg/requirements-testing.pip
new file mode 100644
index 00000000..addda19b
--- /dev/null
+++ b/keymanager/pkg/requirements-testing.pip
@@ -0,0 +1,11 @@
+#Although it's not a package dependency, tests also depend on having
+#soledad client installed. Commenting to avoid versioning problem, you should
+#know what you are testing against :)
diff --git a/keymanager/pkg/requirements.pip b/keymanager/pkg/requirements.pip
new file mode 100644
index 00000000..386b85c6
--- /dev/null
+++ b/keymanager/pkg/requirements.pip
@@ -0,0 +1,3 @@
+# if we bump the gnupg version, bump also the sanity check
+# in keymanager.__init__
diff --git a/keymanager/pkg/tools/ b/keymanager/pkg/tools/
new file mode 100755
index 00000000..0169bb17
--- /dev/null
+++ b/keymanager/pkg/tools/
@@ -0,0 +1,2 @@
+git log --format='%aN <%aE>' | awk '{arr[$0]++} END{for (i in arr){print arr[i], i;}}' | sort -rn | cut -d' ' -f2-
diff --git a/keymanager/pkg/tools/ b/keymanager/pkg/tools/
new file mode 100755
index 00000000..26229785
--- /dev/null
+++ b/keymanager/pkg/tools/
@@ -0,0 +1,15 @@
+#Wraps a command in a virtualenwrapper passed as first argument.
+#Example: leap-bitmask ./
+source `which`
+echo "Activating virtualenv " $1
+echo "------------------------------------"
+workon $1
+cd $wd
+echo "running version: " `pyver leap.keymanager`
+echo "soledad version: " `pyver leap.soledad.common`
+$2 $3 $4 $5
diff --git a/keymanager/pkg/ b/keymanager/pkg/
new file mode 100644
index 00000000..9c9211be
--- /dev/null
+++ b/keymanager/pkg/
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Utils to help in the setup process
+import os
+import re
+import sys
+def is_develop_mode():
+ """
+ Returns True if we're calling the setup script using the argument for
+ setuptools development mode.
+ This avoids messing up with dependency pinning and order, the
+ responsibility of installing the leap dependencies is left to the
+ developer.
+ """
+ args = sys.argv
+ devflags = "", "develop"
+ if (args[0], args[1]) == devflags:
+ return True
+ return False
+def get_reqs_from_files(reqfiles):
+ """
+ Returns the contents of the top requirement file listed as a
+ string list with the lines.
+ @param reqfiles: requirement files to parse
+ @type reqfiles: list of str
+ """
+ for reqfile in reqfiles:
+ if os.path.isfile(reqfile):
+ return open(reqfile, 'r').read().split('\n')
+def parse_requirements(reqfiles=['requirements.txt',
+ 'requirements.pip',
+ 'pkg/requirements.pip']):
+ """
+ Parses the requirement files provided.
+ The passed reqfiles list is a list of possible locations to try, the
+ function will return the contents of the first path found.
+ Checks the value of LEAP_VENV_SKIP_PYSIDE to see if it should
+ return PySide as a dep or not. Don't set, or set to 0 if you want
+ to install it through pip.
+ @param reqfiles: requirement files to parse
+ @type reqfiles: list of str
+ """
+ requirements = []
+ skip_pyside = os.getenv("LEAP_VENV_SKIP_PYSIDE", "0") != "0"
+ for line in get_reqs_from_files(reqfiles):
+ # -e git://
+ if re.match(r'\s*-e\s+', line):
+ pass
+ # do not try to do anything with externals on vcs
+ # requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
+ # line))
+ #
+ elif re.match(r'\s*https?:', line):
+ requirements.append(re.sub(r'\s*https?:.*#egg=(.*)$', r'\1',
+ line))
+ # -f lines are for index locations, and don't get used here
+ elif re.match(r'\s*-f\s+', line):
+ pass
+ # argparse is part of the standard library starting with 2.7
+ # adding it to the requirements list screws distro installs
+ elif line == 'argparse' and sys.version_info >= (2, 7):
+ pass
+ elif line == 'PySide' and skip_pyside:
+ pass
+ # do not include comments
+ elif line.lstrip().startswith('#'):
+ pass
+ else:
+ if line != '':
+ requirements.append(line)
+ return requirements
diff --git a/keymanager/setup.cfg b/keymanager/setup.cfg
new file mode 100644
index 00000000..4a24b89b
--- /dev/null
+++ b/keymanager/setup.cfg
@@ -0,0 +1,17 @@
+test = trial
+exclude =,,*.egg,build,docs
+ignore = E731
+exclude =,,*.egg,build,docs
+ignore = E731
+VCS = git
+style = pep440
+versionfile_source = src/leap/keymanager/
+versionfile_build = leap/keymanager/
+tag_prefix =
diff --git a/keymanager/ b/keymanager/
new file mode 100644
index 00000000..cde5f8b6
--- /dev/null
+++ b/keymanager/
@@ -0,0 +1,152 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+setup file for leap.keymanager
+import re
+from setuptools import setup
+from setuptools import find_packages
+from setuptools import Command
+from pkg import utils
+import versioneer
+trove_classifiers = [
+ 'Development Status :: 4 - Beta',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
+ 'Operating System :: OS Independent',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2.6',
+ 'Programming Language :: Python :: 2.7',
+ 'Topic :: Communications :: Email',
+ 'Topic :: Internet',
+ 'Topic :: Security :: Cryptography',
+ 'Topic :: Software Development :: Libraries',
+ 'archive/%s.tar.gz')
+_versions = versioneer.get_versions()
+VERSION = _versions['version']
+VERSION_REVISION = _versions['full-revisionid']
+# get the short version for the download url
+_version_short = re.findall('\d+\.\d+\.\d+', VERSION)
+if len(_version_short) > 0:
+ VERSION_SHORT = _version_short[0]
+class freeze_debianver(Command):
+ """
+ Freezes the version in a debian branch.
+ To be used after merging the development branch onto the debian one.
+ """
+ user_options = []
+ template = r"""
+# This file was generated by the `freeze_debianver` command in
+# Using '' (0.16) from
+# revision-control system data, or from the parent directory name of an
+# unpacked source archive. Distribution tarballs contain a pre-generated copy
+# of this file.
+import json
+import sys
+version_json = '''
+ "dirty": false,
+ "error": null,
+ "full-revisionid": "FULL_REVISIONID",
+ "version": "VERSION_STRING"
+def get_versions():
+ return json.loads(version_json)
+ def initialize_options(self):
+ pass
+ def finalize_options(self):
+ pass
+ def run(self):
+ proceed = str(raw_input(
+ "This will overwrite the file Continue? [y/N] "))
+ if proceed != "y":
+ print("He. You scared. Aborting.")
+ return
+ subst_template = self.template.replace(
+ versioneer_cfg = versioneer.get_config_from_root('.')
+ with open(versioneer_cfg.versionfile_source, 'w') as f:
+ f.write(subst_template)
+cmdclass = versioneer.get_cmdclass()
+cmdclass["freeze_debianver"] = freeze_debianver
+# XXX add ref to docs
+requirements = utils.parse_requirements()
+if utils.is_develop_mode():
+ print
+ print ("[WARNING] Skipping leap-specific dependencies "
+ "because development mode is detected.")
+ print ("[WARNING] You can install "
+ "the latest published versions with "
+ "'pip install -r pkg/requirements-leap.pip'")
+ print ("[WARNING] Or you can instead do 'python develop' "
+ "from the parent folder of each one of them.")
+ print
+ requirements += utils.parse_requirements(
+ reqfiles=["pkg/requirements-leap.pip"])
+ name='leap.keymanager',
+ version=VERSION,
+ cmdclass=cmdclass,
+ url='',
+ download_url=DOWNLOAD_URL,
+ license='GPLv3+',
+ description='LEAP\'s Key Manager',
+ author='The LEAP Encryption Access Project',
+ author_email='',
+ maintainer='Kali Kaneko',
+ maintainer_email='',
+ long_description=(
+ "The Key Manager handles all types of keys to allow for "
+ "point-to-point encryption between parties communicating through "
+ "LEAP infrastructure."
+ ),
+ classifiers=trove_classifiers,
+ namespace_packages=["leap"],
+ packages=find_packages('src', exclude=['leap.keymanager.tests']),
+ package_dir={'': 'src'},
+ test_suite='leap.keymanager.tests',
+ install_requires=requirements,
+ tests_require=utils.parse_requirements(
+ reqfiles=['pkg/requirements-testing.pip']),
diff --git a/keymanager/src/leap/ b/keymanager/src/leap/
new file mode 100644
index 00000000..f48ad105
--- /dev/null
+++ b/keymanager/src/leap/
@@ -0,0 +1,6 @@
+# See
+ __import__('pkg_resources').declare_namespace(__name__)
+except ImportError:
+ from pkgutil import extend_path
+ __path__ = extend_path(__path__, __name__)
diff --git a/keymanager/src/leap/keymanager/ b/keymanager/src/leap/keymanager/
new file mode 100644
index 00000000..0b8a5b30
--- /dev/null
+++ b/keymanager/src/leap/keymanager/
@@ -0,0 +1,842 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Key Manager is a Nicknym agent for LEAP client.
+# let's do a little sanity check to see if we're using the wrong gnupg
+import fileinput
+import os
+import sys
+import tempfile
+import json
+import urllib
+from leap.common import ca_bundle
+from twisted.web import client
+from twisted.web._responses import NOT_FOUND
+from ._version import get_versions
+ from gnupg.gnupg import GPGUtilities
+ assert(GPGUtilities) # pyflakes happy
+ from gnupg import __version__ as _gnupg_version
+ if '-' in _gnupg_version:
+ # avoid Parsing it as LegacyVersion, get just
+ # the release numbers:
+ _gnupg_version = _gnupg_version.split('-')[0]
+ from pkg_resources import parse_version
+ # We need to make sure that we're not colliding with the infamous
+ # python-gnupg
+ assert(parse_version(_gnupg_version) >= parse_version('1.4.0'))
+except (ImportError, AssertionError):
+ print "*******"
+ print "Ooops! It looks like there is a conflict in the installed version "
+ print "of gnupg."
+ print "GNUPG_VERSION:", _gnupg_version
+ print
+ print "Disclaimer: Ideally, we would need to work a patch and propose the "
+ print "merge to upstream. But until then do: "
+ print
+ print "% pip uninstall python-gnupg"
+ print "% pip install gnupg"
+ print "*******"
+ sys.exit(1)
+import logging
+from twisted.internet import defer
+from urlparse import urlparse
+from leap.common.check import leap_assert
+from leap.common.http import HTTPClient
+from import emit_async, catalog
+from leap.common.decorators import memoized_method
+from leap.keymanager.errors import (
+ KeyNotFound,
+ KeyNotValidUpgrade,
+ InvalidSignature
+from leap.keymanager.validation import ValidationLevels, can_upgrade
+from leap.keymanager.openpgp import OpenPGPScheme
+__version__ = get_versions()['version']
+del get_versions
+logger = logging.getLogger(__name__)
+# The Key Manager
+class KeyManager(object):
+ #
+ # server's key storage constants
+ #
+ OPENPGP_KEY = 'openpgp'
+ PUBKEY_KEY = "user[public_key]"
+ def __init__(self, address, nickserver_uri, soledad, token=None,
+ ca_cert_path=None, api_uri=None, api_version=None, uid=None,
+ gpgbinary=None):
+ """
+ Initialize a Key Manager for user's C{address} with provider's
+ nickserver reachable in C{nickserver_uri}.
+ :param address: The email address of the user of this Key Manager.
+ :type address: str
+ :param nickserver_uri: The URI of the nickserver.
+ :type nickserver_uri: str
+ :param soledad: A Soledad instance for local storage of keys.
+ :type soledad: leap.soledad.Soledad
+ :param token: The token for interacting with the webapp API.
+ :type token: str
+ :param ca_cert_path: The path to the CA certificate.
+ :type ca_cert_path: str
+ :param api_uri: The URI of the webapp API.
+ :type api_uri: str
+ :param api_version: The version of the webapp API.
+ :type api_version: str
+ :param uid: The user's UID.
+ :type uid: str
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
+ """
+ self._address = address
+ self._nickserver_uri = nickserver_uri
+ self._soledad = soledad
+ self._token = token
+ self.ca_cert_path = ca_cert_path
+ self.api_uri = api_uri
+ self.api_version = api_version
+ self.uid = uid
+ self._openpgp = OpenPGPScheme(soledad, gpgbinary=gpgbinary)
+ self._combined_ca_bundle = self._create_combined_bundle_file()
+ self._async_client = HTTPClient(self._combined_ca_bundle)
+ self._async_client_pinned = HTTPClient(self._ca_cert_path)
+ #
+ # destructor
+ #
+ def __del__(self):
+ try:
+ created_tmp_combined_ca_bundle = self._combined_ca_bundle not in \
+ [ca_bundle.where(), self._ca_cert_path]
+ if created_tmp_combined_ca_bundle:
+ os.remove(self._combined_ca_bundle)
+ except OSError:
+ pass
+ #
+ # utilities
+ #
+ def _create_combined_bundle_file(self):
+ leap_ca_bundle = ca_bundle.where()
+ if self._ca_cert_path == leap_ca_bundle:
+ return self._ca_cert_path # don't merge file with itself
+ elif not self._ca_cert_path:
+ return leap_ca_bundle
+ tmp_file = tempfile.NamedTemporaryFile(delete=False)
+ with open(, 'w') as fout:
+ fin = fileinput.input(files=(leap_ca_bundle, self._ca_cert_path))
+ for line in fin:
+ fout.write(line)
+ fin.close()
+ return
+ @defer.inlineCallbacks
+ def _get_key_from_nicknym(self, address):
+ """
+ Send a GET request to C{uri} containing C{data}.
+ :param address: The URI of the request.
+ :type address: str
+ :return: A deferred that will be fired with GET content as json (dict)
+ :rtype: Deferred
+ """
+ try:
+ uri = self._nickserver_uri + '?address=' + address
+ content = yield self._fetch_and_handle_404_from_nicknym(uri, address)
+ json_content = json.loads(content)
+ except KeyNotFound:
+ raise
+ except IOError as e:
+ logger.warning("HTTP error retrieving key: %r" % (e,))
+ logger.warning("%s" % (content,))
+ raise KeyNotFound(e.message), None, sys.exc_info()[2]
+ except ValueError as v:
+ logger.warning("Invalid JSON data from key: %s" % (uri,))
+ raise KeyNotFound(v.message + ' - ' + uri), None, sys.exc_info()[2]
+ except Exception as e:
+ logger.warning("Error retrieving key: %r" % (e,))
+ raise KeyNotFound(e.message), None, sys.exc_info()[2]
+ # Responses are now text/plain, although it's json anyway, but
+ # this will fail when it shouldn't
+ # leap_assert(
+ # res.headers['content-type'].startswith('application/json'),
+ # 'Content-type is not JSON.')
+ defer.returnValue(json_content)
+ def _fetch_and_handle_404_from_nicknym(self, uri, address):
+ """
+ Send a GET request to C{uri} containing C{data}.
+ :param uri: The URI of the request.
+ :type uri: str
+ :param address: The email corresponding to the key.
+ :type address: str
+ :return: A deferred that will be fired with GET content as json (dict)
+ :rtype: Deferred
+ """
+ def check_404(response):
+ if response.code == NOT_FOUND:
+ message = '%s: %s key not found.' % (response.code, address)
+ logger.warning(message)
+ raise KeyNotFound(message), None, sys.exc_info()[2]
+ return response
+ d = self._async_client_pinned.request(str(uri), 'GET', callback=check_404)
+ d.addCallback(client.readBody)
+ return d
+ @defer.inlineCallbacks
+ def _get_with_combined_ca_bundle(self, uri, data=None):
+ """
+ Send a GET request to C{uri} containing C{data}.
+ Instead of using the ca_cert provided on construction time, this
+ version also uses the default certificates shipped with leap.common
+ :param uri: The URI of the request.
+ :type uri: str
+ :param data: The body of the request.
+ :type data: dict, str or file
+ :return: A deferred that will be fired with the GET response
+ :rtype: Deferred
+ """
+ try:
+ content = yield self._async_client.request(str(uri), 'GET')
+ except Exception as e:
+ logger.warning("There was a problem fetching key: %s" % (e,))
+ raise KeyNotFound(uri)
+ if not content:
+ raise KeyNotFound(uri)
+ defer.returnValue(content)
+ @defer.inlineCallbacks
+ def _put(self, uri, data=None):
+ """
+ Send a PUT request to C{uri} containing C{data}.
+ The request will be sent using the configured CA certificate path to
+ verify the server certificate and the configured session id for
+ authentication.
+ :param uri: The URI of the request.
+ :type uri: str
+ :param data: The body of the request.
+ :type data: dict, str or file
+ :return: A deferred that will be fired when PUT request finishes
+ :rtype: Deferred
+ """
+ leap_assert(
+ self._token is not None,
+ 'We need a token to interact with webapp!')
+ if type(data) == dict:
+ data = urllib.urlencode(data)
+ headers = {'Authorization': [str('Token token=%s' % self._token)]}
+ headers['Content-Type'] = ['application/x-www-form-urlencoded']
+ try:
+ res = yield self._async_client_pinned.request(str(uri), 'PUT',
+ body=str(data),
+ headers=headers)
+ except Exception as e:
+ logger.warning("Error uploading key: %r" % (e,))
+ raise e
+ if 'error' in res:
+ # FIXME: That's a workaround for 500,
+ # we need to implement a readBody to assert response code
+ logger.warning("Error uploading key: %r" % (res,))
+ raise Exception(res)
+ @memoized_method(invalidation=300)
+ @defer.inlineCallbacks
+ def _fetch_keys_from_server(self, address):
+ """
+ Fetch keys bound to address from nickserver and insert them in
+ local database.
+ :param address: The address bound to the keys.
+ :type address: str
+ :return: A Deferred which fires when the key is in the storage,
+ or which fails with KeyNotFound if the key was not found on
+ nickserver.
+ :rtype: Deferred
+ """
+ # request keys from the nickserver
+ server_keys = yield self._get_key_from_nicknym(address)
+ # insert keys in local database
+ if self.OPENPGP_KEY in server_keys:
+ # nicknym server is authoritative for its own domain,
+ # for other domains the key might come from key servers.
+ validation_level = ValidationLevels.Weak_Chain
+ _, domain = _split_email(address)
+ if (domain == _get_domain(self._nickserver_uri)):
+ validation_level = ValidationLevels.Provider_Trust
+ yield self.put_raw_key(
+ server_keys['openpgp'],
+ address=address,
+ validation=validation_level)
+ #
+ # key management
+ #
+ def send_key(self):
+ """
+ Send user's key to provider.
+ Public key bound to user's is sent to provider, which will sign it and
+ replace any prior keys for the same address in its database.
+ :return: A Deferred which fires when the key is sent, or which fails
+ with KeyNotFound if the key was not found in local database.
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ def send(pubkey):
+ data = {
+ self.PUBKEY_KEY: pubkey.key_data
+ }
+ uri = "%s/%s/users/%s.json" % (
+ self._api_uri,
+ self._api_version,
+ self._uid)
+ d = self._put(uri, data)
+ d.addCallback(lambda _:
+ self._address))
+ return d
+ d = self.get_key(
+ self._address, private=False, fetch_remote=False)
+ d.addCallback(send)
+ return d
+ def get_key(self, address, private=False, fetch_remote=True):
+ """
+ Return a key bound to address.
+ First, search for the key in local storage. If it is not available,
+ then try to fetch from nickserver.
+ :param address: The address bound to the key.
+ :type address: str
+ :param private: Look for a private key instead of a public one?
+ :type private: bool
+ :param fetch_remote: If key not found in local storage try to fetch
+ from nickserver
+ :type fetch_remote: bool
+ :return: A Deferred which fires with an EncryptionKey bound to address,
+ or which fails with KeyNotFound if no key was found neither
+ locally or in keyserver or fail with KeyVersionError if the
+ key has a format not supported by this version of KeyManager
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ logger.debug("getting key for %s" % (address,))
+ emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
+ def key_found(key):
+ emit_async(catalog.KEYMANAGER_KEY_FOUND, address)
+ return key
+ def key_not_found(failure):
+ if not failure.check(KeyNotFound):
+ return failure
+ emit_async(catalog.KEYMANAGER_KEY_NOT_FOUND, address)
+ # we will only try to fetch a key from nickserver if fetch_remote
+ # is True and the key is not private.
+ if fetch_remote is False or private is True:
+ return failure
+ emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address)
+ d = self._fetch_keys_from_server(address)
+ d.addCallback(
+ lambda _: self._openpgp.get_key(address, private=False))
+ d.addCallback(key_found)
+ return d
+ # return key if it exists in local database
+ d = self._openpgp.get_key(address, private=private)
+ d.addCallbacks(key_found, key_not_found)
+ return d
+ def get_all_keys(self, private=False):
+ """
+ Return all keys stored in local database.
+ :param private: Include private keys
+ :type private: bool
+ :return: A Deferred which fires with a list of all keys in local db.
+ :rtype: Deferred
+ """
+ return self._openpgp.get_all_keys(private)
+ def gen_key(self):
+ """
+ Generate a key bound to the user's address.
+ :return: A Deferred which fires with the generated EncryptionKey.
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ def signal_finished(key):
+ emit_async(
+ return key
+ emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address)
+ d = self._openpgp.gen_key(self._address)
+ d.addCallback(signal_finished)
+ return d
+ #
+ # Setters/getters
+ #
+ def _get_token(self):
+ return self._token
+ def _set_token(self, token):
+ self._token = token
+ token = property(
+ _get_token, _set_token, doc='The session token.')
+ def _get_ca_cert_path(self):
+ return self._ca_cert_path
+ def _set_ca_cert_path(self, ca_cert_path):
+ self._ca_cert_path = ca_cert_path
+ ca_cert_path = property(
+ _get_ca_cert_path, _set_ca_cert_path,
+ doc='The path to the CA certificate.')
+ def _get_api_uri(self):
+ return self._api_uri
+ def _set_api_uri(self, api_uri):
+ self._api_uri = api_uri
+ api_uri = property(
+ _get_api_uri, _set_api_uri, doc='The webapp API URI.')
+ def _get_api_version(self):
+ return self._api_version
+ def _set_api_version(self, api_version):
+ self._api_version = api_version
+ api_version = property(
+ _get_api_version, _set_api_version, doc='The webapp API version.')
+ def _get_uid(self):
+ return self._uid
+ def _set_uid(self, uid):
+ self._uid = uid
+ uid = property(
+ _get_uid, _set_uid, doc='The uid of the user.')
+ #
+ # encrypt/decrypt and sign/verify API
+ #
+ def encrypt(self, data, address, passphrase=None, sign=None,
+ cipher_algo='AES256', fetch_remote=True):
+ """
+ Encrypt data with the public key bound to address and sign with with
+ the private key bound to sign address.
+ :param data: The data to be encrypted.
+ :type data: str
+ :param address: The address to encrypt it for.
+ :type address: str
+ :param passphrase: The passphrase for the secret key used for the
+ signature.
+ :type passphrase: str
+ :param sign: The address to be used for signature.
+ :type sign: str
+ :param cipher_algo: The cipher algorithm to use.
+ :type cipher_algo: str
+ :param fetch_remote: If key is not found in local storage try to fetch
+ from nickserver
+ :type fetch_remote: bool
+ :return: A Deferred which fires with the encrypted data as str, or
+ which fails with KeyNotFound if no keys were found neither
+ locally or in keyserver or fails with KeyVersionError if the
+ key format is not supported or fails with EncryptError if
+ failed encrypting for some reason.
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ @defer.inlineCallbacks
+ def encrypt(keys):
+ pubkey, signkey = keys
+ encrypted = yield self._openpgp.encrypt(
+ data, pubkey, passphrase, sign=signkey,
+ cipher_algo=cipher_algo)
+ if not pubkey.encr_used:
+ pubkey.encr_used = True
+ yield self._openpgp.put_key(pubkey)
+ defer.returnValue(encrypted)
+ dpub = self.get_key(address, private=False,
+ fetch_remote=fetch_remote)
+ dpriv = defer.succeed(None)
+ if sign is not None:
+ dpriv = self.get_key(sign, private=True)
+ d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
+ d.addCallbacks(encrypt, self._extract_first_error)
+ return d
+ def decrypt(self, data, address, passphrase=None, verify=None,
+ fetch_remote=True):
+ """
+ Decrypt data using private key from address and verify with public key
+ bound to verify address.
+ :param data: The data to be decrypted.
+ :type data: str
+ :param address: The address to whom data was encrypted.
+ :type address: str
+ :param passphrase: The passphrase for the secret key used for
+ decryption.
+ :type passphrase: str
+ :param verify: The address to be used for signature.
+ :type verify: str
+ :param fetch_remote: If key for verify not found in local storage try
+ to fetch from nickserver
+ :type fetch_remote: bool
+ :return: A Deferred which fires with:
+ * (decripted str, signing key) if validation works
+ * (decripted str, KeyNotFound) if signing key not found
+ * (decripted str, InvalidSignature) if signature is invalid
+ * KeyNotFound failure if private key not found
+ * DecryptError failure if decription failed
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ @defer.inlineCallbacks
+ def decrypt(keys):
+ pubkey, privkey = keys
+ decrypted, signed = yield self._openpgp.decrypt(
+ data, privkey, passphrase=passphrase, verify=pubkey)
+ if pubkey is None:
+ signature = KeyNotFound(verify)
+ elif signed:
+ signature = pubkey
+ if not pubkey.sign_used:
+ pubkey.sign_used = True
+ yield self._openpgp.put_key(pubkey)
+ defer.returnValue((decrypted, signature))
+ else:
+ signature = InvalidSignature(
+ 'Failed to verify signature with key %s' %
+ (pubkey.fingerprint,))
+ defer.returnValue((decrypted, signature))
+ dpriv = self.get_key(address, private=True)
+ dpub = defer.succeed(None)
+ if verify is not None:
+ dpub = self.get_key(verify, private=False,
+ fetch_remote=fetch_remote)
+ dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f)
+ d = defer.gatherResults([dpub, dpriv], consumeErrors=True)
+ d.addCallbacks(decrypt, self._extract_first_error)
+ return d
+ def _extract_first_error(self, failure):
+ return failure.value.subFailure
+ def sign(self, data, address, digest_algo='SHA512', clearsign=False,
+ detach=True, binary=False):
+ """
+ Sign data with private key bound to address.
+ :param data: The data to be signed.
+ :type data: str
+ :param address: The address to be used to sign.
+ :type address: EncryptionKey
+ :param digest_algo: The hash digest to use.
+ :type digest_algo: str
+ :param clearsign: If True, create a cleartext signature.
+ :type clearsign: bool
+ :param detach: If True, create a detached signature.
+ :type detach: bool
+ :param binary: If True, do not ascii armour the output.
+ :type binary: bool
+ :return: A Deferred which fires with the signed data as str or fails
+ with KeyNotFound if no key was found neither locally or in
+ keyserver or fails with SignFailed if there was any error
+ signing.
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ def sign(privkey):
+ return self._openpgp.sign(
+ data, privkey, digest_algo=digest_algo, clearsign=clearsign,
+ detach=detach, binary=binary)
+ d = self.get_key(address, private=True)
+ d.addCallback(sign)
+ return d
+ def verify(self, data, address, detached_sig=None,
+ fetch_remote=True):
+ """
+ Verify signed data with private key bound to address, eventually using
+ detached_sig.
+ :param data: The data to be verified.
+ :type data: str
+ :param address: The address to be used to verify.
+ :type address: EncryptionKey
+ :param detached_sig: A detached signature. If given, C{data} is
+ verified using this detached signature.
+ :type detached_sig: str
+ :param fetch_remote: If key for verify not found in local storage try
+ to fetch from nickserver
+ :type fetch_remote: bool
+ :return: A Deferred which fires with the signing EncryptionKey if
+ signature verifies, or which fails with InvalidSignature if
+ signature don't verifies or fails with KeyNotFound if no key
+ was found neither locally or in keyserver.
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ def verify(pubkey):
+ signed = self._openpgp.verify(
+ data, pubkey, detached_sig=detached_sig)
+ if signed:
+ if not pubkey.sign_used:
+ pubkey.sign_used = True
+ d = self._openpgp.put_key(pubkey)
+ d.addCallback(lambda _: pubkey)
+ return d
+ return pubkey
+ else:
+ raise InvalidSignature(
+ 'Failed to verify signature with key %s' %
+ (pubkey.fingerprint,))
+ d = self.get_key(address, private=False,
+ fetch_remote=fetch_remote)
+ d.addCallback(verify)
+ return d
+ def delete_key(self, key):
+ """
+ Remove key from storage.
+ :param key: The key to be removed.
+ :type key: EncryptionKey
+ :return: A Deferred which fires when the key is deleted, or which fails
+ KeyNotFound if the key was not found on local storage.
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ return self._openpgp.delete_key(key)
+ def put_key(self, key):
+ """
+ Put key bound to address in local storage.
+ :param key: The key to be stored
+ :type key: EncryptionKey
+ :return: A Deferred which fires when the key is in the storage, or
+ which fails with KeyNotValidUpdate if a key with the same
+ uid exists and the new one is not a valid update for it.
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ def old_key_not_found(failure):
+ if failure.check(KeyNotFound):
+ return None
+ else:
+ return failure
+ def check_upgrade(old_key):
+ if key.private or can_upgrade(key, old_key):
+ return self._openpgp.put_key(key)
+ else:
+ raise KeyNotValidUpgrade(
+ "Key %s can not be upgraded by new key %s"
+ % (old_key.fingerprint, key.fingerprint))
+ d = self._openpgp.get_key(key.address, private=key.private)
+ d.addErrback(old_key_not_found)
+ d.addCallback(check_upgrade)
+ return d
+ def put_raw_key(self, key, address,
+ validation=ValidationLevels.Weak_Chain):
+ """
+ Put raw key bound to address in local storage.
+ :param key: The ascii key to be stored
+ :type key: str
+ :param address: address for which this key will be active
+ :type address: str
+ :param validation: validation level for this key
+ (default: 'Weak_Chain')
+ :type validation: ValidationLevels
+ :return: A Deferred which fires when the key is in the storage, or
+ which fails with KeyAddressMismatch if address doesn't match
+ any uid on the key or fails with KeyNotFound if no OpenPGP
+ material was found in key or fails with KeyNotValidUpdate if a
+ key with the same uid exists and the new one is not a valid
+ update for it.
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+ pubkey, privkey = self._openpgp.parse_key(key, address)
+ if pubkey is None:
+ return
+ pubkey.validation = validation
+ d = self.put_key(pubkey)
+ if privkey is not None:
+ d.addCallback(lambda _: self.put_key(privkey))
+ return d
+ @defer.inlineCallbacks
+ def fetch_key(self, address, uri, validation=ValidationLevels.Weak_Chain):
+ """
+ Fetch a public key bound to address from the network and put it in
+ local storage.
+ :param address: The email address of the key.
+ :type address: str
+ :param uri: The URI of the key.
+ :type uri: str
+ :param validation: validation level for this key
+ (default: 'Weak_Chain')
+ :type validation: ValidationLevels
+ :return: A Deferred which fires when the key is in the storage, or
+ which fails with KeyNotFound: if not valid key on uri or fails
+ with KeyAddressMismatch if address doesn't match any uid on
+ the key or fails with KeyNotValidUpdate if a key with the same
+ uid exists and the new one is not a valid update for it.
+ :rtype: Deferred
+ :raise UnsupportedKeyTypeError: if invalid key type
+ """
+"Fetch key for %s from %s" % (address, uri))
+ ascii_content = yield self._get_with_combined_ca_bundle(uri)
+ # XXX parse binary keys
+ pubkey, _ = self._openpgp.parse_key(ascii_content, address)
+ if pubkey is None:
+ raise KeyNotFound(uri)
+ pubkey.validation = validation
+ yield self.put_key(pubkey)
+def _split_email(address):
+ """
+ Split username and domain from an email address
+ :param address: an email address
+ :type address: str
+ :return: username and domain from the email address
+ :rtype: (str, str)
+ """
+ if address.count("@") != 1:
+ return None
+ return address.split("@")
+def _get_domain(url):
+ """
+ Get the domain from an url
+ :param url: an url
+ :type url: str
+ :return: the domain part of the url
+ :rtype: str
+ """
+ return urlparse(url).hostname
diff --git a/keymanager/src/leap/keymanager/ b/keymanager/src/leap/keymanager/
new file mode 100644
index 00000000..b28c6977
--- /dev/null
+++ b/keymanager/src/leap/keymanager/
@@ -0,0 +1,484 @@
+# This file helps to compute a version number in source trees obtained from
+# git-archive tarball (such as those provided by githubs download-from-tag
+# feature). Distribution tarballs (built by sdist) and build
+# directories (produced by build) will contain a much shorter file
+# that just contains the computed version number.
+# This file is released into the public domain. Generated by
+# versioneer-0.16 (
+"""Git implementation of"""
+import errno
+import os
+import re
+import subprocess
+import sys
+def get_keywords():
+ """Get the keywords needed to look up the version information."""
+ # these strings will be replaced by git during git-archive.
+ # will grep for the variable names, so they must
+ # each be defined on a line of their own. will just call
+ # get_keywords().
+ git_refnames = "$Format:%d$"
+ git_full = "$Format:%H$"
+ keywords = {"refnames": git_refnames, "full": git_full}
+ return keywords
+class VersioneerConfig:
+ """Container for Versioneer configuration parameters."""
+def get_config():
+ """Create, populate and return the VersioneerConfig() object."""
+ # these strings are filled in when ' versioneer' creates
+ #
+ cfg = VersioneerConfig()
+ cfg.VCS = "git"
+ = "pep440"
+ cfg.tag_prefix = ""
+ cfg.parentdir_prefix = "None"
+ cfg.versionfile_source = "src/leap/keymanager/"
+ cfg.verbose = False
+ return cfg
+class NotThisMethod(Exception):
+ """Exception raised if a method is not valid for the current scenario."""
+def register_vcs_handler(vcs, method): # decorator
+ """Decorator to mark a method as the handler for a particular VCS."""
+ def decorate(f):
+ """Store f in HANDLERS[vcs][method]."""
+ if vcs not in HANDLERS:
+ HANDLERS[vcs] = {}
+ HANDLERS[vcs][method] = f
+ return f
+ return decorate
+def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False):
+ """Call the given command(s)."""
+ assert isinstance(commands, list)
+ p = None
+ for c in commands:
+ try:
+ dispcmd = str([c] + args)
+ # remember shell=False, so use git.cmd on windows, not just git
+ p = subprocess.Popen([c] + args, cwd=cwd, stdout=subprocess.PIPE,
+ stderr=(subprocess.PIPE if hide_stderr
+ else None))
+ break
+ except EnvironmentError:
+ e = sys.exc_info()[1]
+ if e.errno == errno.ENOENT:
+ continue
+ if verbose:
+ print("unable to run %s" % dispcmd)
+ print(e)
+ return None
+ else:
+ if verbose:
+ print("unable to find command, tried %s" % (commands,))
+ return None
+ stdout = p.communicate()[0].strip()
+ if sys.version_info[0] >= 3:
+ stdout = stdout.decode()
+ if p.returncode != 0:
+ if verbose:
+ print("unable to run %s (error)" % dispcmd)
+ return None
+ return stdout
+def versions_from_parentdir(parentdir_prefix, root, verbose):
+ """Try to determine the version from the parent directory name.
+ Source tarballs conventionally unpack into a directory that includes
+ both the project name and a version string.
+ """
+ dirname = os.path.basename(root)
+ if not dirname.startswith(parentdir_prefix):
+ if verbose:
+ print("guessing rootdir is '%s', but '%s' doesn't start with "
+ "prefix '%s'" % (root, dirname, parentdir_prefix))
+ raise NotThisMethod("rootdir doesn't start with parentdir_prefix")
+ return {"version": dirname[len(parentdir_prefix):],
+ "full-revisionid": None,
+ "dirty": False, "error": None}
+@register_vcs_handler("git", "get_keywords")
+def git_get_keywords(versionfile_abs):
+ """Extract version information from the given file."""
+ # the code embedded in can just fetch the value of these
+ # keywords. When used from, we don't want to import,
+ # so we do it with a regexp instead. This function is not used from
+ #
+ keywords = {}
+ try:
+ f = open(versionfile_abs, "r")
+ for line in f.readlines():
+ if line.strip().startswith("git_refnames ="):
+ mo ='=\s*"(.*)"', line)
+ if mo:
+ keywords["refnames"] =
+ if line.strip().startswith("git_full ="):
+ mo ='=\s*"(.*)"', line)
+ if mo:
+ keywords["full"] =
+ f.close()
+ except EnvironmentError:
+ pass
+ return keywords
+@register_vcs_handler("git", "keywords")
+def git_versions_from_keywords(keywords, tag_prefix, verbose):
+ """Get version information from git keywords."""
+ if not keywords:
+ raise NotThisMethod("no keywords at all, weird")
+ refnames = keywords["refnames"].strip()
+ if refnames.startswith("$Format"):
+ if verbose:
+ print("keywords are unexpanded, not using")
+ raise NotThisMethod("unexpanded keywords, not a git-archive tarball")
+ refs = set([r.strip() for r in refnames.strip("()").split(",")])
+ # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of
+ # just "foo-1.0". If we see a "tag: " prefix, prefer those.
+ TAG = "tag: "
+ tags = set([r[len(TAG):] for r in refs if r.startswith(TAG)])
+ if not tags:
+ # Either we're using git < 1.8.3, or there really are no tags. We use
+ # a heuristic: assume all version tags have a digit. The old git %d
+ # expansion behaves like git log --decorate=short and strips out the
+ # refs/heads/ and refs/tags/ prefixes that would let us distinguish
+ # between branches and tags. By ignoring refnames without digits, we
+ # filter out many common branch names like "release" and
+ # "stabilization", as well as "HEAD" and "master".
+ tags = set([r for r in refs if'\d', r)])
+ if verbose:
+ print("discarding '%s', no digits" % ",".join(refs-tags))
+ if verbose:
+ print("likely tags: %s" % ",".join(sorted(tags)))
+ for ref in sorted(tags):
+ # sorting will prefer e.g. "2.0" over "2.0rc1"
+ if ref.startswith(tag_prefix):
+ r = ref[len(tag_prefix):]
+ if verbose:
+ print("picking %s" % r)
+ return {"version": r,
+ "full-revisionid": keywords["full"].strip(),
+ "dirty": False, "error": None
+ }
+ # no suitable tags, so version is "0+unknown", but full hex is still there
+ if verbose:
+ print("no suitable tags, using unknown + full revision id")
+ return {"version": "0+unknown",
+ "full-revisionid": keywords["full"].strip(),
+ "dirty": False, "error": "no suitable tags"}
+@register_vcs_handler("git", "pieces_from_vcs")
+def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
+ """Get version from 'git describe' in the root of the source tree.
+ This only gets called if the git-archive 'subst' keywords were *not*
+ expanded, and hasn't already been rewritten with a short
+ version string, meaning we're inside a checked out source tree.
+ """
+ if not os.path.exists(os.path.join(root, ".git")):
+ if verbose:
+ print("no .git in %s" % root)
+ raise NotThisMethod("no .git directory")
+ GITS = ["git"]
+ if sys.platform == "win32":
+ GITS = ["git.cmd", "git.exe"]
+ # if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty]
+ # if there isn't one, this yields HEX[-dirty] (no NUM)
+ describe_out = run_command(GITS, ["describe", "--tags", "--dirty",
+ "--always", "--long",
+ "--match", "%s*" % tag_prefix],
+ cwd=root)
+ # --long was added in git-1.5.5
+ if describe_out is None:
+ raise NotThisMethod("'git describe' failed")
+ describe_out = describe_out.strip()
+ full_out = run_command(GITS, ["rev-parse", "HEAD"], cwd=root)
+ if full_out is None:
+ raise NotThisMethod("'git rev-parse' failed")
+ full_out = full_out.strip()
+ pieces = {}
+ pieces["long"] = full_out
+ pieces["short"] = full_out[:7] # maybe improved later
+ pieces["error"] = None
+ # parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty]
+ # TAG might have hyphens.
+ git_describe = describe_out
+ # look for -dirty suffix
+ dirty = git_describe.endswith("-dirty")
+ pieces["dirty"] = dirty
+ if dirty:
+ git_describe = git_describe[:git_describe.rindex("-dirty")]
+ # now we have TAG-NUM-gHEX or HEX
+ if "-" in git_describe:
+ mo ='^(.+)-(\d+)-g([0-9a-f]+)$', git_describe)
+ if not mo:
+ # unparseable. Maybe git-describe is misbehaving?
+ pieces["error"] = ("unable to parse git-describe output: '%s'"
+ % describe_out)
+ return pieces
+ # tag
+ full_tag =
+ if not full_tag.startswith(tag_prefix):
+ if verbose:
+ fmt = "tag '%s' doesn't start with prefix '%s'"
+ print(fmt % (full_tag, tag_prefix))
+ pieces["error"] = ("tag '%s' doesn't start with prefix '%s'"
+ % (full_tag, tag_prefix))
+ return pieces
+ pieces["closest-tag"] = full_tag[len(tag_prefix):]
+ # distance: number of commits since tag
+ pieces["distance"] = int(
+ # commit: short hex revision ID
+ pieces["short"] =
+ else:
+ # HEX: no tags
+ pieces["closest-tag"] = None
+ count_out = run_command(GITS, ["rev-list", "HEAD", "--count"],
+ cwd=root)
+ pieces["distance"] = int(count_out) # total number of commits
+ return pieces
+def plus_or_dot(pieces):
+ """Return a + if we don't already have one, else return a ."""
+ if "+" in pieces.get("closest-tag", ""):
+ return "."
+ return "+"
+def render_pep440(pieces):
+ """Build up version string, with post-release "local version identifier".
+ Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you
+ get a tagged build and then dirty it, you'll get TAG+0.gHEX.dirty
+ Exceptions:
+ 1: no tags. git_describe was just HEX. 0+untagged.DISTANCE.gHEX[.dirty]
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"] or pieces["dirty"]:
+ rendered += plus_or_dot(pieces)
+ rendered += "%d.g%s" % (pieces["distance"], pieces["short"])
+ if pieces["dirty"]:
+ rendered += ".dirty"
+ else:
+ # exception #1
+ rendered = "0+untagged.%d.g%s" % (pieces["distance"],
+ pieces["short"])
+ if pieces["dirty"]:
+ rendered += ".dirty"
+ return rendered
+def render_pep440_pre(pieces):
+ """TAG[.post.devDISTANCE] -- No -dirty.
+ Exceptions:
+ 1: no tags.
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"]:
+ rendered += "" % pieces["distance"]
+ else:
+ # exception #1
+ rendered = "" % pieces["distance"]
+ return rendered
+def render_pep440_post(pieces):
+ """TAG[.postDISTANCE[.dev0]+gHEX] .
+ The ".dev0" means dirty. Note that .dev0 sorts backwards
+ (a dirty tree will appear "older" than the corresponding clean one),
+ but you shouldn't be releasing software with -dirty anyways.
+ Exceptions:
+ 1: no tags. 0.postDISTANCE[.dev0]
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"] or pieces["dirty"]:
+ rendered += ".post%d" % pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ rendered += plus_or_dot(pieces)
+ rendered += "g%s" % pieces["short"]
+ else:
+ # exception #1
+ rendered = "" % pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ rendered += "+g%s" % pieces["short"]
+ return rendered
+def render_pep440_old(pieces):
+ """TAG[.postDISTANCE[.dev0]] .
+ The ".dev0" means dirty.
+ Eexceptions:
+ 1: no tags. 0.postDISTANCE[.dev0]
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"] or pieces["dirty"]:
+ rendered += ".post%d" % pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ else:
+ # exception #1
+ rendered = "" % pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ return rendered
+def render_git_describe(pieces):
+ """TAG[-DISTANCE-gHEX][-dirty].
+ Like 'git describe --tags --dirty --always'.
+ Exceptions:
+ 1: no tags. HEX[-dirty] (note: no 'g' prefix)
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"]:
+ rendered += "-%d-g%s" % (pieces["distance"], pieces["short"])
+ else:
+ # exception #1
+ rendered = pieces["short"]
+ if pieces["dirty"]:
+ rendered += "-dirty"
+ return rendered
+def render_git_describe_long(pieces):
+ """TAG-DISTANCE-gHEX[-dirty].
+ Like 'git describe --tags --dirty --always -long'.
+ The distance/hash is unconditional.
+ Exceptions:
+ 1: no tags. HEX[-dirty] (note: no 'g' prefix)
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ rendered += "-%d-g%s" % (pieces["distance"], pieces["short"])
+ else:
+ # exception #1
+ rendered = pieces["short"]
+ if pieces["dirty"]:
+ rendered += "-dirty"
+ return rendered
+def render(pieces, style):
+ """Render the given version pieces into the requested style."""
+ if pieces["error"]:
+ return {"version": "unknown",
+ "full-revisionid": pieces.get("long"),
+ "dirty": None,
+ "error": pieces["error"]}
+ if not style or style == "default":
+ style = "pep440" # the default
+ if style == "pep440":
+ rendered = render_pep440(pieces)
+ elif style == "pep440-pre":
+ rendered = render_pep440_pre(pieces)
+ elif style == "pep440-post":
+ rendered = render_pep440_post(pieces)
+ elif style == "pep440-old":
+ rendered = render_pep440_old(pieces)
+ elif style == "git-describe":
+ rendered = render_git_describe(pieces)
+ elif style == "git-describe-long":
+ rendered = render_git_describe_long(pieces)
+ else:
+ raise ValueError("unknown style '%s'" % style)
+ return {"version": rendered, "full-revisionid": pieces["long"],
+ "dirty": pieces["dirty"], "error": None}
+def get_versions():
+ """Get version information or return default if unable to do so."""
+ # I am in, which lives at ROOT/VERSIONFILE_SOURCE. If we have
+ # __file__, we can work backwards from there to the root. Some
+ # py2exe/bbfreeze/non-CPython implementations don't do __file__, in which
+ # case we can only use expanded keywords.
+ cfg = get_config()
+ verbose = cfg.verbose
+ try:
+ return git_versions_from_keywords(get_keywords(), cfg.tag_prefix,
+ verbose)
+ except NotThisMethod:
+ pass
+ try:
+ root = os.path.realpath(__file__)
+ # versionfile_source is the relative path from the top of the source
+ # tree (where the .git directory might live) to this file. Invert
+ # this to find the root from __file__.
+ for i in cfg.versionfile_source.split('/'):
+ root = os.path.dirname(root)
+ except NameError:
+ return {"version": "0+unknown", "full-revisionid": None,
+ "dirty": None,
+ "error": "unable to find root of source tree"}
+ try:
+ pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose)
+ return render(pieces,
+ except NotThisMethod:
+ pass
+ try:
+ if cfg.parentdir_prefix:
+ return versions_from_parentdir(cfg.parentdir_prefix, root, verbose)
+ except NotThisMethod:
+ pass
+ return {"version": "0+unknown", "full-revisionid": None,
+ "dirty": None,
+ "error": "unable to compute version"}
diff --git a/keymanager/src/leap/keymanager/ b/keymanager/src/leap/keymanager/
new file mode 100644
index 00000000..2ed53769
--- /dev/null
+++ b/keymanager/src/leap/keymanager/
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013-2016 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Soledad documents
+from twisted.internet import defer
+from leap.common.check import leap_assert
+# Dictionary keys used for storing cryptographic keys.
+KEY_VERSION_KEY = 'version'
+KEY_UIDS_KEY = 'uids'
+KEY_ADDRESS_KEY = 'address'
+KEY_TYPE_KEY = 'type'
+KEY_FINGERPRINT_KEY = 'fingerprint'
+KEY_DATA_KEY = 'key_data'
+KEY_PRIVATE_KEY = 'private'
+KEY_LENGTH_KEY = 'length'
+KEY_EXPIRY_DATE_KEY = 'expiry_date'
+KEY_LAST_AUDITED_AT_KEY = 'last_audited_at'
+KEY_REFRESHED_AT_KEY = 'refreshed_at'
+KEY_VALIDATION_KEY = 'validation'
+KEY_ENCR_USED_KEY = 'encr_used'
+KEY_SIGN_USED_KEY = 'sign_used'
+KEY_TAGS_KEY = 'tags'
+# Key storage constants
+KEYMANAGER_KEY_TAG = 'keymanager-key'
+KEYMANAGER_ACTIVE_TAG = 'keymanager-active'
+# Version of the Soledad Document schema,
+# it should be bumped each time the document format changes
+# key indexing constants.
+TAGS_PRIVATE_INDEX = 'by-tags-private'
+TYPE_FINGERPRINT_PRIVATE_INDEX = 'by-type-fingerprint-private'
+TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private'
+ 'bool(%s)' % KEY_PRIVATE_KEY,
+ ],
+ 'bool(%s)' % KEY_PRIVATE_KEY,
+ ],
+ 'bool(%s)' % KEY_PRIVATE_KEY,
+ ]
+def init_indexes(soledad):
+ """
+ Initialize the database indexes.
+ """
+ leap_assert(soledad is not None,
+ "Cannot init indexes with null soledad")
+ indexes = yield soledad.list_indexes()
+ db_indexes = dict(indexes)
+ # Loop through the indexes we expect to find.
+ for name, expression in INDEXES.items():
+ if name not in db_indexes:
+ # The index does not yet exist.
+ yield soledad.create_index(name, *expression)
+ elif expression != db_indexes[name]:
+ # The index exists but the definition is not what expected,
+ # so we delete it and add the proper index expression.
+ yield soledad.delete_index(name)
+ yield soledad.create_index(name, *expression)
diff --git a/keymanager/src/leap/keymanager/ b/keymanager/src/leap/keymanager/
new file mode 100644
index 00000000..dfff3936
--- /dev/null
+++ b/keymanager/src/leap/keymanager/
@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Errors and exceptions used by the Key Manager.
+class KeyNotFound(Exception):
+ """
+ Raised when key was no found on keyserver.
+ """
+ pass
+class KeyVersionError(KeyNotFound):
+ """
+ Raised when key was found in the keyring but the version is not supported.
+ It will usually mean that it was created by a newer version of KeyManager.
+ """
+ pass
+class KeyAlreadyExists(Exception):
+ """
+ Raised when attempted to create a key that already exists.
+ """
+ pass
+class KeyAttributesDiffer(Exception):
+ """
+ Raised when trying to delete a key but the stored key differs from the key
+ passed to the delete_key() method.
+ """
+ pass
+class NoPasswordGiven(Exception):
+ """
+ Raised when trying to perform some action that needs a password without
+ providing one.
+ """
+ pass
+class InvalidSignature(Exception):
+ """
+ Raised when signature could not be verified.
+ """
+ pass
+class EncryptError(Exception):
+ """
+ Raised upon failures of encryption.
+ """
+ pass
+class DecryptError(Exception):
+ """
+ Raised upon failures of decryption.
+ """
+ pass
+class GPGError(Exception):
+ """
+ Raised upon failures of encryption/decryption.
+ """
+ pass
+class SignFailed(Exception):
+ """
+ Raised when failed to sign.
+ """
+ pass
+class KeyAddressMismatch(Exception):
+ """
+ A mismatch between addresses.
+ """
+class KeyFingerprintMismatch(Exception):
+ """
+ A mismatch between fingerprints.
+ """
+class KeyNotValidUpgrade(Exception):
+ """
+ Already existing key can not be upgraded with the new key
+ """
+class UnsupportedKeyTypeError(Exception):
+ """
+ Invalid key type
+ """
diff --git a/keymanager/src/leap/keymanager/ b/keymanager/src/leap/keymanager/
new file mode 100644
index 00000000..91ecf3ac
--- /dev/null
+++ b/keymanager/src/leap/keymanager/
@@ -0,0 +1,290 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013-2016 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Abstact key type and encryption scheme representations.
+import json
+import logging
+import re
+import time
+from datetime import datetime
+from leap.keymanager import errors
+from leap.keymanager.wrapper import TempGPGWrapper
+from leap.keymanager.validation import ValidationLevels
+from leap.keymanager import documents as doc
+logger = logging.getLogger(__name__)
+# Key handling utilities
+def is_address(address):
+ """
+ Return whether the given C{address} is in the form user@provider.
+ :param address: The address to be tested.
+ :type address: str
+ :return: Whether C{address} is in the form user@provider.
+ :rtype: bool
+ """
+ return bool(re.match('[\w.-]+@[\w.-]+', address))
+def build_key_from_dict(key, active=None):
+ """
+ Build an OpenPGPKey key based on info in C{kdict}.
+ :param key: Dictionary with key data.
+ :type key: dict
+ :param active: Dictionary with active data.
+ :type active: dict
+ :return: An instance of the key.
+ :rtype: C{kClass}
+ """
+ address = None
+ validation = ValidationLevels.Weak_Chain
+ last_audited_at = None
+ encr_used = False
+ sign_used = False
+ if active:
+ address = active[doc.KEY_ADDRESS_KEY]
+ try:
+ validation = ValidationLevels.get(active[doc.KEY_VALIDATION_KEY])
+ except ValueError:
+ logger.error("Not valid validation level (%s) for key %s",
+ (active[doc.KEY_VALIDATION_KEY],
+ active[doc.KEY_FINGERPRINT_KEY]))
+ last_audited_at = _to_datetime(active[doc.KEY_LAST_AUDITED_AT_KEY])
+ encr_used = active[doc.KEY_ENCR_USED_KEY]
+ sign_used = active[doc.KEY_SIGN_USED_KEY]
+ expiry_date = _to_datetime(key[doc.KEY_EXPIRY_DATE_KEY])
+ refreshed_at = _to_datetime(key[doc.KEY_REFRESHED_AT_KEY])
+ return OpenPGPKey(
+ address=address,
+ uids=key[doc.KEY_UIDS_KEY],
+ fingerprint=key[doc.KEY_FINGERPRINT_KEY],
+ key_data=key[doc.KEY_DATA_KEY],
+ private=key[doc.KEY_PRIVATE_KEY],
+ length=key[doc.KEY_LENGTH_KEY],
+ expiry_date=expiry_date,
+ last_audited_at=last_audited_at,
+ refreshed_at=refreshed_at,
+ validation=validation,
+ encr_used=encr_used,
+ sign_used=sign_used,
+ )
+def _to_datetime(unix_time):
+ if unix_time != 0:
+ return datetime.fromtimestamp(unix_time)
+ else:
+ return None
+def _to_unix_time(date):
+ if date is not None:
+ return int(time.mktime(date.timetuple()))
+ else:
+ return 0
+class OpenPGPKey(object):
+ """
+ Base class for OpenPGP keys.
+ """
+ __slots__ = ('address', 'uids', 'fingerprint', 'key_data',
+ 'private', 'length', 'expiry_date', 'validation',
+ 'last_audited_at', 'refreshed_at',
+ 'encr_used', 'sign_used', '_index', '_gpgbinary')
+ def __init__(self, address=None, gpgbinary=None, uids=[], fingerprint="",
+ key_data="", private=False, length=0, expiry_date=None,
+ validation=ValidationLevels.Weak_Chain, last_audited_at=None,
+ refreshed_at=None, encr_used=False, sign_used=False):
+ self._gpgbinary = gpgbinary
+ self.address = address
+ if not uids and address:
+ self.uids = [address]
+ else:
+ self.uids = uids
+ self.fingerprint = fingerprint
+ self.key_data = key_data
+ self.private = private
+ self.length = length
+ self.expiry_date = expiry_date
+ self.validation = validation
+ self.last_audited_at = last_audited_at
+ self.refreshed_at = refreshed_at
+ self.encr_used = encr_used
+ self.sign_used = sign_used
+ self._index = len(self.__slots__)
+ @property
+ def signatures(self):
+ """
+ Get the key signatures
+ :return: the key IDs that have signed the key
+ :rtype: list(str)
+ """
+ with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg:
+ res = gpg.list_sigs(self.fingerprint)
+ for uid, sigs in res.sigs.iteritems():
+ if parse_address(uid) in self.uids:
+ return sigs
+ return []
+ def merge(self, newkey):
+ if newkey.fingerprint != self.fingerprint:
+ logger.critical(
+ "Can't put a key whith the same key_id and different "
+ "fingerprint: %s, %s"
+ % (newkey.fingerprint, self.fingerprint))
+ raise errors.KeyFingerprintMismatch(newkey.fingerprint)
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
+ gpg.import_keys(self.key_data)
+ gpg.import_keys(newkey.key_data)
+ gpgkey = gpg.list_keys(secret=newkey.private).pop()
+ if gpgkey['expires']:
+ self.expiry_date = datetime.fromtimestamp(
+ int(gpgkey['expires']))
+ else:
+ self.expiry_date = None
+ self.uids = []
+ for uid in gpgkey['uids']:
+ self.uids.append(parse_address(uid))
+ self.length = int(gpgkey['length'])
+ self.key_data = gpg.export_keys(gpgkey['fingerprint'],
+ secret=self.private)
+ if newkey.validation > self.validation:
+ self.validation = newkey.validation
+ if newkey.last_audited_at > self.last_audited_at:
+ self.validation = newkey.last_audited_at
+ self.encr_used = newkey.encr_used or self.encr_used
+ self.sign_used = newkey.sign_used or self.sign_used
+ self.refreshed_at =
+ def get_json(self):
+ """
+ Return a JSON string describing this key.
+ :return: The JSON string describing this key.
+ :rtype: str
+ """
+ expiry_date = _to_unix_time(self.expiry_date)
+ refreshed_at = _to_unix_time(self.refreshed_at)
+ return json.dumps({
+ doc.KEY_UIDS_KEY: self.uids,
+ doc.KEY_TYPE_KEY: self.__class__.__name__,
+ doc.KEY_FINGERPRINT_KEY: self.fingerprint,
+ doc.KEY_DATA_KEY: self.key_data,
+ doc.KEY_PRIVATE_KEY: self.private,
+ doc.KEY_LENGTH_KEY: self.length,
+ doc.KEY_EXPIRY_DATE_KEY: expiry_date,
+ doc.KEY_REFRESHED_AT_KEY: refreshed_at,
+ })
+ def get_active_json(self):
+ """
+ Return a JSON string describing this key.
+ :return: The JSON string describing this key.
+ :rtype: str
+ """
+ last_audited_at = _to_unix_time(self.last_audited_at)
+ return json.dumps({
+ doc.KEY_ADDRESS_KEY: self.address,
+ doc.KEY_TYPE_KEY: (self.__class__.__name__ +
+ doc.KEY_FINGERPRINT_KEY: self.fingerprint,
+ doc.KEY_PRIVATE_KEY: self.private,
+ doc.KEY_VALIDATION_KEY: str(self.validation),
+ doc.KEY_LAST_AUDITED_AT_KEY: last_audited_at,
+ doc.KEY_ENCR_USED_KEY: self.encr_used,
+ doc.KEY_SIGN_USED_KEY: self.sign_used,
+ })
+ def next(self):
+ if self._index == 0:
+ self._index = len(self.__slots__)
+ raise StopIteration
+ self._index -= 1
+ key = self.__slots__[self._index]
+ if key.startswith('_'):
+ return
+ value = getattr(self, key)
+ if key == "validation":
+ value = str(value)
+ elif key in ["expiry_date", "last_audited_at", "refreshed_at"]:
+ value = str(value)
+ return key, value
+ def __iter__(self):
+ return self
+ def __repr__(self):
+ """
+ Representation of this class
+ """
+ return u"<%s 0x%s (%s - %s)>" % (
+ self.__class__.__name__,
+ self.fingerprint,
+ self.address,
+ "priv" if self.private else "publ")
+def parse_address(address):
+ """
+ Remove name, '<', '>' and the identity suffix after the '+' until the '@'
+ e.g.: becomes
+ since the key belongs to the identity without the '+' suffix.
+ :type address: str
+ :rtype: str
+ """
+ mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?'
+ match = re.match(mail_regex, address)
+ if match is None:
+ return None
+ return ''.join(, 4))
diff --git a/keymanager/src/leap/keymanager/ b/keymanager/src/leap/keymanager/
new file mode 100644
index 00000000..c73da2ee
--- /dev/null
+++ b/keymanager/src/leap/keymanager/
@@ -0,0 +1,167 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2015 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Document migrator
+# XXX: versioning has being added 12/2015 when keymanager was not
+# much in use in the wild. We can probably drop support for
+# keys without version at some point.
+from collections import namedtuple
+from twisted.internet.defer import gatherResults, succeed
+from leap.keymanager import documents as doc
+from leap.keymanager.validation import ValidationLevels
+KEY_ID_KEY = 'key_id'
+KeyDocs = namedtuple("KeyDocs", ['key', 'active'])
+class KeyDocumentsMigrator(object):
+ """
+ Migrate old KeyManager Soledad Documents to the newest schema
+ """
+ def __init__(self, soledad):
+ self._soledad = soledad
+ def migrate(self):
+ deferred_public = self._get_docs(private=False)
+ deferred_public.addCallback(self._migrate_docs)
+ deferred_private = self._get_docs(private=True)
+ deferred_private.addCallback(self._migrate_docs)
+ return gatherResults([deferred_public, deferred_private])
+ def _get_docs(self, private=False):
+ private_value = '1' if private else '0'
+ deferred_keys = self._soledad.get_from_index(
+ private_value)
+ deferred_active = self._soledad.get_from_index(
+ private_value)
+ return gatherResults([deferred_keys, deferred_active])
+ def _migrate_docs(self, (key_docs, active_docs)):
+ def update_keys(keys):
+ deferreds = []
+ for key_id in keys:
+ key = keys[key_id].key
+ actives = keys[key_id].active
+ d = self._migrate_actives(key, actives)
+ deferreds.append(d)
+ d = self._migrate_key(key)
+ deferreds.append(d)
+ return gatherResults(deferreds)
+ d = self._buildKeyDict(key_docs, active_docs)
+ d.addCallback(lambda keydict: self._filter_outdated(keydict))
+ d.addCallback(update_keys)
+ def _buildKeyDict(self, keys, actives):
+ keydict = {
+ fp2id(key.content[doc.KEY_FINGERPRINT_KEY]): KeyDocs(key, [])
+ for key in keys}
+ deferreds = []
+ for active in actives:
+ if KEY_ID_KEY in active.content:
+ key_id = active.content[KEY_ID_KEY]
+ if key_id not in keydict:
+ d = self._soledad.delete_doc(active)
+ deferreds.append(d)
+ continue
+ keydict[key_id].active.append(active)
+ d = gatherResults(deferreds)
+ d.addCallback(lambda _: keydict)
+ return d
+ def _filter_outdated(self, keydict):
+ outdated = {}
+ for key_id, docs in keydict.items():
+ if ((docs.key and doc.KEY_VERSION_KEY not in docs.key.content) or
+ outdated[key_id] = docs
+ return outdated
+ def _migrate_actives(self, key, actives):
+ if not key:
+ deferreds = []
+ for active in actives:
+ d = self._soledad.delete_doc(active)
+ deferreds.append(d)
+ return gatherResults(deferreds)
+ validation = str(ValidationLevels.Weak_Chain)
+ last_audited = 0
+ encr_used = False
+ sign_used = False
+ fingerprint = key.content[doc.KEY_FINGERPRINT_KEY]
+ if len(actives) == 1 and doc.KEY_VERSION_KEY not in key.content:
+ # we can preserve the validation of the key if there is only one
+ # active address for the key
+ validation = key.content[doc.KEY_VALIDATION_KEY]
+ last_audited = key.content[doc.KEY_LAST_AUDITED_AT_KEY]
+ encr_used = key.content[doc.KEY_ENCR_USED_KEY]
+ sign_used = key.content[doc.KEY_SIGN_USED_KEY]
+ deferreds = []
+ for active in actives:
+ if doc.KEY_VERSION_KEY in active.content:
+ continue
+ active.content[doc.KEY_VERSION_KEY] = doc.KEYMANAGER_DOC_VERSION
+ active.content[doc.KEY_FINGERPRINT_KEY] = fingerprint
+ active.content[doc.KEY_VALIDATION_KEY] = validation
+ active.content[doc.KEY_LAST_AUDITED_AT_KEY] = last_audited
+ active.content[doc.KEY_ENCR_USED_KEY] = encr_used
+ active.content[doc.KEY_SIGN_USED_KEY] = sign_used
+ del active.content[KEY_ID_KEY]
+ d = self._soledad.put_doc(active)
+ deferreds.append(d)
+ return gatherResults(deferreds)
+ def _migrate_key(self, key):
+ if not key or doc.KEY_VERSION_KEY in key.content:
+ return succeed(None)
+ key.content[doc.KEY_UIDS_KEY] = key.content[doc.KEY_ADDRESS_KEY]
+ del key.content[doc.KEY_ADDRESS_KEY]
+ del key.content[KEY_ID_KEY]
+ del key.content[doc.KEY_VALIDATION_KEY]
+ del key.content[doc.KEY_LAST_AUDITED_AT_KEY]
+ del key.content[doc.KEY_ENCR_USED_KEY]
+ del key.content[doc.KEY_SIGN_USED_KEY]
+ return self._soledad.put_doc(key)
+def fp2id(fingerprint):
+ return fingerprint[-KEY_ID_LENGTH:]
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013-2016 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Infrastructure for using OpenPGP keys in Key Manager.
+import logging
+import os
+import re
+import tempfile
+import traceback
+import io
+from datetime import datetime
+from multiprocessing import cpu_count
+from gnupg.gnupg import GPGUtilities
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+from leap.common.check import leap_assert, leap_assert_type, leap_check
+from leap.keymanager import errors
+from leap.keymanager.wrapper import TempGPGWrapper
+from leap.keymanager.keys import (
+ OpenPGPKey,
+ is_address,
+ parse_address,
+ build_key_from_dict,
+from leap.keymanager.documents import (
+ init_indexes,
+logger = logging.getLogger(__name__)
+# A temporary GPG keyring wrapped to provide OpenPGP functionality.
+# This function will be used to call blocking GPG functions outside
+# of Twisted reactor and match the concurrent calls to the amount of CPU cores
+cpu_core_semaphore = defer.DeferredSemaphore(cpu_count())
+def from_thread(func, *args, **kwargs):
+ call = lambda: deferToThread(func, *args, **kwargs)
+ return
+# The OpenPGP wrapper
+class OpenPGPScheme(object):
+ """
+ A wrapper for OpenPGP keys management and use (encryption, decyption,
+ signing and verification).
+ """
+ # type used on the soledad documents
+ KEY_TYPE = OpenPGPKey.__name__
+ def __init__(self, soledad, gpgbinary=None):
+ """
+ Initialize the OpenPGP wrapper.
+ :param soledad: A Soledad instance for key storage.
+ :type soledad: leap.soledad.Soledad
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
+ """
+ self._soledad = soledad
+ self._gpgbinary = gpgbinary
+ self.deferred_init = init_indexes(soledad)
+ self.deferred_init.addCallback(self._migrate_documents_schema)
+ self._wait_indexes("get_key", "put_key", "get_all_keys")
+ def _migrate_documents_schema(self, _):
+ from leap.keymanager.migrator import KeyDocumentsMigrator
+ migrator = KeyDocumentsMigrator(self._soledad)
+ return migrator.migrate()
+ def _wait_indexes(self, *methods):
+ """
+ Methods that need to wait for the indexes to be ready.
+ Heavily based on
+ :param methods: methods that need to wait for the indexes to be ready
+ :type methods: tuple(str)
+ """
+ self.waiting = []
+ self.stored = {}
+ def restore(_):
+ for method in self.stored:
+ setattr(self, method, self.stored[method])
+ for d in self.waiting:
+ d.callback(None)
+ def makeWrapper(method):
+ def wrapper(*args, **kw):
+ d = defer.Deferred()
+ d.addCallback(lambda _: self.stored[method](*args, **kw))
+ self.waiting.append(d)
+ return d
+ return wrapper
+ for method in methods:
+ self.stored[method] = getattr(self, method)
+ setattr(self, method, makeWrapper(method))
+ self.deferred_init.addCallback(restore)
+ #
+ # Keys management
+ #
+ def gen_key(self, address):
+ """
+ Generate an OpenPGP keypair bound to C{address}.
+ :param address: The address bound to the key.
+ :type address: str
+ :return: A Deferred which fires with the key bound to address, or fails
+ with KeyAlreadyExists if key already exists in local database.
+ :rtype: Deferred
+ """
+ # make sure the key does not already exist
+ leap_assert(is_address(address), 'Not an user address: %s' % address)
+ @defer.inlineCallbacks
+ def _gen_key(_):
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
+ # TODO: inspect result, or use decorator
+ params = gpg.gen_key_input(
+ key_type='RSA',
+ key_length=4096,
+ name_real=address,
+ name_email=address,
+ name_comment='')
+"About to generate keys... "
+ "This might take SOME time.")
+ yield from_thread(gpg.gen_key, params)
+"Keys for %s have been successfully "
+ "generated." % (address,))
+ pubkeys = gpg.list_keys()
+ # assert for new key characteristics
+ leap_assert(
+ len(pubkeys) is 1, # a unitary keyring!
+ 'Keyring has wrong number of keys: %d.' % len(pubkeys))
+ key = gpg.list_keys(secret=True).pop()
+ leap_assert(
+ len(key['uids']) is 1, # with just one uid!
+ 'Wrong number of uids for key: %d.' % len(key['uids']))
+ uid_match = False
+ for uid in key['uids']:
+ if re.match('.*<%s>$' % address, uid) is not None:
+ uid_match = True
+ break
+ leap_assert(uid_match, 'Key not correctly bound to address.')
+ # insert both public and private keys in storage
+ deferreds = []
+ for secret in [True, False]:
+ key = gpg.list_keys(secret=secret).pop()
+ openpgp_key = self._build_key_from_gpg(
+ key,
+ gpg.export_keys(key['fingerprint'], secret=secret),
+ address)
+ d = self.put_key(openpgp_key)
+ deferreds.append(d)
+ yield defer.gatherResults(deferreds)
+ def key_already_exists(_):
+ raise errors.KeyAlreadyExists(address)
+ d = self.get_key(address)
+ d.addCallbacks(key_already_exists, _gen_key)
+ d.addCallback(lambda _: self.get_key(address, private=True))
+ return d
+ def get_key(self, address, private=False):
+ """
+ Get key bound to C{address} from local storage.
+ :param address: The address bound to the key.
+ :type address: str
+ :param private: Look for a private key instead of a public one?
+ :type private: bool
+ :return: A Deferred which fires with the OpenPGPKey bound to address,
+ or which fails with KeyNotFound if the key was not found on
+ local storage.
+ :rtype: Deferred
+ """
+ address = parse_address(address)
+ def build_key((keydoc, activedoc)):
+ if keydoc is None:
+ raise errors.KeyNotFound(address)
+ leap_assert(
+ address in keydoc.content[KEY_UIDS_KEY],
+ 'Wrong address in key %s. Expected %s, found %s.'
+ % (keydoc.content[KEY_FINGERPRINT_KEY], address,
+ keydoc.content[KEY_UIDS_KEY]))
+ key = build_key_from_dict(keydoc.content, activedoc.content)
+ key._gpgbinary = self._gpgbinary
+ return key
+ d = self._get_key_doc(address, private)
+ d.addCallback(build_key)
+ return d
+ @defer.inlineCallbacks
+ def get_all_keys(self, private=False):
+ """
+ Return all keys stored in local database.
+ :param private: Include private keys
+ :type private: bool
+ :return: A Deferred which fires with a list of all keys in local db.
+ :rtype: Deferred
+ """
+ HAS_ACTIVE = "has_active"
+ active_docs = yield self._soledad.get_from_index(
+ '1' if private else '0')
+ key_docs = yield self._soledad.get_from_index(
+ '1' if private else '0')
+ keys = []
+ fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY]
+ for active in active_docs:
+ fp_keys = filter(lambda k: fp(k) == fp(active), key_docs)
+ if len(fp_keys) == 0:
+ yield self._soledad.delete_doc(active)
+ continue
+ elif len(fp_keys) == 1:
+ key = fp_keys[0]
+ else:
+ key = yield self._repair_key_docs(fp_keys)
+ key.content[HAS_ACTIVE] = True
+ keys.append(build_key_from_dict(key.content, active.content))
+ unactive_keys = filter(lambda k: HAS_ACTIVE not in k.content, key_docs)
+ keys += map(lambda k: build_key_from_dict(k.content), unactive_keys)
+ defer.returnValue(keys)
+ def parse_key(self, key_data, address=None):
+ """
+ Parses a key (or key pair) data and returns
+ the OpenPGPKey keys.
+ :param key_data: the key data to be parsed.
+ :type key_data: str or unicode
+ :param address: Active address for the key.
+ :type address: str
+ :returns: the public key and private key (if applies) for that data.
+ :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey)
+ the tuple may have one or both components None
+ """
+ leap_assert_type(key_data, (str, unicode))
+ # TODO: add more checks for correct key data.
+ leap_assert(key_data is not None, 'Data does not represent a key.')
+ priv_info, privkey = process_key(
+ key_data, self._gpgbinary, secret=True)
+ pub_info, pubkey = process_key(
+ key_data, self._gpgbinary, secret=False)
+ if not pubkey:
+ return (None, None)
+ openpgp_privkey = None
+ if privkey:
+ # build private key
+ openpgp_privkey = self._build_key_from_gpg(priv_info, privkey,
+ address)
+ leap_check(pub_info['fingerprint'] == priv_info['fingerprint'],
+ 'Fingerprints for public and private key differ.',
+ errors.KeyFingerprintMismatch)
+ # build public key
+ openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey, address)
+ return (openpgp_pubkey, openpgp_privkey)
+ def put_raw_key(self, key_data, address):
+ """
+ Put key contained in C{key_data} in local storage.
+ :param key_data: The key data to be stored.
+ :type key_data: str or unicode
+ :param address: address for which this key will be active
+ :type address: str
+ :return: A Deferred which fires when the OpenPGPKey is in the storage.
+ :rtype: Deferred
+ """
+ leap_assert_type(key_data, (str, unicode))
+ openpgp_privkey = None
+ try:
+ openpgp_pubkey, openpgp_privkey = self.parse_key(
+ key_data, address)
+ except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e:
+ return
+ def put_key(_, key):
+ return self.put_key(key)
+ d = defer.succeed(None)
+ if openpgp_pubkey is not None:
+ d.addCallback(put_key, openpgp_pubkey)
+ if openpgp_privkey is not None:
+ d.addCallback(put_key, openpgp_privkey)
+ return d
+ def put_key(self, key):
+ """
+ Put C{key} in local storage.
+ :param key: The key to be stored.
+ :type key: OpenPGPKey
+ :return: A Deferred which fires when the key is in the storage.
+ :rtype: Deferred
+ """
+ def merge_and_put((keydoc, activedoc)):
+ if not keydoc:
+ return put_new_key(activedoc)
+ active_content = None
+ if activedoc:
+ active_content = activedoc.content
+ oldkey = build_key_from_dict(keydoc.content, active_content)
+ key.merge(oldkey)
+ keydoc.set_json(key.get_json())
+ d = self._soledad.put_doc(keydoc)
+ d.addCallback(put_active, activedoc)
+ return d
+ def put_new_key(activedoc):
+ deferreds = []
+ if activedoc:
+ d = self._soledad.delete_doc(activedoc)
+ deferreds.append(d)
+ for json in [key.get_json(), key.get_active_json()]:
+ d = self._soledad.create_doc_from_json(json)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+ def put_active(_, activedoc):
+ active_json = key.get_active_json()
+ if activedoc:
+ activedoc.set_json(active_json)
+ d = self._soledad.put_doc(activedoc)
+ else:
+ d = self._soledad.create_doc_from_json(active_json)
+ return d
+ def get_active_doc(keydoc):
+ d = self._get_active_doc_from_address(key.address, key.private)
+ d.addCallback(lambda activedoc: (keydoc, activedoc))
+ return d
+ d = self._get_key_doc_from_fingerprint(key.fingerprint, key.private)
+ d.addCallback(get_active_doc)
+ d.addCallback(merge_and_put)
+ return d
+ def _get_key_doc(self, address, private=False):
+ """
+ Get the document with a key (public, by default) bound to C{address}.
+ If C{private} is True, looks for a private key instead of a public.
+ :param address: The address bound to the key.
+ :type address: str
+ :param private: Whether to look for a private key.
+ :type private: bool
+ :return: A Deferred which fires with a touple of two SoledadDocument
+ (keydoc, activedoc) or None if it does not exist.
+ :rtype: Deferred
+ """
+ def get_key_from_active_doc(activedoc):
+ if not activedoc:
+ return (None, None)
+ fingerprint = activedoc.content[KEY_FINGERPRINT_KEY]
+ d = self._get_key_doc_from_fingerprint(fingerprint, private)
+ d.addCallback(delete_active_if_no_key, activedoc)
+ return d
+ def delete_active_if_no_key(keydoc, activedoc):
+ if not keydoc:
+ d = self._soledad.delete_doc(activedoc)
+ d.addCallback(lambda _: (None, None))
+ return d
+ return (keydoc, activedoc)
+ d = self._get_active_doc_from_address(address, private)
+ d.addCallback(get_key_from_active_doc)
+ return d
+ def _build_key_from_gpg(self, key, key_data, address=None):
+ """
+ Build an OpenPGPKey for C{address} based on C{key} from
+ local gpg storage.
+ GPG key data has to be queried independently in this
+ wrapper, so we receive it in C{key_data}.
+ :param address: Active address for the key.
+ :type address: str
+ :param key: Key obtained from GPG storage.
+ :type key: dict
+ :param key_data: Key data obtained from GPG storage.
+ :type key_data: str
+ :return: An instance of the key.
+ :rtype: OpenPGPKey
+ """
+ return build_gpg_key(key, key_data, address, self._gpgbinary)
+ def delete_key(self, key):
+ """
+ Remove C{key} from storage.
+ :param key: The key to be removed.
+ :type key: EncryptionKey
+ :return: A Deferred which fires when the key is deleted, or which
+ fails with KeyNotFound if the key was not found on local
+ storage.
+ :rtype: Deferred
+ """
+ leap_assert_type(key, OpenPGPKey)
+ def delete_docs(activedocs):
+ deferreds = []
+ for doc in activedocs:
+ d = self._soledad.delete_doc(doc)
+ deferreds.append(d)
+ return defer.gatherResults(deferreds)
+ def get_key_docs(_):
+ return self._soledad.get_from_index(
+ self.KEY_TYPE,
+ key.fingerprint,
+ '1' if key.private else '0')
+ def delete_key(docs):
+ if len(docs) == 0:
+ raise errors.KeyNotFound(key)
+ elif len(docs) > 1:
+ logger.warning("There is more than one key for fingerprint %s"
+ % key.fingerprint)
+ has_deleted = False
+ deferreds = []
+ for doc in docs:
+ if doc.content['fingerprint'] == key.fingerprint:
+ d = self._soledad.delete_doc(doc)
+ deferreds.append(d)
+ has_deleted = True
+ if not has_deleted:
+ raise errors.KeyNotFound(key)
+ return defer.gatherResults(deferreds)
+ d = self._soledad.get_from_index(
+ key.fingerprint,
+ '1' if key.private else '0')
+ d.addCallback(delete_docs)
+ d.addCallback(get_key_docs)
+ d.addCallback(delete_key)
+ return d
+ #
+ # Data encryption, decryption, signing and verifying
+ #
+ @staticmethod
+ def _assert_gpg_result_ok(result):
+ """
+ Check if GPG result is 'ok' and log stderr outputs.
+ :param result: GPG results, which have a field calld 'ok' that states
+ whether the gpg operation was successful or not.
+ :type result: object
+ :raise GPGError: Raised when the gpg operation was not successful.
+ """
+ stderr = getattr(result, 'stderr', None)
+ if stderr:
+ logger.debug("%s" % (stderr,))
+ if getattr(result, 'ok', None) is not True:
+ raise errors.GPGError(
+ 'Failed to encrypt/decrypt: %s' % stderr)
+ @defer.inlineCallbacks
+ def encrypt(self, data, pubkey, passphrase=None, sign=None,
+ cipher_algo='AES256'):
+ """
+ Encrypt C{data} using public @{pubkey} and sign with C{sign} key.
+ :param data: The data to be encrypted.
+ :type data: str
+ :param pubkey: The key used to encrypt.
+ :type pubkey: OpenPGPKey
+ :param sign: The key used for signing.
+ :type sign: OpenPGPKey
+ :param cipher_algo: The cipher algorithm to use.
+ :type cipher_algo: str
+ :return: A Deferred that will be fired with the encrypted data.
+ :rtype: defer.Deferred
+ :raise EncryptError: Raised if failed encrypting for some reason.
+ """
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private is False, 'Key is not public.')
+ keys = [pubkey]
+ if sign is not None:
+ leap_assert_type(sign, OpenPGPKey)
+ leap_assert(sign.private is True)
+ keys.append(sign)
+ with TempGPGWrapper(keys, self._gpgbinary) as gpg:
+ result = yield from_thread(
+ gpg.encrypt,
+ data, pubkey.fingerprint,
+ default_key=sign.fingerprint if sign else None,
+ passphrase=passphrase, symmetric=False,
+ cipher_algo=cipher_algo)
+ # Here we cannot assert for correctness of sig because the sig is
+ # in the ciphertext.
+ # result.ok - (bool) indicates if the operation succeeded
+ # - (bool) contains the result of the operation
+ try:
+ self._assert_gpg_result_ok(result)
+ defer.returnValue(
+ except errors.GPGError as e:
+ logger.warning('Failed to encrypt: %s.' % str(e))
+ raise errors.EncryptError()
+ @defer.inlineCallbacks
+ def decrypt(self, data, privkey, passphrase=None, verify=None):
+ """
+ Decrypt C{data} using private @{privkey} and verify with C{verify} key.
+ :param data: The data to be decrypted.
+ :type data: str
+ :param privkey: The key used to decrypt.
+ :type privkey: OpenPGPKey
+ :param passphrase: The passphrase for the secret key used for
+ decryption.
+ :type passphrase: str
+ :param verify: The key used to verify a signature.
+ :type verify: OpenPGPKey
+ :return: Deferred that will fire with the decrypted data and
+ if signature verifies (unicode, bool)
+ :rtype: Deferred
+ :raise DecryptError: Raised if failed decrypting for some reason.
+ """
+ leap_assert(privkey.private is True, 'Key is not private.')
+ keys = [privkey]
+ if verify is not None:
+ leap_assert_type(verify, OpenPGPKey)
+ leap_assert(verify.private is False)
+ keys.append(verify)
+ with TempGPGWrapper(keys, self._gpgbinary) as gpg:
+ try:
+ result = yield from_thread(gpg.decrypt,
+ data, passphrase=passphrase,
+ always_trust=True)
+ self._assert_gpg_result_ok(result)
+ # verify signature
+ sign_valid = False
+ if (verify is not None and
+ result.valid is True and
+ verify.fingerprint == result.pubkey_fingerprint):
+ sign_valid = True
+ defer.returnValue((, sign_valid))
+ except errors.GPGError as e:
+ logger.warning('Failed to decrypt: %s.' % str(e))
+ raise errors.DecryptError(str(e))
+ def is_encrypted(self, data):
+ """
+ Return whether C{data} was asymmetrically encrypted using OpenPGP.
+ :param data: The data we want to know about.
+ :type data: str
+ :return: Whether C{data} was encrypted using this wrapper.
+ :rtype: bool
+ """
+ with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg:
+ gpgutil = GPGUtilities(gpg)
+ return gpgutil.is_encrypted_asym(data)
+ def sign(self, data, privkey, digest_algo='SHA512', clearsign=False,
+ detach=True, binary=False):
+ """
+ Sign C{data} with C{privkey}.
+ :param data: The data to be signed.
+ :type data: str
+ :param privkey: The private key to be used to sign.
+ :type privkey: OpenPGPKey
+ :param digest_algo: The hash digest to use.
+ :type digest_algo: str
+ :param clearsign: If True, create a cleartext signature.
+ :type clearsign: bool
+ :param detach: If True, create a detached signature.
+ :type detach: bool
+ :param binary: If True, do not ascii armour the output.
+ :type binary: bool
+ :return: The ascii-armored signed data.
+ :rtype: str
+ """
+ leap_assert_type(privkey, OpenPGPKey)
+ leap_assert(privkey.private is True)
+ # result.fingerprint - contains the fingerprint of the key used to
+ # sign.
+ with TempGPGWrapper(privkey, self._gpgbinary) as gpg:
+ result = gpg.sign(data, default_key=privkey.fingerprint,
+ digest_algo=digest_algo, clearsign=clearsign,
+ detach=detach, binary=binary)
+ rfprint = privkey.fingerprint
+ privkey = gpg.list_keys(secret=True).pop()
+ kfprint = privkey['fingerprint']
+ if result.fingerprint is None:
+ raise errors.SignFailed(
+ 'Failed to sign with key %s: %s' %
+ (privkey['fingerprint'], result.stderr))
+ leap_assert(
+ result.fingerprint == kfprint,
+ 'Signature and private key fingerprints mismatch: '
+ '%s != %s' % (rfprint, kfprint))
+ return
+ def verify(self, data, pubkey, detached_sig=None):
+ """
+ Verify signed C{data} with C{pubkey}, eventually using
+ C{detached_sig}.
+ :param data: The data to be verified.
+ :type data: str
+ :param pubkey: The public key to be used on verification.
+ :type pubkey: OpenPGPKey
+ :param detached_sig: A detached signature. If given, C{data} is
+ verified against this detached signature.
+ :type detached_sig: str
+ :return: signature matches
+ :rtype: bool
+ """
+ leap_assert_type(pubkey, OpenPGPKey)
+ leap_assert(pubkey.private is False)
+ with TempGPGWrapper(pubkey, self._gpgbinary) as gpg:
+ result = None
+ if detached_sig is None:
+ result = gpg.verify(data)
+ else:
+ # to verify using a detached sig we have to use
+ # gpg.verify_file(), which receives the data as a binary
+ # stream and the name of a file containing the signature.
+ sf, sfname = tempfile.mkstemp()
+ with os.fdopen(sf, 'w') as sfd:
+ sfd.write(detached_sig)
+ result = gpg.verify_file(io.BytesIO(data), sig_file=sfname)
+ os.unlink(sfname)
+ gpgpubkey = gpg.list_keys().pop()
+ valid = result.valid
+ rfprint = result.fingerprint
+ kfprint = gpgpubkey['fingerprint']
+ return valid and rfprint == kfprint
+ def _get_active_doc_from_address(self, address, private):
+ d = self._soledad.get_from_index(
+ address,
+ '1' if private else '0')
+ d.addCallback(self._repair_and_get_doc, self._repair_active_docs)
+ d.addCallback(self._check_version)
+ return d
+ def _get_key_doc_from_fingerprint(self, fingerprint, private):
+ d = self._soledad.get_from_index(
+ self.KEY_TYPE,
+ fingerprint,
+ '1' if private else '0')
+ d.addCallback(self._repair_and_get_doc, self._repair_key_docs)
+ d.addCallback(self._check_version)
+ return d
+ def _repair_and_get_doc(self, doclist, repair_func):
+ if len(doclist) is 0:
+ return None
+ elif len(doclist) > 1:
+ return repair_func(doclist)
+ return doclist[0]
+ def _check_version(self, doc):
+ if doc is not None:
+ version = doc.content[KEY_VERSION_KEY]
+ raise errors.KeyVersionError(str(version))
+ return doc
+ def _repair_key_docs(self, doclist):
+ """
+ If there is more than one key for a key id try to self-repair it
+ :return: a Deferred that will be fired with the valid key doc once all
+ the deletions are completed
+ :rtype: Deferred
+ """
+ def log_key_doc(doc):
+ logger.error("\t%s: %s" % (doc.content[KEY_UIDS_KEY],
+ doc.content[KEY_FINGERPRINT_KEY]))
+ def cmp_key(d1, d2):
+ return cmp(d1.content[KEY_REFRESHED_AT_KEY],
+ d2.content[KEY_REFRESHED_AT_KEY])
+ return self._repair_docs(doclist, cmp_key, log_key_doc)
+ @defer.inlineCallbacks
+ def _repair_active_docs(self, doclist):
+ """
+ If there is more than one active doc for an address try to self-repair
+ it
+ :return: a Deferred that will be fired with the valid active doc once
+ all the deletions are completed
+ :rtype: Deferred
+ """
+ keys = {}
+ for doc in doclist:
+ fp = doc.content[KEY_FINGERPRINT_KEY]
+ private = doc.content[KEY_PRIVATE_KEY]
+ try:
+ key = yield self._get_key_doc_from_fingerprint(fp, private)
+ keys[fp] = key
+ except Exception:
+ pass
+ def log_active_doc(doc):
+ logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY],
+ doc.content[KEY_FINGERPRINT_KEY]))
+ def cmp_active(d1, d2):
+ # XXX: for private keys it will be nice to check which key is known
+ # by the nicknym server and keep this one. But this needs a
+ # refactor that might not be worth it.
+ used1 = (d1.content[KEY_SIGN_USED_KEY] +
+ d1.content[KEY_ENCR_USED_KEY])
+ used2 = (d2.content[KEY_SIGN_USED_KEY] +
+ d2.content[KEY_ENCR_USED_KEY])
+ res = cmp(used1, used2)
+ if res != 0:
+ return res
+ key1 = keys[d1.content[KEY_FINGERPRINT_KEY]]
+ key2 = keys[d2.content[KEY_FINGERPRINT_KEY]]
+ return cmp(key1.content[KEY_REFRESHED_AT_KEY],
+ key2.content[KEY_REFRESHED_AT_KEY])
+ doc = yield self._repair_docs(doclist, cmp_active, log_active_doc)
+ defer.returnValue(doc)
+ def _repair_docs(self, doclist, cmp_func, log_func):
+ logger.error("BUG ---------------------------------------------------")
+ logger.error("There is more than one doc of type %s:"
+ % (doclist[0].content[KEY_TYPE_KEY],))
+ doclist.sort(cmp=cmp_func, reverse=True)
+ log_func(doclist[0])
+ deferreds = []
+ for doc in doclist[1:]:
+ log_func(doc)
+ d = self._soledad.delete_doc(doc)
+ deferreds.append(d)
+ logger.error("")
+ logger.error(traceback.extract_stack())
+ logger.error("BUG (please report above info) ------------------------")
+ d = defer.gatherResults(deferreds, consumeErrors=True)
+ d.addCallback(lambda _: doclist[0])
+ return d
+def process_key(key_data, gpgbinary, secret=False):
+ with TempGPGWrapper(gpgbinary=gpgbinary) as gpg:
+ try:
+ gpg.import_keys(key_data)
+ info = gpg.list_keys(secret=secret).pop()
+ key = gpg.export_keys(info['fingerprint'], secret=secret)
+ except IndexError:
+ info = {}
+ key = None
+ return info, key
+def build_gpg_key(key_info, key_data, address=None, gpgbinary=None):
+ expiry_date = None
+ if key_info['expires']:
+ expiry_date = datetime.fromtimestamp(int(key_info['expires']))
+ uids = []
+ for uid in key_info['uids']:
+ uids.append(parse_address(uid))
+ if address and address not in uids:
+ raise errors.KeyAddressMismatch("UIDs %s found, but expected %s"
+ % (str(uids), address))
+ return OpenPGPKey(
+ address=address,
+ uids=uids,
+ gpgbinary=gpgbinary,
+ fingerprint=key_info['fingerprint'],
+ key_data=key_data,
+ private=True if key_info['type'] == 'sec' else False,
+ length=int(key_info['length']),
+ expiry_date=expiry_date,
+# -*- coding: utf-8 -*-
+# Copyright (C) 2014 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Validation levels implementation for key managment.
+from datetime import datetime
+class ValidationLevel(object):
+ """
+ A validation level
+ Meant to be used to compare levels or get its string representation.
+ """
+ def __init__(self, name, value):
+ = name
+ self.value = value
+ def __cmp__(self, other):
+ return cmp(self.value, other.value)
+ def __str__(self):
+ return
+ def __repr__(self):
+ return "<ValidationLevel: %s (%d)>" % (, self.value)
+class _ValidationLevels(object):
+ """
+ Handler class to manage validation levels. It should have only one global
+ instance 'ValidationLevels'.
+ The levels are attributes of the instance and can be used like:
+ ValidationLevels.Weak_Chain
+ ValidationLevels.get("Weak_Chain")
+ """
+ _level_names = ("Weak_Chain",
+ "Provider_Trust",
+ "Provider_Endorsement",
+ "Third_Party_Endorsement",
+ "Third_Party_Consensus",
+ "Historically_Auditing",
+ "Known_Key",
+ "Fingerprint")
+ def __init__(self):
+ for name in self._level_names:
+ setattr(self, name,
+ ValidationLevel(name, self._level_names.index(name)))
+ def get(self, name):
+ """
+ Get the ValidationLevel of a name
+ :param name: name of the level
+ :type name: str
+ :rtype: ValidationLevel
+ """
+ return getattr(self, name)
+ def __iter__(self):
+ return iter(self._level_names)
+ValidationLevels = _ValidationLevels()
+def can_upgrade(new_key, old_key):
+ """
+ :type new_key: EncryptionKey
+ :type old_key: EncryptionKey
+ :rtype: bool
+ """
+ # First contact
+ if old_key is None:
+ return True
+ # An update of the same key
+ if new_key.fingerprint == old_key.fingerprint:
+ return True
+ # Manually verified fingerprint
+ if new_key.validation == ValidationLevels.Fingerprint:
+ return True
+ # Expired key and higher validation level
+ if (old_key.expiry_date is not None and
+ old_key.expiry_date < and
+ new_key.validation >= old_key.validation):
+ return True
+ # No expiration date and higher validation level
+ if (old_key.expiry_date is None and
+ new_key.validation > old_key.validation):
+ return True
+ # Not successfully used and strict high validation level
+ if (not (old_key.sign_used and old_key.encr_used) and
+ new_key.validation > old_key.validation):
+ return True
+ # New key signed by the old key
+ # XXX: signatures are using key-ids instead of fingerprints
+ key_id = old_key.fingerprint[-16:]
+ if key_id in new_key.signatures:
+ return True
+ return False
+# -*- coding: utf-8 -*-
+# Copyright (C) 2016 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+GPG wrapper for temporary keyrings
+import os
+import shutil
+import tempfile
+from gnupg import GPG
+from leap.common.check import leap_assert
+class TempGPGWrapper(object):
+ """
+ A context manager that wraps a temporary GPG keyring which only contains
+ the keys given at object creation.
+ """
+ def __init__(self, keys=None, gpgbinary=None):
+ """
+ Create an empty temporary keyring and import any given C{keys} into
+ it.
+ :param keys: OpenPGP key, or list of.
+ :type keys: OpenPGPKey or list of OpenPGPKeys
+ :param gpgbinary: Name for GnuPG binary executable.
+ :type gpgbinary: C{str}
+ """
+ self._gpg = None
+ self._gpgbinary = gpgbinary
+ if not keys:
+ keys = list()
+ if not isinstance(keys, list):
+ keys = [keys]
+ self._keys = keys
+ def __enter__(self):
+ """
+ Build and return a GPG keyring containing the keys given on
+ object creation.
+ :return: A GPG instance containing the keys given on object creation.
+ :rtype: gnupg.GPG
+ """
+ self._build_keyring()
+ return self._gpg
+ def __exit__(self, exc_type, exc_value, traceback):
+ """
+ Ensure the gpg is properly destroyed.
+ """
+ # TODO handle exceptions and log here
+ self._destroy_keyring()
+ def _build_keyring(self):
+ """
+ Create a GPG keyring containing the keys given on object creation.
+ :return: A GPG instance containing the keys given on object creation.
+ :rtype: gnupg.GPG
+ """
+ privkeys = [key for key in self._keys if key and key.private is True]
+ publkeys = [key for key in self._keys if key and key.private is False]
+ # here we filter out public keys that have a correspondent
+ # private key in the list because the private key_data by
+ # itself is enough to also have the public key in the keyring,
+ # and we want to count the keys afterwards.
+ privfps = map(lambda privkey: privkey.fingerprint, privkeys)
+ publkeys = filter(
+ lambda pubkey: pubkey.fingerprint not in privfps, publkeys)
+ listkeys = lambda: self._gpg.list_keys()
+ listsecretkeys = lambda: self._gpg.list_keys(secret=True)
+ self._gpg = GPG(binary=self._gpgbinary,
+ homedir=tempfile.mkdtemp())
+ leap_assert(len(listkeys()) is 0, 'Keyring not empty.')
+ # import keys into the keyring:
+ # concatenating ascii-armored keys, which is correctly
+ # understood by GPG.
+ self._gpg.import_keys("".join(
+ [x.key_data for x in publkeys + privkeys]))
+ # assert the number of keys in the keyring
+ leap_assert(
+ len(listkeys()) == len(publkeys) + len(privkeys),
+ 'Wrong number of public keys in keyring: %d, should be %d)' %
+ (len(listkeys()), len(publkeys) + len(privkeys)))
+ leap_assert(
+ len(listsecretkeys()) == len(privkeys),
+ 'Wrong number of private keys in keyring: %d, should be %d)' %
+ (len(listsecretkeys()), len(privkeys)))
+ def _destroy_keyring(self):
+ """
+ Securely erase the keyring.
+ """
+ # TODO: implement some kind of wiping of data or a more
+ # secure way that
+ # does not write to disk.
+ try:
+ for secret in [True, False]:
+ for key in self._gpg.list_keys(secret=secret):
+ self._gpg.delete_keys(
+ key['fingerprint'],
+ secret=secret)
+ leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!')
+ except:
+ raise
+ finally:
+ leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'),
+ "watch out! Tried to remove default gnupg home!")
+ shutil.rmtree(self._gpg.homedir)
diff --git a/keymanager/tests/ b/keymanager/tests/
new file mode 100644
index 00000000..8eb5d4e3
--- /dev/null
+++ b/keymanager/tests/
@@ -0,0 +1,326 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Base classes for the Key Manager tests.
+import distutils.spawn
+import os.path
+from twisted.internet.defer import gatherResults
+from twisted.trial import unittest
+from leap.common.testing.basetest import BaseLeapTest
+from leap.soledad.client import Soledad
+from leap.keymanager import KeyManager
+PATH = os.path.dirname(os.path.realpath(__file__))
+ADDRESS_2 = ''
+class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest):
+ def setUp(self):
+ self.gpg_binary_path = self._find_gpg()
+ self._soledad = Soledad(
+ u"",
+ u"123456",
+ secrets_path=self.tempdir + "/secret.gpg",
+ local_db_path=self.tempdir + "/soledad.u1db",
+ server_url='',
+ cert_file=None,
+ auth_token=None,
+ syncable=False
+ )
+ def tearDown(self):
+ km = self._key_manager()
+ # wait for the indexes to be ready for the tear down
+ d = km._openpgp.deferred_init
+ d.addCallback(lambda _: self.delete_all_keys(km))
+ d.addCallback(lambda _: self._soledad.close())
+ return d
+ def delete_all_keys(self, km):
+ def delete_keys(keys):
+ deferreds = []
+ for key in keys:
+ d = km._openpgp.delete_key(key)
+ deferreds.append(d)
+ return gatherResults(deferreds)
+ def check_deleted(_, private):
+ d = km.get_all_keys(private=private)
+ d.addCallback(lambda keys: self.assertEqual(keys, []))
+ return d
+ deferreds = []
+ for private in [True, False]:
+ d = km.get_all_keys(private=private)
+ d.addCallback(delete_keys)
+ d.addCallback(check_deleted, private)
+ deferreds.append(d)
+ return gatherResults(deferreds)
+ def _key_manager(self, user=ADDRESS, url='', token=None,
+ ca_cert_path=None):
+ return KeyManager(user, url, self._soledad, token=token,
+ gpgbinary=self.gpg_binary_path,
+ ca_cert_path=ca_cert_path)
+ def _find_gpg(self):
+ gpg_path = distutils.spawn.find_executable('gpg')
+ if gpg_path is not None:
+ return os.path.realpath(gpg_path)
+ else:
+ return "/usr/bin/gpg"
+ def get_public_binary_key(self):
+ with open(PATH + '/fixtures/public_key.bin', 'r') as binary_public_key:
+ return
+ def get_private_binary_key(self):
+ with open(
+ PATH + '/fixtures/private_key.bin', 'r') as binary_private_key:
+ return
+# key 24D18DDF: public key "Leap Test Key <>"
+KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
+Version: GnuPG v1.4.10 (GNU/Linux)
+Version: GnuPG v1.4.10 (GNU/Linux)
+# key 7FEE575A: public key "anotheruser <>"
+PUBLIC_KEY_2 = """
+Version: GnuPG v1.4.10 (GNU/Linux)
+PRIVATE_KEY_2 = """
+Version: GnuPG v1.4.10 (GNU/Linux)
diff --git a/keymanager/tests/fixtures/private_key.bin b/keymanager/tests/fixtures/private_key.bin
new file mode 100644
index 00000000..ab174317
--- /dev/null
+++ b/keymanager/tests/fixtures/private_key.bin
Binary files differ
diff --git a/keymanager/tests/fixtures/public_key.bin b/keymanager/tests/fixtures/public_key.bin
new file mode 100644
index 00000000..ab174317
--- /dev/null
+++ b/keymanager/tests/fixtures/public_key.bin
Binary files differ
diff --git a/keymanager/tests/ b/keymanager/tests/
new file mode 100644
index 00000000..b4ab805c
--- /dev/null
+++ b/keymanager/tests/
@@ -0,0 +1,609 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Tests for the Key Manager.
+from os import path
+import json
+import urllib
+from datetime import datetime
+import tempfile
+import pkg_resources
+from leap.common import ca_bundle
+from mock import Mock, MagicMock, patch
+from twisted.internet import defer
+from twisted.trial import unittest
+from twisted.web._responses import NOT_FOUND
+from leap.keymanager import client
+from leap.keymanager import errors
+from leap.keymanager.keys import (
+ OpenPGPKey,
+ is_address,
+ build_key_from_dict,
+from leap.keymanager.validation import ValidationLevels
+from common import (
+ KeyManagerWithSoledadTestCase,
+REMOTE_KEY_URL = "http://site.domain/key"
+class KeyManagerUtilTestCase(unittest.TestCase):
+ def test_is_address(self):
+ self.assertTrue(
+ is_address(''),
+ 'Incorrect address detection.')
+ self.assertFalse(
+ is_address(''),
+ 'Incorrect address detection.')
+ self.assertFalse(
+ is_address('user@'),
+ 'Incorrect address detection.')
+ self.assertFalse(
+ is_address(''),
+ 'Incorrect address detection.')
+ def test_build_key_from_dict(self):
+ kdict = {
+ 'uids': [ADDRESS],
+ 'fingerprint': KEY_FINGERPRINT,
+ 'key_data': PUBLIC_KEY,
+ 'private': False,
+ 'length': 4096,
+ 'expiry_date': 0,
+ 'refreshed_at': 1311239602,
+ }
+ adict = {
+ 'address': ADDRESS,
+ 'private': False,
+ 'last_audited_at': 0,
+ 'validation': str(ValidationLevels.Weak_Chain),
+ 'encr_used': False,
+ 'sign_used': True,
+ }
+ key = build_key_from_dict(kdict, adict)
+ self.assertEqual(
+ kdict['uids'], key.uids,
+ 'Wrong data in key.')
+ self.assertEqual(
+ kdict['fingerprint'], key.fingerprint,
+ 'Wrong data in key.')
+ self.assertEqual(
+ kdict['key_data'], key.key_data,
+ 'Wrong data in key.')
+ self.assertEqual(
+ kdict['private'], key.private,
+ 'Wrong data in key.')
+ self.assertEqual(
+ kdict['length'], key.length,
+ 'Wrong data in key.')
+ self.assertEqual(
+ None, key.expiry_date,
+ 'Wrong data in key.')
+ self.assertEqual(
+ None, key.last_audited_at,
+ 'Wrong data in key.')
+ self.assertEqual(
+ datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at,
+ 'Wrong data in key.')
+ self.assertEqual(
+ adict['address'], key.address,
+ 'Wrong data in key.')
+ self.assertEqual(
+ ValidationLevels.get(adict['validation']), key.validation,
+ 'Wrong data in key.')
+ self.assertEqual(
+ adict['encr_used'], key.encr_used,
+ 'Wrong data in key.')
+ self.assertEqual(
+ adict['sign_used'], key.sign_used,
+ 'Wrong data in key.')
+class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase):
+ @defer.inlineCallbacks
+ def _test_gen_key(self):
+ km = self._key_manager()
+ key = yield km.gen_key()
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertEqual(
+ '', key.address, 'Wrong address bound to key.')
+ self.assertEqual(
+ 4096, key.length, 'Wrong key length.')
+ @defer.inlineCallbacks
+ def test_get_all_keys_in_db(self):
+ km = self._key_manager()
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ # get public keys
+ keys = yield km.get_all_keys(False)
+ self.assertEqual(len(keys), 1, 'Wrong number of keys')
+ self.assertTrue(ADDRESS in keys[0].uids)
+ self.assertFalse(keys[0].private)
+ # get private keys
+ keys = yield km.get_all_keys(True)
+ self.assertEqual(len(keys), 1, 'Wrong number of keys')
+ self.assertTrue(ADDRESS in keys[0].uids)
+ self.assertTrue(keys[0].private)
+ @defer.inlineCallbacks
+ def test_get_public_key(self):
+ km = self._key_manager()
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ # get the key
+ key = yield km.get_key(ADDRESS, private=False, fetch_remote=False)
+ self.assertTrue(key is not None)
+ self.assertTrue(ADDRESS in key.uids)
+ self.assertEqual(
+ key.fingerprint.lower(), KEY_FINGERPRINT.lower())
+ self.assertFalse(key.private)
+ @defer.inlineCallbacks
+ def test_get_public_key_with_binary_private_key(self):
+ km = self._key_manager()
+ yield km._openpgp.put_raw_key(self.get_private_binary_key(), ADDRESS)
+ # get the key
+ key = yield km.get_key(ADDRESS, private=False, fetch_remote=False)
+ self.assertTrue(key is not None)
+ self.assertTrue(ADDRESS in key.uids)
+ self.assertEqual(
+ key.fingerprint.lower(), KEY_FINGERPRINT.lower())
+ self.assertFalse(key.private)
+ @defer.inlineCallbacks
+ def test_get_private_key(self):
+ km = self._key_manager()
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ # get the key
+ key = yield km.get_key(ADDRESS, private=True, fetch_remote=False)
+ self.assertTrue(key is not None)
+ self.assertTrue(ADDRESS in key.uids)
+ self.assertEqual(
+ key.fingerprint.lower(), KEY_FINGERPRINT.lower())
+ self.assertTrue(key.private)
+ def test_send_key_raises_key_not_found(self):
+ km = self._key_manager()
+ d = km.send_key()
+ return self.assertFailure(d, errors.KeyNotFound)
+ @defer.inlineCallbacks
+ def test_send_key(self):
+ """
+ Test that request is well formed when sending keys to server.
+ """
+ token = "mytoken"
+ km = self._key_manager(token=token)
+ yield km._openpgp.put_raw_key(PUBLIC_KEY, ADDRESS)
+ km._async_client_pinned.request = Mock(return_value=defer.succeed(''))
+ # the following data will be used on the send
+ km.ca_cert_path = 'capath'
+ km.session_id = 'sessionid'
+ km.uid = 'myuid'
+ km.api_uri = 'apiuri'
+ km.api_version = 'apiver'
+ yield km.send_key()
+ # setup expected args
+ pubkey = yield km.get_key(km._address)
+ data = urllib.urlencode({
+ km.PUBKEY_KEY: pubkey.key_data,
+ })
+ headers = {'Authorization': [str('Token token=%s' % token)]}
+ headers['Content-Type'] = ['application/x-www-form-urlencoded']
+ url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid')
+ km._async_client_pinned.request.assert_called_once_with(
+ str(url), 'PUT', body=str(data),
+ headers=headers
+ )
+ def test_fetch_keys_from_server(self):
+ """
+ Test that the request is well formed when fetching keys from server.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2
+ def verify_the_call(_):
+ used_kwargs = km._async_client_pinned.request.call_args[1]
+ km._async_client_pinned.request.assert_called_once_with(
+ expected_url, 'GET', **used_kwargs)
+ d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2)
+ d.addCallback(verify_the_call)
+ return d
+ def test_key_not_found_is_raised_if_key_search_responds_404(self):
+ """
+ Test if key search request comes back with a 404 response then
+ KeyNotFound is raised, with corresponding error message.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ client.readBody = Mock(return_value=defer.succeed(None))
+ km._async_client_pinned.request = Mock(
+ return_value=defer.succeed(None))
+ d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS)
+ def check_key_not_found_is_raised_if_404(_):
+ used_kwargs = km._async_client_pinned.request.call_args[1]
+ check_404_callback = used_kwargs['callback']
+ fake_response = Mock()
+ fake_response.code = NOT_FOUND
+ with self.assertRaisesRegexp(
+ errors.KeyNotFound,
+ '404: %s key not found.' % INVALID_MAIL_ADDRESS):
+ check_404_callback(fake_response)
+ d.addCallback(check_key_not_found_is_raised_if_404)
+ return d
+ def test_non_existing_key_from_nicknym_is_relayed(self):
+ """
+ Test if key search requests throws KeyNotFound, the same error is
+ raised.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ key_not_found_exception = errors.KeyNotFound('some message')
+ km._async_client_pinned.request = Mock(
+ side_effect=key_not_found_exception)
+ def assert_key_not_found_raised(error):
+ self.assertEqual(error.value, key_not_found_exception)
+ d = km._get_key_from_nicknym(INVALID_MAIL_ADDRESS)
+ d.addErrback(assert_key_not_found_raised)
+ @defer.inlineCallbacks
+ def test_get_key_fetches_from_server(self):
+ """
+ Test that getting a key successfuly fetches from server.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(ADDRESS in key.uids)
+ self.assertEqual(key.validation, ValidationLevels.Provider_Trust)
+ @defer.inlineCallbacks
+ def test_get_key_fetches_other_domain(self):
+ """
+ Test that getting a key successfuly fetches from server.
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(ADDRESS_OTHER in key.uids)
+ self.assertEqual(key.validation, ValidationLevels.Weak_Chain)
+ def _fetch_key(self, km, address, key):
+ """
+ :returns: a Deferred that will fire with the OpenPGPKey
+ """
+ data = json.dumps({'address': address, 'openpgp': key})
+ client.readBody = Mock(return_value=defer.succeed(data))
+ # mock the fetcher so it returns the key for ADDRESS_2
+ km._async_client_pinned.request = Mock(
+ return_value=defer.succeed(None))
+ km.ca_cert_path = 'cacertpath'
+ # try to key get without fetching from server
+ d_fail = km.get_key(address, fetch_remote=False)
+ d = self.assertFailure(d_fail, errors.KeyNotFound)
+ # try to get key fetching from server.
+ d.addCallback(lambda _: km.get_key(address))
+ return d
+ @defer.inlineCallbacks
+ def test_put_key_ascii(self):
+ """
+ Test that putting ascii key works
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(ADDRESS in key.uids)
+ @defer.inlineCallbacks
+ def test_put_key_binary(self):
+ """
+ Test that putting binary key works
+ """
+ km = self._key_manager(url=NICKSERVER_URI)
+ yield km.put_raw_key(self.get_public_binary_key(), ADDRESS)
+ key = yield km.get_key(ADDRESS)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(ADDRESS in key.uids)
+ @defer.inlineCallbacks
+ def test_fetch_uri_ascii_key(self):
+ """
+ Test that fetch key downloads the ascii key and gets included in
+ the local storage
+ """
+ km = self._key_manager()
+ km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))
+ yield km.fetch_key(ADDRESS, "http://site.domain/key")
+ key = yield km.get_key(ADDRESS)
+ self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
+ @defer.inlineCallbacks
+ def test_fetch_uri_binary_key(self):
+ """
+ Test that fetch key downloads the binary key and gets included in
+ the local storage
+ """
+ km = self._key_manager()
+ km._async_client.request = Mock(
+ return_value=defer.succeed(self.get_public_binary_key()))
+ yield km.fetch_key(ADDRESS, "http://site.domain/key")
+ key = yield km.get_key(ADDRESS)
+ self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
+ def test_fetch_uri_empty_key(self):
+ """
+ Test that fetch key raises KeyNotFound if no key in the url
+ """
+ km = self._key_manager()
+ km._async_client.request = Mock(return_value=defer.succeed(""))
+ d = km.fetch_key(ADDRESS, "http://site.domain/key")
+ return self.assertFailure(d, errors.KeyNotFound)
+ def test_fetch_uri_address_differ(self):
+ """
+ Test that fetch key raises KeyAttributesDiffer if the address
+ don't match
+ """
+ km = self._key_manager()
+ km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY))
+ d = km.fetch_key(ADDRESS_2, "http://site.domain/key")
+ return self.assertFailure(d, errors.KeyAddressMismatch)
+ def _mock_get_response(self, km, body):
+ km._async_client.request = MagicMock(return_value=defer.succeed(body))
+ return km._async_client.request
+ @defer.inlineCallbacks
+ def test_fetch_key_uses_ca_bundle_if_none_specified(self):
+ ca_cert_path = None
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
+ @defer.inlineCallbacks
+ def test_fetch_key_uses_ca_bundle_if_empty_string_specified(self):
+ ca_cert_path = ''
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
+ @defer.inlineCallbacks
+ def test_fetch_key_use_default_ca_bundle_if_set_as_ca_cert_path(self):
+ ca_cert_path = ca_bundle.where()
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
+ @defer.inlineCallbacks
+ def test_fetch_uses_combined_ca_bundle_otherwise(self):
+ with tempfile.NamedTemporaryFile() as tmp_input, \
+ tempfile.NamedTemporaryFile(delete=False) as tmp_output:
+ ca_content = pkg_resources.resource_string('leap.common.testing',
+ 'cacert.pem')
+ ca_cert_path =
+ self._dump_to_file(ca_cert_path, ca_content)
+ with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock:
+ mock.return_value = tmp_output
+ km = self._key_manager(ca_cert_path=ca_cert_path)
+ get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER)
+ yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL)
+ # assert that combined bundle file is passed to get call
+ get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET')
+ # assert that files got appended
+ expected = self._slurp_file(ca_bundle.where()) + ca_content
+ self.assertEqual(expected, self._slurp_file(
+ del km # force km out of scope
+ self.assertFalse(path.exists(
+ def _dump_to_file(self, filename, content):
+ with open(filename, 'w') as out:
+ out.write(content)
+ def _slurp_file(self, filename):
+ with open(filename) as f:
+ content =
+ return content
+ @defer.inlineCallbacks
+ def test_decrypt_updates_sign_used_for_signer(self):
+ # given
+ km = self._key_manager()
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+ encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2,
+ fetch_remote=False)
+ yield km.decrypt(
+ encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False)
+ # when
+ key = yield km.get_key(ADDRESS_2, fetch_remote=False)
+ # then
+ self.assertEqual(True, key.sign_used)
+ @defer.inlineCallbacks
+ def test_decrypt_does_not_update_sign_used_for_recipient(self):
+ # given
+ km = self._key_manager()
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+ encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2,
+ fetch_remote=False)
+ yield km.decrypt(
+ encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False)
+ # when
+ key = yield km.get_key(
+ ADDRESS, private=False, fetch_remote=False)
+ # then
+ self.assertEqual(False, key.sign_used)
+class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase):
+ RAW_DATA = 'data'
+ @defer.inlineCallbacks
+ def test_keymanager_openpgp_encrypt_decrypt(self):
+ km = self._key_manager()
+ # put raw private key
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+ # encrypt
+ encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2,
+ fetch_remote=False)
+ self.assertNotEqual(self.RAW_DATA, encdata)
+ # decrypt
+ rawdata, signingkey = yield km.decrypt(
+ encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False)
+ self.assertEqual(self.RAW_DATA, rawdata)
+ key = yield km.get_key(ADDRESS_2, private=False, fetch_remote=False)
+ self.assertEqual(signingkey.fingerprint, key.fingerprint)
+ @defer.inlineCallbacks
+ def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self):
+ km = self._key_manager()
+ # put raw keys
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+ # encrypt
+ encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2,
+ fetch_remote=False)
+ self.assertNotEqual(self.RAW_DATA, encdata)
+ # verify
+ rawdata, signingkey = yield km.decrypt(
+ encdata, ADDRESS, verify=ADDRESS, fetch_remote=False)
+ self.assertEqual(self.RAW_DATA, rawdata)
+ self.assertTrue(isinstance(signingkey, errors.InvalidSignature))
+ @defer.inlineCallbacks
+ def test_keymanager_openpgp_sign_verify(self):
+ km = self._key_manager()
+ # put raw private keys
+ yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ signdata = yield km.sign(self.RAW_DATA, ADDRESS, detach=False)
+ self.assertNotEqual(self.RAW_DATA, signdata)
+ # verify
+ signingkey = yield km.verify(signdata, ADDRESS, fetch_remote=False)
+ key = yield km.get_key(ADDRESS, private=False, fetch_remote=False)
+ self.assertEqual(signingkey.fingerprint, key.fingerprint)
+ def test_keymanager_encrypt_key_not_found(self):
+ km = self._key_manager()
+ d = km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ d.addCallback(
+ lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, sign=ADDRESS,
+ fetch_remote=False))
+ return self.assertFailure(d, errors.KeyNotFound)
+if __name__ == "__main__":
+ import unittest
+ unittest.main()
+# key 0F91B402:
+# 9420 EC7B 6DCB 867F 5592 E6D1 7504 C974 0F91 B402
+Version: GnuPG v1
diff --git a/keymanager/tests/ b/keymanager/tests/
new file mode 100644
index 00000000..64cd8e10
--- /dev/null
+++ b/keymanager/tests/
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2015 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Tests for the migrator.
+from collections import namedtuple
+from mock import Mock
+from twisted.internet.defer import succeed, inlineCallbacks
+from leap.keymanager.migrator import KeyDocumentsMigrator, KEY_ID_KEY
+from leap.keymanager.documents import (
+from leap.keymanager.validation import ValidationLevels
+from common import (
+ KeyManagerWithSoledadTestCase,
+class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
+ @inlineCallbacks
+ def test_simple_migration(self):
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+ put_doc = self._soledad.put_doc
+ def my_get_from_index(*args):
+ docs = []
+ if (args[0] == TAGS_PRIVATE_INDEX and
+ args[2] == '0'):
+ SoledadDocument = namedtuple("SoledadDocument", ["content"])
+ if args[1] == KEYMANAGER_KEY_TAG:
+ docs = [SoledadDocument({
+ KEY_VALIDATION_KEY: str(ValidationLevels.Weak_Chain),
+ })]
+ if args[1] == KEYMANAGER_ACTIVE_TAG:
+ docs = [SoledadDocument({
+ })]
+ return succeed(docs)
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+ self._soledad.put_doc = Mock(return_value=succeed(None))
+ try:
+ migrator = KeyDocumentsMigrator(self._soledad)
+ yield migrator.migrate()
+ call_list = self._soledad.put_doc.call_args_list
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+ self._soledad.put_doc = put_doc
+ self.assertEqual(len(call_list), 2)
+ active = call_list[0][0][0]
+ key = call_list[1][0][0]
+ self.assertTrue(KEY_ID_KEY not in active.content)
+ self.assertEqual(active.content[KEY_VERSION_KEY],
+ self.assertEqual(active.content[KEY_FINGERPRINT_KEY], KEY_FINGERPRINT)
+ self.assertEqual(active.content[KEY_VALIDATION_KEY],
+ str(ValidationLevels.Weak_Chain))
+ self.assertEqual(active.content[KEY_LAST_AUDITED_AT_KEY], 0)
+ self.assertEqual(active.content[KEY_ENCR_USED_KEY], True)
+ self.assertEqual(active.content[KEY_SIGN_USED_KEY], False)
+ self.assertTrue(KEY_ID_KEY not in key.content)
+ self.assertTrue(KEY_ADDRESS_KEY not in key.content)
+ self.assertTrue(KEY_VALIDATION_KEY not in key.content)
+ self.assertTrue(KEY_LAST_AUDITED_AT_KEY not in key.content)
+ self.assertTrue(KEY_ENCR_USED_KEY not in key.content)
+ self.assertTrue(KEY_SIGN_USED_KEY not in key.content)
+ self.assertEqual(key.content[KEY_UIDS_KEY], [ADDRESS])
+ @inlineCallbacks
+ def test_two_active_docs(self):
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+ put_doc = self._soledad.put_doc
+ def my_get_from_index(*args):
+ docs = []
+ if (args[0] == TAGS_PRIVATE_INDEX and
+ args[2] == '0'):
+ SoledadDocument = namedtuple("SoledadDocument", ["content"])
+ if args[1] == KEYMANAGER_KEY_TAG:
+ validation = str(ValidationLevels.Provider_Trust)
+ docs = [SoledadDocument({
+ KEY_VALIDATION_KEY: validation,
+ })]
+ if args[1] == KEYMANAGER_ACTIVE_TAG:
+ docs = [
+ SoledadDocument({
+ }),
+ SoledadDocument({
+ }),
+ ]
+ return succeed(docs)
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+ self._soledad.put_doc = Mock(return_value=succeed(None))
+ try:
+ migrator = KeyDocumentsMigrator(self._soledad)
+ yield migrator.migrate()
+ call_list = self._soledad.put_doc.call_args_list
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+ self._soledad.put_doc = put_doc
+ self.assertEqual(len(call_list), 3)
+ for active in [call[0][0] for call in call_list][:2]:
+ self.assertTrue(KEY_ID_KEY not in active.content)
+ self.assertEqual(active.content[KEY_VERSION_KEY],
+ self.assertEqual(active.content[KEY_FINGERPRINT_KEY],
+ self.assertEqual(active.content[KEY_VALIDATION_KEY],
+ str(ValidationLevels.Weak_Chain))
+ self.assertEqual(active.content[KEY_LAST_AUDITED_AT_KEY], 0)
+ self.assertEqual(active.content[KEY_ENCR_USED_KEY], False)
+ self.assertEqual(active.content[KEY_SIGN_USED_KEY], False)
diff --git a/keymanager/tests/ b/keymanager/tests/
new file mode 100644
index 00000000..1f78ad41
--- /dev/null
+++ b/keymanager/tests/
@@ -0,0 +1,365 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2014 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Tests for the OpenPGP support on Key Manager.
+from datetime import datetime
+from mock import Mock
+from twisted.internet.defer import inlineCallbacks, gatherResults, succeed
+from leap.keymanager import (
+ KeyNotFound,
+ openpgp,
+from leap.keymanager.documents import (
+from leap.keymanager.keys import OpenPGPKey
+from common import (
+ KeyManagerWithSoledadTestCase,
+class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase):
+ # set the trial timeout to 20min, needed by the key generation test
+ timeout = 1200
+ @inlineCallbacks
+ def _test_openpgp_gen_key(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield self._assert_key_not_found(pgp, '')
+ key = yield pgp.gen_key('')
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertEqual(
+ '', key.address, 'Wrong address bound to key.')
+ self.assertEqual(
+ 4096, key.length, 'Wrong key length.')
+ @inlineCallbacks
+ def test_openpgp_put_delete_key(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+ yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS)
+ key = yield pgp.get_key(ADDRESS, private=False)
+ yield pgp.delete_key(key)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+ @inlineCallbacks
+ def test_openpgp_put_ascii_key(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+ yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS)
+ key = yield pgp.get_key(ADDRESS, private=False)
+ self.assertIsInstance(key, OpenPGPKey)
+ self.assertTrue(
+ ADDRESS in key.address, 'Wrong address bound to key.')
+ self.assertEqual(
+ 4096, key.length, 'Wrong key length.')
+ yield pgp.delete_key(key)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+ @inlineCallbacks
+ def test_get_public_key(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+ yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS)
+ yield self._assert_key_not_found(pgp, ADDRESS, private=True)
+ key = yield pgp.get_key(ADDRESS, private=False)
+ self.assertTrue(ADDRESS in key.address)
+ self.assertFalse(key.private)
+ self.assertEqual(KEY_FINGERPRINT, key.fingerprint)
+ yield pgp.delete_key(key)
+ yield self._assert_key_not_found(pgp, ADDRESS)
+ @inlineCallbacks
+ def test_openpgp_encrypt_decrypt(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ # encrypt
+ yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ cyphertext = yield pgp.encrypt(data, pubkey)
+ self.assertTrue(cyphertext is not None)
+ self.assertTrue(cyphertext != '')
+ self.assertTrue(cyphertext != data)
+ self.assertTrue(pgp.is_encrypted(cyphertext))
+ self.assertTrue(pgp.is_encrypted(cyphertext))
+ # decrypt
+ yield self._assert_key_not_found(pgp, ADDRESS, private=True)
+ yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ decrypted, _ = yield pgp.decrypt(cyphertext, privkey)
+ self.assertEqual(decrypted, data)
+ yield pgp.delete_key(pubkey)
+ yield pgp.delete_key(privkey)
+ yield self._assert_key_not_found(pgp, ADDRESS, private=False)
+ yield self._assert_key_not_found(pgp, ADDRESS, private=True)
+ @inlineCallbacks
+ def test_verify_with_private_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ signed = pgp.sign(data, privkey)
+ self.assertRaises(
+ AssertionError,
+ pgp.verify, signed, privkey)
+ @inlineCallbacks
+ def test_sign_with_public_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS)
+ self.assertRaises(
+ AssertionError,
+ pgp.sign, data, ADDRESS, OpenPGPKey)
+ @inlineCallbacks
+ def test_verify_with_wrong_key_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ signed = pgp.sign(data, privkey)
+ yield pgp.put_raw_key(PUBLIC_KEY_2, ADDRESS_2)
+ wrongkey = yield pgp.get_key(ADDRESS_2)
+ self.assertFalse(pgp.verify(signed, wrongkey))
+ @inlineCallbacks
+ def test_encrypt_sign_with_public_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ self.failureResultOf(
+ pgp.encrypt(data, privkey, sign=pubkey),
+ AssertionError)
+ @inlineCallbacks
+ def test_decrypt_verify_with_private_raises(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ encrypted_and_signed = yield pgp.encrypt(
+ data, pubkey, sign=privkey)
+ self.failureResultOf(
+ pgp.decrypt(encrypted_and_signed, privkey, verify=privkey),
+ AssertionError)
+ @inlineCallbacks
+ def test_decrypt_verify_with_wrong_key(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ encrypted_and_signed = yield pgp.encrypt(data, pubkey, sign=privkey)
+ yield pgp.put_raw_key(PUBLIC_KEY_2, ADDRESS_2)
+ wrongkey = yield pgp.get_key(ADDRESS_2)
+ decrypted, validsign = yield pgp.decrypt(encrypted_and_signed,
+ privkey,
+ verify=wrongkey)
+ self.assertEqual(decrypted, data)
+ self.assertFalse(validsign)
+ @inlineCallbacks
+ def test_sign_verify(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ signed = pgp.sign(data, privkey, detach=False)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ validsign = pgp.verify(signed, pubkey)
+ self.assertTrue(validsign)
+ @inlineCallbacks
+ def test_encrypt_sign_decrypt_verify(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ yield pgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+ pubkey2 = yield pgp.get_key(ADDRESS_2, private=False)
+ privkey2 = yield pgp.get_key(ADDRESS_2, private=True)
+ data = 'data'
+ encrypted_and_signed = yield pgp.encrypt(
+ data, pubkey2, sign=privkey)
+ res, validsign = yield pgp.decrypt(
+ encrypted_and_signed, privkey2, verify=pubkey)
+ self.assertEqual(data, res)
+ self.assertTrue(validsign)
+ @inlineCallbacks
+ def test_sign_verify_detached_sig(self):
+ data = 'data'
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS)
+ privkey = yield pgp.get_key(ADDRESS, private=True)
+ signature = yield pgp.sign(data, privkey, detach=True)
+ pubkey = yield pgp.get_key(ADDRESS, private=False)
+ validsign = pgp.verify(data, pubkey, detached_sig=signature)
+ self.assertTrue(validsign)
+ @inlineCallbacks
+ def test_self_repair_three_keys(self):
+ refreshed_keep = datetime(2007, 1, 1)
+ self._insert_key_docs([datetime(2005, 1, 1),
+ refreshed_keep,
+ datetime(2001, 1, 1)])
+ delete_doc = self._mock_delete_doc()
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ key = yield pgp.get_key(ADDRESS, private=False)
+ self.assertEqual(key.refreshed_at, refreshed_keep)
+ self.assertEqual(self.count, 2)
+ self._soledad.delete_doc = delete_doc
+ @inlineCallbacks
+ def test_self_repair_no_keys(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS)
+ get_from_index = self._soledad.get_from_index
+ delete_doc = self._soledad.delete_doc
+ def my_get_from_index(*args):
+ args[2] == KEY_FINGERPRINT):
+ return succeed([])
+ return get_from_index(*args)
+ self._soledad.get_from_index = my_get_from_index
+ self._soledad.delete_doc = Mock(return_value=succeed(None))
+ try:
+ yield self.assertFailure(pgp.get_key(ADDRESS, private=False),
+ KeyNotFound)
+ # it should have deleted the index
+ self.assertEqual(self._soledad.delete_doc.call_count, 1)
+ finally:
+ self._soledad.get_from_index = get_from_index
+ self._soledad.delete_doc = delete_doc
+ @inlineCallbacks
+ def test_self_repair_put_keys(self):
+ self._insert_key_docs([datetime(2005, 1, 1),
+ datetime(2007, 1, 1),
+ datetime(2001, 1, 1)])
+ delete_doc = self._mock_delete_doc()
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS)
+ self._soledad.delete_doc = delete_doc
+ self.assertEqual(self.count, 2)
+ @inlineCallbacks
+ def test_self_repair_six_active_docs(self):
+ pgp = openpgp.OpenPGPScheme(
+ self._soledad, gpgbinary=self.gpg_binary_path)
+ k1 = OpenPGPKey(ADDRESS, fingerprint="1",
+ refreshed_at=datetime(2005, 1, 1))
+ k2 = OpenPGPKey(ADDRESS, fingerprint="2",
+ refreshed_at=datetime(2007, 1, 1))
+ k3 = OpenPGPKey(ADDRESS, fingerprint="3",
+ refreshed_at=datetime(2007, 1, 1),
+ encr_used=True, sign_used=True)
+ k4 = OpenPGPKey(ADDRESS, fingerprint="4",
+ refreshed_at=datetime(2007, 1, 1),
+ sign_used=True)
+ k5 = OpenPGPKey(ADDRESS, fingerprint="5",
+ refreshed_at=datetime(2007, 1, 1),
+ encr_used=True)
+ k6 = OpenPGPKey(ADDRESS, fingerprint="6",
+ refreshed_at=datetime(2006, 1, 1),
+ encr_used=True, sign_used=True)
+ keys = (k1, k2, k3, k4, k5, k6)
+ for key in keys:
+ yield self._soledad.create_doc_from_json(key.get_json())
+ yield self._soledad.create_doc_from_json(key.get_active_json())
+ delete_doc = self._mock_delete_doc()
+ key = yield pgp.get_key(ADDRESS, private=False)
+ self._soledad.delete_doc = delete_doc
+ self.assertEqual(self.count, 5)
+ self.assertEqual(key.fingerprint, "3")
+ def _assert_key_not_found(self, pgp, address, private=False):
+ d = pgp.get_key(address, private=private)
+ return self.assertFailure(d, KeyNotFound)
+ @inlineCallbacks
+ def _insert_key_docs(self, refreshed_at):
+ for date in refreshed_at:
+ key = OpenPGPKey(ADDRESS, fingerprint=KEY_FINGERPRINT,
+ refreshed_at=date)
+ yield self._soledad.create_doc_from_json(key.get_json())
+ yield self._soledad.create_doc_from_json(key.get_active_json())
+ def _mock_delete_doc(self):
+ delete_doc = self._soledad.delete_doc
+ self.count = 0
+ def my_delete_doc(*args):
+ self.count += 1
+ return delete_doc(*args)
+ self._soledad.delete_doc = my_delete_doc
+ return delete_doc
diff --git a/keymanager/tests/ b/keymanager/tests/
new file mode 100644
index 00000000..4aa0795d
--- /dev/null
+++ b/keymanager/tests/
@@ -0,0 +1,502 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2014 LEAP
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <>.
+Tests for the Validation Levels
+import unittest
+from datetime import datetime
+from twisted.internet.defer import inlineCallbacks
+from leap.keymanager.errors import KeyNotValidUpgrade
+from leap.keymanager.validation import ValidationLevels
+from common import (
+ KeyManagerWithSoledadTestCase,
+class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase):
+ @inlineCallbacks
+ def test_none_old_key(self):
+ km = self._key_manager()
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
+ self.assertEqual(key.fingerprint, KEY_FINGERPRINT)
+ @inlineCallbacks
+ def test_cant_upgrade(self):
+ km = self._key_manager()
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS,
+ validation=ValidationLevels.Provider_Trust)
+ d = km.put_raw_key(UNRELATED_KEY, ADDRESS)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+ @inlineCallbacks
+ def test_fingerprint_level(self):
+ km = self._key_manager()
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS)
+ yield km.put_raw_key(UNRELATED_KEY, ADDRESS,
+ validation=ValidationLevels.Fingerprint)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
+ self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
+ @inlineCallbacks
+ def test_expired_key(self):
+ km = self._key_manager()
+ yield km.put_raw_key(EXPIRED_KEY, ADDRESS)
+ yield km.put_raw_key(UNRELATED_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
+ self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
+ @inlineCallbacks
+ def test_expired_fail_lower_level(self):
+ km = self._key_manager()
+ yield km.put_raw_key(
+ validation=ValidationLevels.Third_Party_Endorsement)
+ d = km.put_raw_key(
+ validation=ValidationLevels.Provider_Trust)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+ @inlineCallbacks
+ def test_roll_back(self):
+ km = self._key_manager()
+ yield km.put_raw_key(EXPIRED_KEY_UPDATED, ADDRESS)
+ yield km.put_raw_key(EXPIRED_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
+ self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE)
+ @inlineCallbacks
+ def test_not_used(self):
+ km = self._key_manager()
+ yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS,
+ validation=ValidationLevels.Provider_Trust)
+ yield km.put_raw_key(UNRELATED_KEY, ADDRESS,
+ validation=ValidationLevels.Provider_Endorsement)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
+ self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT)
+ @inlineCallbacks
+ def test_used_with_verify(self):
+ TEXT = "some text"
+ km = self._key_manager()
+ yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS)
+ signature = yield km.sign(TEXT, ADDRESS)
+ yield self.delete_all_keys(km)
+ yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS)
+ yield km.encrypt(TEXT, ADDRESS)
+ yield km.verify(TEXT, ADDRESS, detached_sig=signature)
+ d = km.put_raw_key(
+ validation=ValidationLevels.Provider_Endorsement)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+ @inlineCallbacks
+ def test_used_with_decrypt(self):
+ TEXT = "some text"
+ km = self._key_manager()
+ yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS)
+ yield km.put_raw_key(PUBLIC_KEY_2, ADDRESS_2)
+ encrypted = yield km.encrypt(TEXT, ADDRESS_2, sign=ADDRESS)
+ yield self.delete_all_keys(km)
+ yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS)
+ yield km.put_raw_key(PRIVATE_KEY_2, ADDRESS_2)
+ yield km.encrypt(TEXT, ADDRESS)
+ yield km.decrypt(encrypted, ADDRESS_2, verify=ADDRESS)
+ d = km.put_raw_key(
+ validation=ValidationLevels.Provider_Endorsement)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+ @inlineCallbacks
+ def test_signed_key(self):
+ km = self._key_manager()
+ yield km.put_raw_key(PUBLIC_KEY, ADDRESS)
+ yield km.put_raw_key(SIGNED_KEY, ADDRESS)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
+ self.assertEqual(key.fingerprint, SIGNED_FINGERPRINT)
+ @inlineCallbacks
+ def test_two_uuids(self):
+ TEXT = "some text"
+ km = self._key_manager()
+ yield km.put_raw_key(UUIDS_PRIVATE, ADDRESS_2)
+ signature = yield km.sign(TEXT, ADDRESS_2)
+ yield self.delete_all_keys(km)
+ yield km.put_raw_key(UUIDS_KEY, ADDRESS_2)
+ yield km.put_raw_key(UUIDS_KEY, ADDRESS)
+ yield km.encrypt(TEXT, ADDRESS_2)
+ yield km.verify(TEXT, ADDRESS_2, detached_sig=signature)
+ d = km.put_raw_key(
+ validation=ValidationLevels.Provider_Endorsement)
+ yield self.assertFailure(d, KeyNotValidUpgrade)
+ key = yield km.get_key(ADDRESS_2, fetch_remote=False)
+ self.assertEqual(key.fingerprint, UUIDS_FINGERPRINT)
+ yield km.put_raw_key(
+ validation=ValidationLevels.Provider_Endorsement)
+ key = yield km.get_key(ADDRESS, fetch_remote=False)
+ self.assertEqual(key.fingerprint, KEY_FINGERPRINT)
+# Key material for testing
+# key 901FBCA5: public key "Leap Test Key <>"
+Version: GnuPG v1
+# key A1885A7C: public key "Leap Test Key <>"
+Version: GnuPG v1.4.12 (GNU/Linux)
+# updated expiration date
+EXPIRED_KEY_NEW_EXPIRY_DATE = datetime.fromtimestamp(2049717872)
+Version: GnuPG v1.4.12 (GNU/Linux)
+Version: GnuPG v1.4.12 (GNU/Linux)
+# key CA1AD31E: public key "Leap Test Key <>"
+# signed by E36E738D69173C13D709E44F2F455E2824D18DDF
+Version: GnuPG v1.4.12 (GNU/Linux)
+# key 0x1DDBAEB928D982F7: public key two uuids
+# uid anotheruser <>
+# uid Leap Test Key <>
+UUIDS_KEY = """
+Version: GnuPG v1
+Version: GnuPG v1
+if __name__ == "__main__":
+ unittest.main()
diff --git a/keymanager/tox.ini b/keymanager/tox.ini
new file mode 100644
index 00000000..7667788c
--- /dev/null
+++ b/keymanager/tox.ini
@@ -0,0 +1,14 @@
+envlist = py27
+commands = py.test {posargs}
+deps =
+ pytest
+ pdbpp
+ mock
+ setuptools-trial
+ pep8
+ leap.soledad.client
+setenv =
+ HOME=/tmp
diff --git a/keymanager/ b/keymanager/
new file mode 100644
index 00000000..7ed2a21d
--- /dev/null
+++ b/keymanager/
@@ -0,0 +1,1774 @@
+# Version: 0.16
+"""The Versioneer - like a rocketeer, but for versions.
+The Versioneer
+* like a rocketeer, but for versions!
+* Brian Warner
+* License: Public Domain
+* Compatible With: python2.6, 2.7, 3.3, 3.4, 3.5, and pypy
+* [![Latest Version]
+* [![Build Status]
+This is a tool for managing a recorded version number in distutils-based
+python projects. The goal is to remove the tedious and error-prone "update
+the embedded version string" step from your release process. Making a new
+release should be as easy as recording a new tag in your version-control
+system, and maybe making new tarballs.
+## Quick Install
+* `pip install versioneer` to somewhere to your $PATH
+* add a `[versioneer]` section to your setup.cfg (see below)
+* run `versioneer install` in your source tree, commit the results
+## Version Identifiers
+Source trees come from a variety of places:
+* a version-control system checkout (mostly used by developers)
+* a nightly tarball, produced by build automation
+* a snapshot tarball, produced by a web-based VCS browser, like github's
+ "tarball from tag" feature
+* a release tarball, produced by " sdist", distributed through PyPI
+Within each source tree, the version identifier (either a string or a number,
+this tool is format-agnostic) can come from a variety of places:
+* ask the VCS tool itself, e.g. "git describe" (for checkouts), which knows
+ about recent "tags" and an absolute revision-id
+* the name of the directory into which the tarball was unpacked
+* an expanded VCS keyword ($Id$, etc)
+* a `` created by some earlier build step
+For released software, the version identifier is closely related to a VCS
+tag. Some projects use tag names that include more than just the version
+string (e.g. "myproject-1.2" instead of just "1.2"), in which case the tool
+needs to strip the tag prefix to extract the version identifier. For
+unreleased software (between tags), the version identifier should provide
+enough information to help developers recreate the same tree, while also
+giving them an idea of roughly how old the tree is (after version 1.2, before
+version 1.3). Many VCS systems can report a description that captures this,
+for example `git describe --tags --dirty --always` reports things like
+"0.7-1-g574ab98-dirty" to indicate that the checkout is one revision past the
+0.7 tag, has a unique revision id of "574ab98", and is "dirty" (it has
+uncommitted changes.
+The version identifier is used for multiple purposes:
+* to allow the module to self-identify its version: `myproject.__version__`
+* to choose a name and prefix for a ' sdist' tarball
+## Theory of Operation
+Versioneer works by adding a special `` file into your source
+tree, where your `` can import it. This `` knows how to
+dynamically ask the VCS tool for version information at import time.
+`` also contains `$Revision$` markers, and the installation
+process marks `` to have this marker rewritten with a tag name
+during the `git archive` command. As a result, generated tarballs will
+contain enough information to get the proper version.
+To allow `` to compute a version too, a `` is added to
+the top level of your source tree, next to `` and the `setup.cfg`
+that configures it. This overrides several distutils/setuptools commands to
+compute the version when invoked, and changes ` build` and `
+sdist` to replace `` with a small static file that contains just
+the generated version data.
+## Installation
+First, decide on values for the following configuration variables:
+* `VCS`: the version control system you use. Currently accepts "git".
+* `style`: the style of version string to be produced. See "Styles" below for
+ details. Defaults to "pep440", which looks like
+* `versionfile_source`:
+ A project-relative pathname into which the generated version strings should
+ be written. This is usually a `` next to your project's main
+ `` file, so it can be imported at runtime. If your project uses
+ `src/myproject/`, this should be `src/myproject/`.
+ This file should be checked in to your VCS as usual: the copy created below
+ by ` setup_versioneer` will include code that parses expanded VCS
+ keywords in generated tarballs. The 'build' and 'sdist' commands will
+ replace it with a copy that has just the calculated version string.
+ This must be set even if your project does not have any modules (and will
+ therefore never import ``), since " sdist" -based trees
+ still need somewhere to record the pre-calculated version strings. Anywhere
+ in the source tree should do. If there is a `` next to your
+ ``, the ` setup_versioneer` command (described below)
+ will append some `__version__`-setting assignments, if they aren't already
+ present.
+* `versionfile_build`:
+ Like `versionfile_source`, but relative to the build directory instead of
+ the source directory. These will differ when your uses
+ 'package_dir='. If you have `package_dir={'myproject': 'src/myproject'}`,
+ then you will probably have `versionfile_build='myproject/'` and
+ `versionfile_source='src/myproject/'`.
+ If this is set to None, then ` build` will not attempt to rewrite
+ any `` in the built tree. If your project does not have any
+ libraries (e.g. if it only builds a script), then you should use
+ `versionfile_build = None`. To actually use the computed version string,
+ your `` will need to override `distutils.command.build_scripts`
+ with a subclass that explicitly inserts a copy of
+ `versioneer.get_version()` into your script file. See
+ `test/demoapp-script-only/` for an example.
+* `tag_prefix`:
+ a string, like 'PROJECTNAME-', which appears at the start of all VCS tags.
+ If your tags look like 'myproject-1.2.0', then you should use
+ tag_prefix='myproject-'. If you use unprefixed tags like '1.2.0', this
+ should be an empty string, using either `tag_prefix=` or `tag_prefix=''`.
+* `parentdir_prefix`:
+ a optional string, frequently the same as tag_prefix, which appears at the
+ start of all unpacked tarball filenames. If your tarball unpacks into
+ 'myproject-1.2.0', this should be 'myproject-'. To disable this feature,
+ just omit the field from your `setup.cfg`.
+This tool provides one script, named `versioneer`. That script has one mode,
+"install", which writes a copy of `` into the current directory
+and runs ` setup` to finish the installation.
+To versioneer-enable your project:
+* 1: Modify your `setup.cfg`, adding a section named `[versioneer]` and
+ populating it with the configuration values you decided earlier (note that
+ the option names are not case-sensitive):
+ ````
+ [versioneer]
+ VCS = git
+ style = pep440
+ versionfile_source = src/myproject/
+ versionfile_build = myproject/
+ tag_prefix =
+ parentdir_prefix = myproject-
+ ````
+* 2: Run `versioneer install`. This will do the following:
+ * copy `` into the top of your source tree
+ * create `` in the right place (`versionfile_source`)
+ * modify your `` (if one exists next to ``) to define
+ `__version__` (by calling a function from ``)
+ * modify your `` to include both `` and the
+ generated `` in sdist tarballs
+ `versioneer install` will complain about any problems it finds with your
+ `` or `setup.cfg`. Run it multiple times until you have fixed all
+ the problems.
+* 3: add a `import versioneer` to your, and add the following
+ arguments to the setup() call:
+ version=versioneer.get_version(),
+ cmdclass=versioneer.get_cmdclass(),
+* 4: commit these changes to your VCS. To make sure you won't forget,
+ `versioneer install` will mark everything it touched for addition using
+ `git add`. Don't forget to add `` and `setup.cfg` too.
+## Post-Installation Usage
+Once established, all uses of your tree from a VCS checkout should get the
+current version string. All generated tarballs should include an embedded
+version string (so users who unpack them will not need a VCS tool installed).
+If you distribute your project through PyPI, then the release process should
+boil down to two steps:
+* 1: git tag 1.0
+* 2: python register sdist upload
+If you distribute it through github (i.e. users use github to generate
+tarballs with `git archive`), the process is:
+* 1: git tag 1.0
+* 2: git push; git push --tags
+Versioneer will report "0+untagged.NUMCOMMITS.gHASH" until your tree has at
+least one tag in its history.
+## Version-String Flavors
+Code which uses Versioneer can learn about its version string at runtime by
+importing `_version` from your main `` file and running the
+`get_versions()` function. From the "outside" (e.g. in ``), you can
+import the top-level `` and run `get_versions()`.
+Both functions return a dictionary with different flavors of version
+* `['version']`: A condensed version string, rendered using the selected
+ style. This is the most commonly used value for the project's version
+ string. The default "pep440" style yields strings like `0.11`,
+ `0.11+2.g1076c97`, or `0.11+2.g1076c97.dirty`. See the "Styles" section
+ below for alternative styles.
+* `['full-revisionid']`: detailed revision identifier. For Git, this is the
+ full SHA1 commit id, e.g. "1076c978a8d3cfc70f408fe5974aa6c092c949ac".
+* `['dirty']`: a boolean, True if the tree has uncommitted changes. Note that
+ this is only accurate if run in a VCS checkout, otherwise it is likely to
+ be False or None
+* `['error']`: if the version string could not be computed, this will be set
+ to a string describing the problem, otherwise it will be None. It may be
+ useful to throw an exception in if this is set, to avoid e.g.
+ creating tarballs with a version string of "unknown".
+Some variants are more useful than others. Including `full-revisionid` in a
+bug report should allow developers to reconstruct the exact code being tested
+(or indicate the presence of local changes that should be shared with the
+developers). `version` is suitable for display in an "about" box or a CLI
+`--version` output: it can be easily compared against release notes and lists
+of bugs fixed in various releases.
+The installer adds the following text to your `` to place a basic
+version in `YOURPROJECT.__version__`:
+ from ._version import get_versions
+ __version__ = get_versions()['version']
+ del get_versions
+## Styles
+The setup.cfg `style=` configuration controls how the VCS information is
+rendered into a version string.
+The default style, "pep440", produces a PEP440-compliant string, equal to the
+un-prefixed tag name for actual releases, and containing an additional "local
+version" section with more detail for in-between builds. For Git, this is
+TAG[+DISTANCE.gHEX[.dirty]] , using information from `git describe --tags
+--dirty --always`. For example "0.11+2.g1076c97.dirty" indicates that the
+tree is like the "1076c97" commit but has uncommitted changes (".dirty"), and
+that this commit is two revisions ("+2") beyond the "0.11" tag. For released
+software (exactly equal to a known tag), the identifier will only contain the
+stripped tag, e.g. "0.11".
+Other styles are available. See in the Versioneer source tree for
+## Debugging
+Versioneer tries to avoid fatal errors: if something goes wrong, it will tend
+to return a version of "0+unknown". To investigate the problem, run `
+version`, which will run the version-lookup code in a verbose mode, and will
+display the full contents of `get_versions()` (including the `error` string,
+which may help identify what went wrong).
+## Updating Versioneer
+To upgrade your project to a new release of Versioneer, do the following:
+* install the new Versioneer (`pip install -U versioneer` or equivalent)
+* edit `setup.cfg`, if necessary, to include any new configuration settings
+ indicated by the release notes
+* re-run `versioneer install` in your source tree, to replace
+ `SRC/`
+* commit any changed files
+### Upgrading to 0.16
+Nothing special.
+### Upgrading to 0.15
+Starting with this version, Versioneer is configured with a `[versioneer]`
+section in your `setup.cfg` file. Earlier versions required the `` to
+set attributes on the `versioneer` module immediately after import. The new
+version will refuse to run (raising an exception during import) until you
+have provided the necessary `setup.cfg` section.
+In addition, the Versioneer package provides an executable named
+`versioneer`, and the installation process is driven by running `versioneer
+install`. In 0.14 and earlier, the executable was named
+`versioneer-installer` and was run without an argument.
+### Upgrading to 0.14
+0.14 changes the format of the version string. 0.13 and earlier used
+hyphen-separated strings like "0.11-2-g1076c97-dirty". 0.14 and beyond use a
+plus-separated "local version" section strings, with dot-separated
+components, like "0.11+2.g1076c97". PEP440-strict tools did not like the old
+format, but should be ok with the new one.
+### Upgrading from 0.11 to 0.12
+Nothing special.
+### Upgrading from 0.10 to 0.11
+You must add a `versioneer.VCS = "git"` to your `` before re-running
+` setup_versioneer`. This will enable the use of additional
+version-control systems (SVN, etc) in the future.
+## Future Directions
+This tool is designed to make it easily extended to other version-control
+systems: all VCS-specific components are in separate directories like
+src/git/ . The top-level `` script is assembled from these
+components by running . In the future,
+will take a VCS name as an argument, and will construct a version of
+`` that is specific to the given VCS. It might also take the
+configuration arguments that are currently provided manually during
+installation by editing . Alternatively, it might go the other
+direction and include code from all supported VCS systems, reducing the
+number of intermediate scripts.
+## License
+To make Versioneer easier to embed, all its code is dedicated to the public
+domain. The `` that it creates is also in the public domain.
+Specifically, both are released under the Creative Commons "Public Domain
+Dedication" license (CC0-1.0), as described in
+ .
+from __future__ import print_function
+ import configparser
+except ImportError:
+ import ConfigParser as configparser
+import errno
+import json
+import os
+import re
+import subprocess
+import sys
+class VersioneerConfig:
+ """Container for Versioneer configuration parameters."""
+def get_root():
+ """Get the project root directory.
+ We require that all commands are run from the project root, i.e. the
+ directory that contains, setup.cfg, and .
+ """
+ root = os.path.realpath(os.path.abspath(os.getcwd()))
+ setup_py = os.path.join(root, "")
+ versioneer_py = os.path.join(root, "")
+ if not (os.path.exists(setup_py) or os.path.exists(versioneer_py)):
+ # allow 'python path/to/ COMMAND'
+ root = os.path.dirname(os.path.realpath(os.path.abspath(sys.argv[0])))
+ setup_py = os.path.join(root, "")
+ versioneer_py = os.path.join(root, "")
+ if not (os.path.exists(setup_py) or os.path.exists(versioneer_py)):
+ err = ("Versioneer was unable to run the project root directory. "
+ "Versioneer requires to be executed from "
+ "its immediate directory (like 'python COMMAND'), "
+ "or in a way that lets it use sys.argv[0] to find the root "
+ "(like 'python path/to/ COMMAND').")
+ raise VersioneerBadRootError(err)
+ try:
+ # Certain runtime workflows ( install/develop in a setuptools
+ # tree) execute all dependencies in a single python process, so
+ # "versioneer" may be imported multiple times, and python's shared
+ # module-import table will cache the first one. So we can't use
+ # os.path.dirname(__file__), as that will find whichever
+ # was first imported, even in later projects.
+ me = os.path.realpath(os.path.abspath(__file__))
+ if os.path.splitext(me)[0] != os.path.splitext(versioneer_py)[0]:
+ print("Warning: build in %s is using from %s"
+ % (os.path.dirname(me), versioneer_py))
+ except NameError:
+ pass
+ return root
+def get_config_from_root(root):
+ """Read the project setup.cfg file to determine Versioneer config."""
+ # This might raise EnvironmentError (if setup.cfg is missing), or
+ # configparser.NoSectionError (if it lacks a [versioneer] section), or
+ # configparser.NoOptionError (if it lacks "VCS="). See the docstring at
+ # the top of for instructions on writing your setup.cfg .
+ setup_cfg = os.path.join(root, "setup.cfg")
+ parser = configparser.SafeConfigParser()
+ with open(setup_cfg, "r") as f:
+ parser.readfp(f)
+ VCS = parser.get("versioneer", "VCS") # mandatory
+ def get(parser, name):
+ if parser.has_option("versioneer", name):
+ return parser.get("versioneer", name)
+ return None
+ cfg = VersioneerConfig()
+ cfg.VCS = VCS
+ = get(parser, "style") or ""
+ cfg.versionfile_source = get(parser, "versionfile_source")
+ cfg.versionfile_build = get(parser, "versionfile_build")
+ cfg.tag_prefix = get(parser, "tag_prefix")
+ if cfg.tag_prefix in ("''", '""'):
+ cfg.tag_prefix = ""
+ cfg.parentdir_prefix = get(parser, "parentdir_prefix")
+ cfg.verbose = get(parser, "verbose")
+ return cfg
+class NotThisMethod(Exception):
+ """Exception raised if a method is not valid for the current scenario."""
+# these dictionaries contain VCS-specific tools
+def register_vcs_handler(vcs, method): # decorator
+ """Decorator to mark a method as the handler for a particular VCS."""
+ def decorate(f):
+ """Store f in HANDLERS[vcs][method]."""
+ if vcs not in HANDLERS:
+ HANDLERS[vcs] = {}
+ HANDLERS[vcs][method] = f
+ return f
+ return decorate
+def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False):
+ """Call the given command(s)."""
+ assert isinstance(commands, list)
+ p = None
+ for c in commands:
+ try:
+ dispcmd = str([c] + args)
+ # remember shell=False, so use git.cmd on windows, not just git
+ p = subprocess.Popen([c] + args, cwd=cwd, stdout=subprocess.PIPE,
+ stderr=(subprocess.PIPE if hide_stderr
+ else None))
+ break
+ except EnvironmentError:
+ e = sys.exc_info()[1]
+ if e.errno == errno.ENOENT:
+ continue
+ if verbose:
+ print("unable to run %s" % dispcmd)
+ print(e)
+ return None
+ else:
+ if verbose:
+ print("unable to find command, tried %s" % (commands,))
+ return None
+ stdout = p.communicate()[0].strip()
+ if sys.version_info[0] >= 3:
+ stdout = stdout.decode()
+ if p.returncode != 0:
+ if verbose:
+ print("unable to run %s (error)" % dispcmd)
+ return None
+ return stdout
+LONG_VERSION_PY['git'] = '''
+# This file helps to compute a version number in source trees obtained from
+# git-archive tarball (such as those provided by githubs download-from-tag
+# feature). Distribution tarballs (built by sdist) and build
+# directories (produced by build) will contain a much shorter file
+# that just contains the computed version number.
+# This file is released into the public domain. Generated by
+# versioneer-0.16 (
+"""Git implementation of"""
+import errno
+import os
+import re
+import subprocess
+import sys
+def get_keywords():
+ """Get the keywords needed to look up the version information."""
+ # these strings will be replaced by git during git-archive.
+ # will grep for the variable names, so they must
+ # each be defined on a line of their own. will just call
+ # get_keywords().
+ git_refnames = "%(DOLLAR)sFormat:%%d%(DOLLAR)s"
+ git_full = "%(DOLLAR)sFormat:%%H%(DOLLAR)s"
+ keywords = {"refnames": git_refnames, "full": git_full}
+ return keywords
+class VersioneerConfig:
+ """Container for Versioneer configuration parameters."""
+def get_config():
+ """Create, populate and return the VersioneerConfig() object."""
+ # these strings are filled in when ' versioneer' creates
+ #
+ cfg = VersioneerConfig()
+ cfg.VCS = "git"
+ = "%(STYLE)s"
+ cfg.tag_prefix = "%(TAG_PREFIX)s"
+ cfg.parentdir_prefix = "%(PARENTDIR_PREFIX)s"
+ cfg.versionfile_source = "%(VERSIONFILE_SOURCE)s"
+ cfg.verbose = False
+ return cfg
+class NotThisMethod(Exception):
+ """Exception raised if a method is not valid for the current scenario."""
+def register_vcs_handler(vcs, method): # decorator
+ """Decorator to mark a method as the handler for a particular VCS."""
+ def decorate(f):
+ """Store f in HANDLERS[vcs][method]."""
+ if vcs not in HANDLERS:
+ HANDLERS[vcs] = {}
+ HANDLERS[vcs][method] = f
+ return f
+ return decorate
+def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False):
+ """Call the given command(s)."""
+ assert isinstance(commands, list)
+ p = None
+ for c in commands:
+ try:
+ dispcmd = str([c] + args)
+ # remember shell=False, so use git.cmd on windows, not just git
+ p = subprocess.Popen([c] + args, cwd=cwd, stdout=subprocess.PIPE,
+ stderr=(subprocess.PIPE if hide_stderr
+ else None))
+ break
+ except EnvironmentError:
+ e = sys.exc_info()[1]
+ if e.errno == errno.ENOENT:
+ continue
+ if verbose:
+ print("unable to run %%s" %% dispcmd)
+ print(e)
+ return None
+ else:
+ if verbose:
+ print("unable to find command, tried %%s" %% (commands,))
+ return None
+ stdout = p.communicate()[0].strip()
+ if sys.version_info[0] >= 3:
+ stdout = stdout.decode()
+ if p.returncode != 0:
+ if verbose:
+ print("unable to run %%s (error)" %% dispcmd)
+ return None
+ return stdout
+def versions_from_parentdir(parentdir_prefix, root, verbose):
+ """Try to determine the version from the parent directory name.
+ Source tarballs conventionally unpack into a directory that includes
+ both the project name and a version string.
+ """
+ dirname = os.path.basename(root)
+ if not dirname.startswith(parentdir_prefix):
+ if verbose:
+ print("guessing rootdir is '%%s', but '%%s' doesn't start with "
+ "prefix '%%s'" %% (root, dirname, parentdir_prefix))
+ raise NotThisMethod("rootdir doesn't start with parentdir_prefix")
+ return {"version": dirname[len(parentdir_prefix):],
+ "full-revisionid": None,
+ "dirty": False, "error": None}
+@register_vcs_handler("git", "get_keywords")
+def git_get_keywords(versionfile_abs):
+ """Extract version information from the given file."""
+ # the code embedded in can just fetch the value of these
+ # keywords. When used from, we don't want to import,
+ # so we do it with a regexp instead. This function is not used from
+ #
+ keywords = {}
+ try:
+ f = open(versionfile_abs, "r")
+ for line in f.readlines():
+ if line.strip().startswith("git_refnames ="):
+ mo ='=\s*"(.*)"', line)
+ if mo:
+ keywords["refnames"] =
+ if line.strip().startswith("git_full ="):
+ mo ='=\s*"(.*)"', line)
+ if mo:
+ keywords["full"] =
+ f.close()
+ except EnvironmentError:
+ pass
+ return keywords
+@register_vcs_handler("git", "keywords")
+def git_versions_from_keywords(keywords, tag_prefix, verbose):
+ """Get version information from git keywords."""
+ if not keywords:
+ raise NotThisMethod("no keywords at all, weird")
+ refnames = keywords["refnames"].strip()
+ if refnames.startswith("$Format"):
+ if verbose:
+ print("keywords are unexpanded, not using")
+ raise NotThisMethod("unexpanded keywords, not a git-archive tarball")
+ refs = set([r.strip() for r in refnames.strip("()").split(",")])
+ # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of
+ # just "foo-1.0". If we see a "tag: " prefix, prefer those.
+ TAG = "tag: "
+ tags = set([r[len(TAG):] for r in refs if r.startswith(TAG)])
+ if not tags:
+ # Either we're using git < 1.8.3, or there really are no tags. We use
+ # a heuristic: assume all version tags have a digit. The old git %%d
+ # expansion behaves like git log --decorate=short and strips out the
+ # refs/heads/ and refs/tags/ prefixes that would let us distinguish
+ # between branches and tags. By ignoring refnames without digits, we
+ # filter out many common branch names like "release" and
+ # "stabilization", as well as "HEAD" and "master".
+ tags = set([r for r in refs if'\d', r)])
+ if verbose:
+ print("discarding '%%s', no digits" %% ",".join(refs-tags))
+ if verbose:
+ print("likely tags: %%s" %% ",".join(sorted(tags)))
+ for ref in sorted(tags):
+ # sorting will prefer e.g. "2.0" over "2.0rc1"
+ if ref.startswith(tag_prefix):
+ r = ref[len(tag_prefix):]
+ if verbose:
+ print("picking %%s" %% r)
+ return {"version": r,
+ "full-revisionid": keywords["full"].strip(),
+ "dirty": False, "error": None
+ }
+ # no suitable tags, so version is "0+unknown", but full hex is still there
+ if verbose:
+ print("no suitable tags, using unknown + full revision id")
+ return {"version": "0+unknown",
+ "full-revisionid": keywords["full"].strip(),
+ "dirty": False, "error": "no suitable tags"}
+@register_vcs_handler("git", "pieces_from_vcs")
+def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
+ """Get version from 'git describe' in the root of the source tree.
+ This only gets called if the git-archive 'subst' keywords were *not*
+ expanded, and hasn't already been rewritten with a short
+ version string, meaning we're inside a checked out source tree.
+ """
+ if not os.path.exists(os.path.join(root, ".git")):
+ if verbose:
+ print("no .git in %%s" %% root)
+ raise NotThisMethod("no .git directory")
+ GITS = ["git"]
+ if sys.platform == "win32":
+ GITS = ["git.cmd", "git.exe"]
+ # if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty]
+ # if there isn't one, this yields HEX[-dirty] (no NUM)
+ describe_out = run_command(GITS, ["describe", "--tags", "--dirty",
+ "--always", "--long",
+ "--match", "%%s*" %% tag_prefix],
+ cwd=root)
+ # --long was added in git-1.5.5
+ if describe_out is None:
+ raise NotThisMethod("'git describe' failed")
+ describe_out = describe_out.strip()
+ full_out = run_command(GITS, ["rev-parse", "HEAD"], cwd=root)
+ if full_out is None:
+ raise NotThisMethod("'git rev-parse' failed")
+ full_out = full_out.strip()
+ pieces = {}
+ pieces["long"] = full_out
+ pieces["short"] = full_out[:7] # maybe improved later
+ pieces["error"] = None
+ # parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty]
+ # TAG might have hyphens.
+ git_describe = describe_out
+ # look for -dirty suffix
+ dirty = git_describe.endswith("-dirty")
+ pieces["dirty"] = dirty
+ if dirty:
+ git_describe = git_describe[:git_describe.rindex("-dirty")]
+ # now we have TAG-NUM-gHEX or HEX
+ if "-" in git_describe:
+ mo ='^(.+)-(\d+)-g([0-9a-f]+)$', git_describe)
+ if not mo:
+ # unparseable. Maybe git-describe is misbehaving?
+ pieces["error"] = ("unable to parse git-describe output: '%%s'"
+ %% describe_out)
+ return pieces
+ # tag
+ full_tag =
+ if not full_tag.startswith(tag_prefix):
+ if verbose:
+ fmt = "tag '%%s' doesn't start with prefix '%%s'"
+ print(fmt %% (full_tag, tag_prefix))
+ pieces["error"] = ("tag '%%s' doesn't start with prefix '%%s'"
+ %% (full_tag, tag_prefix))
+ return pieces
+ pieces["closest-tag"] = full_tag[len(tag_prefix):]
+ # distance: number of commits since tag
+ pieces["distance"] = int(
+ # commit: short hex revision ID
+ pieces["short"] =
+ else:
+ # HEX: no tags
+ pieces["closest-tag"] = None
+ count_out = run_command(GITS, ["rev-list", "HEAD", "--count"],
+ cwd=root)
+ pieces["distance"] = int(count_out) # total number of commits
+ return pieces
+def plus_or_dot(pieces):
+ """Return a + if we don't already have one, else return a ."""
+ if "+" in pieces.get("closest-tag", ""):
+ return "."
+ return "+"
+def render_pep440(pieces):
+ """Build up version string, with post-release "local version identifier".
+ Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you
+ get a tagged build and then dirty it, you'll get TAG+0.gHEX.dirty
+ Exceptions:
+ 1: no tags. git_describe was just HEX. 0+untagged.DISTANCE.gHEX[.dirty]
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"] or pieces["dirty"]:
+ rendered += plus_or_dot(pieces)
+ rendered += "%%d.g%%s" %% (pieces["distance"], pieces["short"])
+ if pieces["dirty"]:
+ rendered += ".dirty"
+ else:
+ # exception #1
+ rendered = "0+untagged.%%d.g%%s" %% (pieces["distance"],
+ pieces["short"])
+ if pieces["dirty"]:
+ rendered += ".dirty"
+ return rendered
+def render_pep440_pre(pieces):
+ """TAG[.post.devDISTANCE] -- No -dirty.
+ Exceptions:
+ 1: no tags.
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"]:
+ rendered += "" %% pieces["distance"]
+ else:
+ # exception #1
+ rendered = "" %% pieces["distance"]
+ return rendered
+def render_pep440_post(pieces):
+ """TAG[.postDISTANCE[.dev0]+gHEX] .
+ The ".dev0" means dirty. Note that .dev0 sorts backwards
+ (a dirty tree will appear "older" than the corresponding clean one),
+ but you shouldn't be releasing software with -dirty anyways.
+ Exceptions:
+ 1: no tags. 0.postDISTANCE[.dev0]
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"] or pieces["dirty"]:
+ rendered += ".post%%d" %% pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ rendered += plus_or_dot(pieces)
+ rendered += "g%%s" %% pieces["short"]
+ else:
+ # exception #1
+ rendered = "" %% pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ rendered += "+g%%s" %% pieces["short"]
+ return rendered
+def render_pep440_old(pieces):
+ """TAG[.postDISTANCE[.dev0]] .
+ The ".dev0" means dirty.
+ Eexceptions:
+ 1: no tags. 0.postDISTANCE[.dev0]
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"] or pieces["dirty"]:
+ rendered += ".post%%d" %% pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ else:
+ # exception #1
+ rendered = "" %% pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ return rendered
+def render_git_describe(pieces):
+ """TAG[-DISTANCE-gHEX][-dirty].
+ Like 'git describe --tags --dirty --always'.
+ Exceptions:
+ 1: no tags. HEX[-dirty] (note: no 'g' prefix)
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"]:
+ rendered += "-%%d-g%%s" %% (pieces["distance"], pieces["short"])
+ else:
+ # exception #1
+ rendered = pieces["short"]
+ if pieces["dirty"]:
+ rendered += "-dirty"
+ return rendered
+def render_git_describe_long(pieces):
+ """TAG-DISTANCE-gHEX[-dirty].
+ Like 'git describe --tags --dirty --always -long'.
+ The distance/hash is unconditional.
+ Exceptions:
+ 1: no tags. HEX[-dirty] (note: no 'g' prefix)
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ rendered += "-%%d-g%%s" %% (pieces["distance"], pieces["short"])
+ else:
+ # exception #1
+ rendered = pieces["short"]
+ if pieces["dirty"]:
+ rendered += "-dirty"
+ return rendered
+def render(pieces, style):
+ """Render the given version pieces into the requested style."""
+ if pieces["error"]:
+ return {"version": "unknown",
+ "full-revisionid": pieces.get("long"),
+ "dirty": None,
+ "error": pieces["error"]}
+ if not style or style == "default":
+ style = "pep440" # the default
+ if style == "pep440":
+ rendered = render_pep440(pieces)
+ elif style == "pep440-pre":
+ rendered = render_pep440_pre(pieces)
+ elif style == "pep440-post":
+ rendered = render_pep440_post(pieces)
+ elif style == "pep440-old":
+ rendered = render_pep440_old(pieces)
+ elif style == "git-describe":
+ rendered = render_git_describe(pieces)
+ elif style == "git-describe-long":
+ rendered = render_git_describe_long(pieces)
+ else:
+ raise ValueError("unknown style '%%s'" %% style)
+ return {"version": rendered, "full-revisionid": pieces["long"],
+ "dirty": pieces["dirty"], "error": None}
+def get_versions():
+ """Get version information or return default if unable to do so."""
+ # I am in, which lives at ROOT/VERSIONFILE_SOURCE. If we have
+ # __file__, we can work backwards from there to the root. Some
+ # py2exe/bbfreeze/non-CPython implementations don't do __file__, in which
+ # case we can only use expanded keywords.
+ cfg = get_config()
+ verbose = cfg.verbose
+ try:
+ return git_versions_from_keywords(get_keywords(), cfg.tag_prefix,
+ verbose)
+ except NotThisMethod:
+ pass
+ try:
+ root = os.path.realpath(__file__)
+ # versionfile_source is the relative path from the top of the source
+ # tree (where the .git directory might live) to this file. Invert
+ # this to find the root from __file__.
+ for i in cfg.versionfile_source.split('/'):
+ root = os.path.dirname(root)
+ except NameError:
+ return {"version": "0+unknown", "full-revisionid": None,
+ "dirty": None,
+ "error": "unable to find root of source tree"}
+ try:
+ pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose)
+ return render(pieces,
+ except NotThisMethod:
+ pass
+ try:
+ if cfg.parentdir_prefix:
+ return versions_from_parentdir(cfg.parentdir_prefix, root, verbose)
+ except NotThisMethod:
+ pass
+ return {"version": "0+unknown", "full-revisionid": None,
+ "dirty": None,
+ "error": "unable to compute version"}
+@register_vcs_handler("git", "get_keywords")
+def git_get_keywords(versionfile_abs):
+ """Extract version information from the given file."""
+ # the code embedded in can just fetch the value of these
+ # keywords. When used from, we don't want to import,
+ # so we do it with a regexp instead. This function is not used from
+ #
+ keywords = {}
+ try:
+ f = open(versionfile_abs, "r")
+ for line in f.readlines():
+ if line.strip().startswith("git_refnames ="):
+ mo ='=\s*"(.*)"', line)
+ if mo:
+ keywords["refnames"] =
+ if line.strip().startswith("git_full ="):
+ mo ='=\s*"(.*)"', line)
+ if mo:
+ keywords["full"] =
+ f.close()
+ except EnvironmentError:
+ pass
+ return keywords
+@register_vcs_handler("git", "keywords")
+def git_versions_from_keywords(keywords, tag_prefix, verbose):
+ """Get version information from git keywords."""
+ if not keywords:
+ raise NotThisMethod("no keywords at all, weird")
+ refnames = keywords["refnames"].strip()
+ if refnames.startswith("$Format"):
+ if verbose:
+ print("keywords are unexpanded, not using")
+ raise NotThisMethod("unexpanded keywords, not a git-archive tarball")
+ refs = set([r.strip() for r in refnames.strip("()").split(",")])
+ # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of
+ # just "foo-1.0". If we see a "tag: " prefix, prefer those.
+ TAG = "tag: "
+ tags = set([r[len(TAG):] for r in refs if r.startswith(TAG)])
+ if not tags:
+ # Either we're using git < 1.8.3, or there really are no tags. We use
+ # a heuristic: assume all version tags have a digit. The old git %d
+ # expansion behaves like git log --decorate=short and strips out the
+ # refs/heads/ and refs/tags/ prefixes that would let us distinguish
+ # between branches and tags. By ignoring refnames without digits, we
+ # filter out many common branch names like "release" and
+ # "stabilization", as well as "HEAD" and "master".
+ tags = set([r for r in refs if'\d', r)])
+ if verbose:
+ print("discarding '%s', no digits" % ",".join(refs-tags))
+ if verbose:
+ print("likely tags: %s" % ",".join(sorted(tags)))
+ for ref in sorted(tags):
+ # sorting will prefer e.g. "2.0" over "2.0rc1"
+ if ref.startswith(tag_prefix):
+ r = ref[len(tag_prefix):]
+ if verbose:
+ print("picking %s" % r)
+ return {"version": r,
+ "full-revisionid": keywords["full"].strip(),
+ "dirty": False, "error": None
+ }
+ # no suitable tags, so version is "0+unknown", but full hex is still there
+ if verbose:
+ print("no suitable tags, using unknown + full revision id")
+ return {"version": "0+unknown",
+ "full-revisionid": keywords["full"].strip(),
+ "dirty": False, "error": "no suitable tags"}
+@register_vcs_handler("git", "pieces_from_vcs")
+def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
+ """Get version from 'git describe' in the root of the source tree.
+ This only gets called if the git-archive 'subst' keywords were *not*
+ expanded, and hasn't already been rewritten with a short
+ version string, meaning we're inside a checked out source tree.
+ """
+ if not os.path.exists(os.path.join(root, ".git")):
+ if verbose:
+ print("no .git in %s" % root)
+ raise NotThisMethod("no .git directory")
+ GITS = ["git"]
+ if sys.platform == "win32":
+ GITS = ["git.cmd", "git.exe"]
+ # if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty]
+ # if there isn't one, this yields HEX[-dirty] (no NUM)
+ describe_out = run_command(GITS, ["describe", "--tags", "--dirty",
+ "--always", "--long",
+ "--match", "%s*" % tag_prefix],
+ cwd=root)
+ # --long was added in git-1.5.5
+ if describe_out is None:
+ raise NotThisMethod("'git describe' failed")
+ describe_out = describe_out.strip()
+ full_out = run_command(GITS, ["rev-parse", "HEAD"], cwd=root)
+ if full_out is None:
+ raise NotThisMethod("'git rev-parse' failed")
+ full_out = full_out.strip()
+ pieces = {}
+ pieces["long"] = full_out
+ pieces["short"] = full_out[:7] # maybe improved later
+ pieces["error"] = None
+ # parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty]
+ # TAG might have hyphens.
+ git_describe = describe_out
+ # look for -dirty suffix
+ dirty = git_describe.endswith("-dirty")
+ pieces["dirty"] = dirty
+ if dirty:
+ git_describe = git_describe[:git_describe.rindex("-dirty")]
+ # now we have TAG-NUM-gHEX or HEX
+ if "-" in git_describe:
+ mo ='^(.+)-(\d+)-g([0-9a-f]+)$', git_describe)
+ if not mo:
+ # unparseable. Maybe git-describe is misbehaving?
+ pieces["error"] = ("unable to parse git-describe output: '%s'"
+ % describe_out)
+ return pieces
+ # tag
+ full_tag =
+ if not full_tag.startswith(tag_prefix):
+ if verbose:
+ fmt = "tag '%s' doesn't start with prefix '%s'"
+ print(fmt % (full_tag, tag_prefix))
+ pieces["error"] = ("tag '%s' doesn't start with prefix '%s'"
+ % (full_tag, tag_prefix))
+ return pieces
+ pieces["closest-tag"] = full_tag[len(tag_prefix):]
+ # distance: number of commits since tag
+ pieces["distance"] = int(
+ # commit: short hex revision ID
+ pieces["short"] =
+ else:
+ # HEX: no tags
+ pieces["closest-tag"] = None
+ count_out = run_command(GITS, ["rev-list", "HEAD", "--count"],
+ cwd=root)
+ pieces["distance"] = int(count_out) # total number of commits
+ return pieces
+def do_vcs_install(manifest_in, versionfile_source, ipy):
+ """Git-specific installation logic for Versioneer.
+ For Git, this means creating/changing .gitattributes to mark
+ for export-time keyword substitution.
+ """
+ GITS = ["git"]
+ if sys.platform == "win32":
+ GITS = ["git.cmd", "git.exe"]
+ files = [manifest_in, versionfile_source]
+ if ipy:
+ files.append(ipy)
+ try:
+ me = __file__
+ if me.endswith(".pyc") or me.endswith(".pyo"):
+ me = os.path.splitext(me)[0] + ".py"
+ versioneer_file = os.path.relpath(me)
+ except NameError:
+ versioneer_file = ""
+ files.append(versioneer_file)
+ present = False
+ try:
+ f = open(".gitattributes", "r")
+ for line in f.readlines():
+ if line.strip().startswith(versionfile_source):
+ if "export-subst" in line.strip().split()[1:]:
+ present = True
+ f.close()
+ except EnvironmentError:
+ pass
+ if not present:
+ f = open(".gitattributes", "a+")
+ f.write("%s export-subst\n" % versionfile_source)
+ f.close()
+ files.append(".gitattributes")
+ run_command(GITS, ["add", "--"] + files)
+def versions_from_parentdir(parentdir_prefix, root, verbose):
+ """Try to determine the version from the parent directory name.
+ Source tarballs conventionally unpack into a directory that includes
+ both the project name and a version string.
+ """
+ dirname = os.path.basename(root)
+ if not dirname.startswith(parentdir_prefix):
+ if verbose:
+ print("guessing rootdir is '%s', but '%s' doesn't start with "
+ "prefix '%s'" % (root, dirname, parentdir_prefix))
+ raise NotThisMethod("rootdir doesn't start with parentdir_prefix")
+ return {"version": dirname[len(parentdir_prefix):],
+ "full-revisionid": None,
+ "dirty": False, "error": None}
+# This file was generated by '' (0.16) from
+# revision-control system data, or from the parent directory name of an
+# unpacked source archive. Distribution tarballs contain a pre-generated copy
+# of this file.
+import json
+import sys
+version_json = '''
+def get_versions():
+ return json.loads(version_json)
+def versions_from_file(filename):
+ """Try to determine the version from if present."""
+ try:
+ with open(filename) as f:
+ contents =
+ except EnvironmentError:
+ raise NotThisMethod("unable to read")
+ mo ="version_json = '''\n(.*)''' # END VERSION_JSON",
+ contents, re.M | re.S)
+ if not mo:
+ raise NotThisMethod("no version_json in")
+ return json.loads(
+def write_to_version_file(filename, versions):
+ """Write the given version number to the given file."""
+ os.unlink(filename)
+ contents = json.dumps(versions, sort_keys=True,
+ indent=1, separators=(",", ": "))
+ with open(filename, "w") as f:
+ f.write(SHORT_VERSION_PY % contents)
+ print("set %s to '%s'" % (filename, versions["version"]))
+def plus_or_dot(pieces):
+ """Return a + if we don't already have one, else return a ."""
+ if "+" in pieces.get("closest-tag", ""):
+ return "."
+ return "+"
+def render_pep440(pieces):
+ """Build up version string, with post-release "local version identifier".
+ Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you
+ get a tagged build and then dirty it, you'll get TAG+0.gHEX.dirty
+ Exceptions:
+ 1: no tags. git_describe was just HEX. 0+untagged.DISTANCE.gHEX[.dirty]
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"] or pieces["dirty"]:
+ rendered += plus_or_dot(pieces)
+ rendered += "%d.g%s" % (pieces["distance"], pieces["short"])
+ if pieces["dirty"]:
+ rendered += ".dirty"
+ else:
+ # exception #1
+ rendered = "0+untagged.%d.g%s" % (pieces["distance"],
+ pieces["short"])
+ if pieces["dirty"]:
+ rendered += ".dirty"
+ return rendered
+def render_pep440_pre(pieces):
+ """TAG[.post.devDISTANCE] -- No -dirty.
+ Exceptions:
+ 1: no tags.
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"]:
+ rendered += "" % pieces["distance"]
+ else:
+ # exception #1
+ rendered = "" % pieces["distance"]
+ return rendered
+def render_pep440_post(pieces):
+ """TAG[.postDISTANCE[.dev0]+gHEX] .
+ The ".dev0" means dirty. Note that .dev0 sorts backwards
+ (a dirty tree will appear "older" than the corresponding clean one),
+ but you shouldn't be releasing software with -dirty anyways.
+ Exceptions:
+ 1: no tags. 0.postDISTANCE[.dev0]
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"] or pieces["dirty"]:
+ rendered += ".post%d" % pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ rendered += plus_or_dot(pieces)
+ rendered += "g%s" % pieces["short"]
+ else:
+ # exception #1
+ rendered = "" % pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ rendered += "+g%s" % pieces["short"]
+ return rendered
+def render_pep440_old(pieces):
+ """TAG[.postDISTANCE[.dev0]] .
+ The ".dev0" means dirty.
+ Eexceptions:
+ 1: no tags. 0.postDISTANCE[.dev0]
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"] or pieces["dirty"]:
+ rendered += ".post%d" % pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ else:
+ # exception #1
+ rendered = "" % pieces["distance"]
+ if pieces["dirty"]:
+ rendered += ".dev0"
+ return rendered
+def render_git_describe(pieces):
+ """TAG[-DISTANCE-gHEX][-dirty].
+ Like 'git describe --tags --dirty --always'.
+ Exceptions:
+ 1: no tags. HEX[-dirty] (note: no 'g' prefix)
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ if pieces["distance"]:
+ rendered += "-%d-g%s" % (pieces["distance"], pieces["short"])
+ else:
+ # exception #1
+ rendered = pieces["short"]
+ if pieces["dirty"]:
+ rendered += "-dirty"
+ return rendered
+def render_git_describe_long(pieces):
+ """TAG-DISTANCE-gHEX[-dirty].
+ Like 'git describe --tags --dirty --always -long'.
+ The distance/hash is unconditional.
+ Exceptions:
+ 1: no tags. HEX[-dirty] (note: no 'g' prefix)
+ """
+ if pieces["closest-tag"]:
+ rendered = pieces["closest-tag"]
+ rendered += "-%d-g%s" % (pieces["distance"], pieces["short"])
+ else:
+ # exception #1
+ rendered = pieces["short"]
+ if pieces["dirty"]:
+ rendered += "-dirty"
+ return rendered
+def render(pieces, style):
+ """Render the given version pieces into the requested style."""
+ if pieces["error"]:
+ return {"version": "unknown",
+ "full-revisionid": pieces.get("long"),
+ "dirty": None,
+ "error": pieces["error"]}
+ if not style or style == "default":
+ style = "pep440" # the default
+ if style == "pep440":
+ rendered = render_pep440(pieces)
+ elif style == "pep440-pre":
+ rendered = render_pep440_pre(pieces)
+ elif style == "pep440-post":
+ rendered = render_pep440_post(pieces)
+ elif style == "pep440-old":
+ rendered = render_pep440_old(pieces)
+ elif style == "git-describe":
+ rendered = render_git_describe(pieces)
+ elif style == "git-describe-long":
+ rendered = render_git_describe_long(pieces)
+ else:
+ raise ValueError("unknown style '%s'" % style)
+ return {"version": rendered, "full-revisionid": pieces["long"],
+ "dirty": pieces["dirty"], "error": None}
+class VersioneerBadRootError(Exception):
+ """The project root directory is unknown or missing key files."""
+def get_versions(verbose=False):
+ """Get the project version from whatever source is available.
+ Returns dict with two keys: 'version' and 'full'.
+ """
+ if "versioneer" in sys.modules:
+ # see the discussion in
+ del sys.modules["versioneer"]
+ root = get_root()
+ cfg = get_config_from_root(root)
+ assert cfg.VCS is not None, "please set [versioneer]VCS= in setup.cfg"
+ handlers = HANDLERS.get(cfg.VCS)
+ assert handlers, "unrecognized VCS '%s'" % cfg.VCS
+ verbose = verbose or cfg.verbose
+ assert cfg.versionfile_source is not None, \
+ "please set versioneer.versionfile_source"
+ assert cfg.tag_prefix is not None, "please set versioneer.tag_prefix"
+ versionfile_abs = os.path.join(root, cfg.versionfile_source)
+ # extract version from first of:, VCS command (e.g. 'git
+ # describe'), parentdir. This is meant to work for developers using a
+ # source checkout, for users of a tarball created by ' sdist',
+ # and for users of a tarball/zipball created by 'git archive' or github's
+ # download-from-tag feature or the equivalent in other VCSes.
+ get_keywords_f = handlers.get("get_keywords")
+ from_keywords_f = handlers.get("keywords")
+ if get_keywords_f and from_keywords_f:
+ try:
+ keywords = get_keywords_f(versionfile_abs)
+ ver = from_keywords_f(keywords, cfg.tag_prefix, verbose)
+ if verbose:
+ print("got version from expanded keyword %s" % ver)
+ return ver
+ except NotThisMethod:
+ pass
+ try:
+ ver = versions_from_file(versionfile_abs)
+ if verbose:
+ print("got version from file %s %s" % (versionfile_abs, ver))
+ return ver
+ except NotThisMethod:
+ pass
+ from_vcs_f = handlers.get("pieces_from_vcs")
+ if from_vcs_f:
+ try:
+ pieces = from_vcs_f(cfg.tag_prefix, root, verbose)
+ ver = render(pieces,
+ if verbose:
+ print("got version from VCS %s" % ver)
+ return ver
+ except NotThisMethod:
+ pass
+ try:
+ if cfg.parentdir_prefix:
+ ver = versions_from_parentdir(cfg.parentdir_prefix, root, verbose)
+ if verbose:
+ print("got version from parentdir %s" % ver)
+ return ver
+ except NotThisMethod:
+ pass
+ if verbose:
+ print("unable to compute version")
+ return {"version": "0+unknown", "full-revisionid": None,
+ "dirty": None, "error": "unable to compute version"}
+def get_version():
+ """Get the short version string for this project."""
+ return get_versions()["version"]
+def get_cmdclass():
+ """Get the custom setuptools/distutils subclasses used by Versioneer."""
+ if "versioneer" in sys.modules:
+ del sys.modules["versioneer"]
+ # this fixes the "python develop" case (also 'install' and
+ # 'easy_install .'), in which subdependencies of the main project are
+ # built (using bdist_egg) in the same python process. Assume
+ # a main project A and a dependency B, which use different versions
+ # of Versioneer. A's imports A's Versioneer, leaving it in
+ # sys.modules by the time B's is executed, causing B to run
+ # with the wrong versioneer. Setuptools wraps the sub-dep builds in a
+ # sandbox that restores sys.modules to it's pre-build state, so the
+ # parent is protected against the child's "import versioneer". By
+ # removing ourselves from sys.modules here, before the child build
+ # happens, we protect the child from the parent's versioneer too.
+ # Also see
+ cmds = {}
+ # we add "version" to both distutils and setuptools
+ from distutils.core import Command
+ class cmd_version(Command):
+ description = "report generated version string"
+ user_options = []
+ boolean_options = []
+ def initialize_options(self):
+ pass
+ def finalize_options(self):
+ pass
+ def run(self):
+ vers = get_versions(verbose=True)
+ print("Version: %s" % vers["version"])
+ print(" full-revisionid: %s" % vers.get("full-revisionid"))
+ print(" dirty: %s" % vers.get("dirty"))
+ if vers["error"]:
+ print(" error: %s" % vers["error"])
+ cmds["version"] = cmd_version
+ # we override "build_py" in both distutils and setuptools
+ #
+ # most invocation pathways end up running build_py:
+ # distutils/build -> build_py
+ # distutils/install -> distutils/build ->..
+ # setuptools/bdist_wheel -> distutils/install ->..
+ # setuptools/bdist_egg -> distutils/install_lib -> build_py
+ # setuptools/install -> bdist_egg ->..
+ # setuptools/develop -> ?
+ # we override different "build_py" commands for both environments
+ if "setuptools" in sys.modules:
+ from setuptools.command.build_py import build_py as _build_py
+ else:
+ from distutils.command.build_py import build_py as _build_py
+ class cmd_build_py(_build_py):
+ def run(self):
+ root = get_root()
+ cfg = get_config_from_root(root)
+ versions = get_versions()
+ # now locate in the new build/ directory and replace
+ # it with an updated value
+ if cfg.versionfile_build:
+ target_versionfile = os.path.join(self.build_lib,
+ cfg.versionfile_build)
+ print("UPDATING %s" % target_versionfile)
+ write_to_version_file(target_versionfile, versions)
+ cmds["build_py"] = cmd_build_py
+ if "cx_Freeze" in sys.modules: # cx_freeze enabled?
+ from cx_Freeze.dist import build_exe as _build_exe
+ class cmd_build_exe(_build_exe):
+ def run(self):
+ root = get_root()
+ cfg = get_config_from_root(root)
+ versions = get_versions()
+ target_versionfile = cfg.versionfile_source
+ print("UPDATING %s" % target_versionfile)
+ write_to_version_file(target_versionfile, versions)
+ os.unlink(target_versionfile)
+ with open(cfg.versionfile_source, "w") as f:
+ f.write(LONG %
+ {"DOLLAR": "$",
+ "STYLE":,
+ "TAG_PREFIX": cfg.tag_prefix,
+ "PARENTDIR_PREFIX": cfg.parentdir_prefix,
+ "VERSIONFILE_SOURCE": cfg.versionfile_source,
+ })
+ cmds["build_exe"] = cmd_build_exe
+ del cmds["build_py"]
+ # we override different "sdist" commands for both environments
+ if "setuptools" in sys.modules:
+ from setuptools.command.sdist import sdist as _sdist
+ else:
+ from distutils.command.sdist import sdist as _sdist
+ class cmd_sdist(_sdist):
+ def run(self):
+ versions = get_versions()
+ self._versioneer_generated_versions = versions
+ # unless we update this, the command will keep using the old
+ # version
+ self.distribution.metadata.version = versions["version"]
+ return
+ def make_release_tree(self, base_dir, files):
+ root = get_root()
+ cfg = get_config_from_root(root)
+ _sdist.make_release_tree(self, base_dir, files)
+ # now locate in the new base_dir directory
+ # (remembering that it may be a hardlink) and replace it with an
+ # updated value
+ target_versionfile = os.path.join(base_dir, cfg.versionfile_source)
+ print("UPDATING %s" % target_versionfile)
+ write_to_version_file(target_versionfile,
+ self._versioneer_generated_versions)
+ cmds["sdist"] = cmd_sdist
+ return cmds
+setup.cfg is missing the necessary Versioneer configuration. You need
+a section like:
+ [versioneer]
+ VCS = git
+ style = pep440
+ versionfile_source = src/myproject/
+ versionfile_build = myproject/
+ tag_prefix =
+ parentdir_prefix = myproject-
+You will also need to edit your to use the results:
+ import versioneer
+ setup(version=versioneer.get_version(),
+ cmdclass=versioneer.get_cmdclass(), ...)
+Please read the docstring in ./ for configuration instructions,
+edit setup.cfg, and re-run the installer or 'python setup'.
+# See the docstring in for instructions. Note that you must
+# re-run ' setup' after changing this section, and commit the
+# resulting files.
+#VCS = git
+#style = pep440
+#versionfile_source =
+#versionfile_build =
+#tag_prefix =
+#parentdir_prefix =
+from ._version import get_versions
+__version__ = get_versions()['version']
+del get_versions
+def do_setup():
+ """Main VCS-independent setup function for installing Versioneer."""
+ root = get_root()
+ try:
+ cfg = get_config_from_root(root)
+ except (EnvironmentError, configparser.NoSectionError,
+ configparser.NoOptionError) as e:
+ if isinstance(e, (EnvironmentError, configparser.NoSectionError)):
+ print("Adding sample versioneer config to setup.cfg",
+ file=sys.stderr)
+ with open(os.path.join(root, "setup.cfg"), "a") as f:
+ f.write(SAMPLE_CONFIG)
+ print(CONFIG_ERROR, file=sys.stderr)
+ return 1
+ print(" creating %s" % cfg.versionfile_source)
+ with open(cfg.versionfile_source, "w") as f:
+ f.write(LONG % {"DOLLAR": "$",
+ "STYLE":,
+ "TAG_PREFIX": cfg.tag_prefix,
+ "PARENTDIR_PREFIX": cfg.parentdir_prefix,
+ "VERSIONFILE_SOURCE": cfg.versionfile_source,
+ })
+ ipy = os.path.join(os.path.dirname(cfg.versionfile_source),
+ "")
+ if os.path.exists(ipy):
+ try:
+ with open(ipy, "r") as f:
+ old =
+ except EnvironmentError:
+ old = ""
+ if INIT_PY_SNIPPET not in old:
+ print(" appending to %s" % ipy)
+ with open(ipy, "a") as f:
+ f.write(INIT_PY_SNIPPET)
+ else:
+ print(" %s unmodified" % ipy)
+ else:
+ print(" %s doesn't exist, ok" % ipy)
+ ipy = None
+ # Make sure both the top-level "" and versionfile_source
+ # (PKG/, used by runtime code) are in, so
+ # they'll be copied into source distributions. Pip won't be able to
+ # install the package without this.
+ manifest_in = os.path.join(root, "")
+ simple_includes = set()
+ try:
+ with open(manifest_in, "r") as f:
+ for line in f:
+ if line.startswith("include "):
+ for include in line.split()[1:]:
+ simple_includes.add(include)
+ except EnvironmentError:
+ pass
+ # That doesn't cover everything can do
+ # (, so
+ # it might give some false negatives. Appending redundant 'include'
+ # lines is safe, though.
+ if "" not in simple_includes:
+ print(" appending '' to")
+ with open(manifest_in, "a") as f:
+ f.write("include\n")
+ else:
+ print(" '' already in")
+ if cfg.versionfile_source not in simple_includes:
+ print(" appending versionfile_source ('%s') to" %
+ cfg.versionfile_source)
+ with open(manifest_in, "a") as f:
+ f.write("include %s\n" % cfg.versionfile_source)
+ else:
+ print(" versionfile_source already in")
+ # Make VCS-specific changes. For git, this means creating/changing
+ # .gitattributes to mark for export-time keyword
+ # substitution.
+ do_vcs_install(manifest_in, cfg.versionfile_source, ipy)
+ return 0
+def scan_setup_py():
+ """Validate the contents of against Versioneer's expectations."""
+ found = set()
+ setters = False
+ errors = 0
+ with open("", "r") as f:
+ for line in f.readlines():
+ if "import versioneer" in line:
+ found.add("import")
+ if "versioneer.get_cmdclass()" in line:
+ found.add("cmdclass")
+ if "versioneer.get_version()" in line:
+ found.add("get_version")
+ if "versioneer.VCS" in line:
+ setters = True
+ if "versioneer.versionfile_source" in line:
+ setters = True
+ if len(found) != 3:
+ print("")
+ print("Your appears to be missing some important items")
+ print("(but I might be wrong). Please make sure it has something")
+ print("roughly like the following:")
+ print("")
+ print(" import versioneer")
+ print(" setup( version=versioneer.get_version(),")
+ print(" cmdclass=versioneer.get_cmdclass(), ...)")
+ print("")
+ errors += 1
+ if setters:
+ print("You should remove lines like 'versioneer.VCS = ' and")
+ print("'versioneer.versionfile_source = ' . This configuration")
+ print("now lives in setup.cfg, and should be removed from")
+ print("")
+ errors += 1
+ return errors
+if __name__ == "__main__":
+ cmd = sys.argv[1]
+ if cmd == "setup":
+ errors = do_setup()
+ errors += scan_setup_py()
+ if errors:
+ sys.exit(1)