diff options
41 files changed, 7282 insertions, 0 deletions
diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/README.rst diff --git a/docs/keymanager-soledad-docs.rst b/docs/keymanager-soledad-docs.rst new file mode 100644 index 0000000..67055b2 --- /dev/null +++ b/docs/keymanager-soledad-docs.rst @@ -0,0 +1,77 @@ +================= +Soledad Documents +================= + +KeyManager uses two types of documents for the keyring: + +* key document, that stores each gpg key. + +* active document, that relates an address to its corresponding key. + + +Each key can have 0 or more active documents with a different email address +each: + +:: + + .-------------. .-------------. + | foo@foo.com | | bar@bar.com | + '-------------' '-------------' + | | + | .-----------. | + | | | | + | | key | | + '----->| |<----' + | | + '-----------' + + +Fields in a key document: + +* uids + +* fingerprint + +* key_data + +* private. bool marking if the key is private or public + +* length + +* expiry_date + +* refreshed_at + +* version = 1 + +* type = "OpenPGPKey" + +* tags = ["keymanager-key"] + + +Fields in an active document: + +* address + +* fingerprint + +* private + +* validation + +* last_audited_at + +* encr_used + +* sign_used + +* version = 1 + +* type = "OpenPGPKey-active" + +* tags = ["keymanager-active"] + + +The meaning of validation, encr_used and sign_used is related to the `Transitional Key Validation`_ + +.. _Transitional Key Validation: https://leap.se/en/docs/design/transitional-key-validation diff --git a/gui/README.rst b/gui/README.rst new file mode 100644 index 0000000..142d188 --- /dev/null +++ b/gui/README.rst @@ -0,0 +1 @@ +[elijah is working on this] diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/setup.py diff --git a/src/leap/__init__.py b/src/leap/__init__.py new file mode 100644 index 0000000..f48ad10 --- /dev/null +++ b/src/leap/__init__.py @@ -0,0 +1,6 @@ +# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages +try: + __import__('pkg_resources').declare_namespace(__name__) +except ImportError: + from pkgutil import extend_path + __path__ = extend_path(__path__, __name__) diff --git a/src/leap/bitmask/__init__.py b/src/leap/bitmask/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/bitmask/__init__.py diff --git a/src/leap/bitmask/bonafide/__init__.py b/src/leap/bitmask/bonafide/__init__.py new file mode 100644 index 0000000..74f4e66 --- /dev/null +++ b/src/leap/bitmask/bonafide/__init__.py @@ -0,0 +1,4 @@ + +from ._version import get_versions +__version__ = get_versions()['version'] +del get_versions diff --git a/src/leap/bitmask/bonafide/_http.py b/src/leap/bitmask/bonafide/_http.py new file mode 100644 index 0000000..8f05b42 --- /dev/null +++ b/src/leap/bitmask/bonafide/_http.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# _http.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +twisted.web utils for bonafide. +""" +import base64 +import cookielib +import urllib + +from twisted.internet import defer, protocol, reactor +from twisted.internet.ssl import Certificate +from twisted.python.filepath import FilePath +from twisted.web.client import Agent, CookieAgent +from twisted.web.client import BrowserLikePolicyForHTTPS +from twisted.web.http_headers import Headers +from twisted.web.iweb import IBodyProducer +from zope.interface import implements + + +def cookieAgentFactory(verify_path, connectTimeout=30): + customPolicy = BrowserLikePolicyForHTTPS( + Certificate.loadPEM(FilePath(verify_path).getContent())) + agent = Agent(reactor, customPolicy, connectTimeout=connectTimeout) + cookiejar = cookielib.CookieJar() + return CookieAgent(agent, cookiejar) + + +def httpRequest(agent, url, values={}, headers={}, method='POST', token=None): + data = '' + if values: + data = urllib.urlencode(values) + headers['Content-Type'] = ['application/x-www-form-urlencoded'] + + if token: + headers['Authorization'] = ['Token token="%s"' % (bytes(token))] + + def handle_response(response): + # print "RESPONSE CODE", response.code + if response.code == 204: + d = defer.succeed('') + else: + class SimpleReceiver(protocol.Protocol): + def __init__(s, d): + s.buf = '' + s.d = d + + def dataReceived(s, data): + s.buf += data + + def connectionLost(s, reason): + # TODO: test if reason is twisted.web.client.ResponseDone, + # if not, do an errback + s.d.callback(s.buf) + d = defer.Deferred() + response.deliverBody(SimpleReceiver(d)) + return d + + d = agent.request(method, url, Headers(headers), + StringProducer(data) if data else None) + d.addCallback(handle_response) + return d + + +class StringProducer(object): + + implements(IBodyProducer) + + def __init__(self, body): + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + consumer.write(self.body) + return defer.succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass diff --git a/src/leap/bitmask/bonafide/_protocol.py b/src/leap/bitmask/bonafide/_protocol.py new file mode 100644 index 0000000..726185e --- /dev/null +++ b/src/leap/bitmask/bonafide/_protocol.py @@ -0,0 +1,174 @@ +# -*- coding: utf-8 -*- +# _protocol.py +# Copyright (C) 2014-2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Bonafide protocol. +""" +import os +import resource +from collections import defaultdict + +from leap.bonafide import config +from leap.bonafide.provider import Api +from leap.bonafide.session import Session, OK +from leap.common.config import get_path_prefix + +from twisted.cred.credentials import UsernamePassword +from twisted.internet.defer import fail +from twisted.python import log + + +# TODO [ ] enable-disable services +# TODO [ ] read provider info + +COMMANDS = 'signup', 'authenticate', 'logout', 'stats' +_preffix = get_path_prefix() + + +class BonafideProtocol(object): + """ + Expose the protocol that interacts with the Bonafide Service API. + """ + + _apis = defaultdict(None) + _sessions = defaultdict(None) + + def _get_api(self, provider): + # TODO should get deferred + if provider.domain in self._apis: + return self._apis[provider.domain] + + # TODO defer the autoconfig for the provider if needed... + api = Api(provider.api_uri, provider.version) + self._apis[provider.domain] = api + return api + + def _get_session(self, provider, full_id, password=""): + if full_id in self._sessions: + return self._sessions[full_id] + + # TODO if password/username null, then pass AnonymousCreds + # TODO use twisted.cred instead + username, provider_id = config.get_username_and_provider(full_id) + credentials = UsernamePassword(username, password) + api = self._get_api(provider) + provider_pem = _get_provider_ca_path(provider_id) + session = Session(credentials, api, provider_pem) + self._sessions[full_id] = session + return session + + def _del_session_errback(self, failure, full_id): + if full_id in self._sessions: + del self._sessions[full_id] + return failure + + # Service public methods + + def do_signup(self, full_id, password): + log.msg('SIGNUP for %s' % full_id) + _, provider_id = config.get_username_and_provider(full_id) + + provider = config.Provider(provider_id) + d = provider.callWhenReady( + self._do_signup, provider, full_id, password) + return d + + def _do_signup(self, provider, full_id, password): + + # XXX check it's unauthenticated + def return_user(result, _session): + return_code, user = result + if return_code == OK: + return user + + username, _ = config.get_username_and_provider(full_id) + # XXX get deferred? + session = self._get_session(provider, full_id, password) + d = session.signup(username, password) + d.addCallback(return_user, session) + d.addErrback(self._del_session_errback, full_id) + return d + + def do_authenticate(self, full_id, password): + _, provider_id = config.get_username_and_provider(full_id) + + provider = config.Provider(provider_id) + + def maybe_finish_provider_bootstrap(result, provider): + session = self._get_session(provider, full_id, password) + d = provider.download_services_config_with_auth(session) + d.addCallback(lambda _: result) + return d + + d = provider.callWhenReady( + self._do_authenticate, provider, full_id, password) + d.addCallback(maybe_finish_provider_bootstrap, provider) + return d + + def _do_authenticate(self, provider, full_id, password): + + def return_token_and_uuid(result, _session): + if result == OK: + # TODO -- turn this into JSON response + return str(_session.token), str(_session.uuid) + + log.msg('AUTH for %s' % full_id) + + # XXX get deferred? + session = self._get_session(provider, full_id, password) + d = session.authenticate() + d.addCallback(return_token_and_uuid, session) + d.addErrback(self._del_session_errback, full_id) + return d + + def do_logout(self, full_id): + # XXX use the AVATAR here + log.msg('LOGOUT for %s' % full_id) + if (full_id not in self._sessions or + not self._sessions[full_id].is_authenticated): + return fail(RuntimeError("There is no session for such user")) + session = self._sessions[full_id] + + d = session.logout() + d.addCallback(lambda _: self._sessions.pop(full_id)) + d.addCallback(lambda _: '%s logged out' % full_id) + return d + + def do_get_smtp_cert(self, full_id): + if (full_id not in self._sessions or + not self._sessions[full_id].is_authenticated): + return fail(RuntimeError("There is no session for such user")) + d = self._sessions[full_id].get_smtp_cert() + return d + + def do_get_vpn_cert(self): + # FIXME to be implemented + pass + + def do_update_user(self): + # FIXME to be implemented + pass + + def do_stats(self): + log.msg('Calculating Bonafide Service STATS') + mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + return {'sessions': len(self._sessions), + 'mem': '%s KB' % (mem / 1024)} + + +def _get_provider_ca_path(provider_id): + return os.path.join( + _preffix, 'leap', 'providers', provider_id, 'keys', 'ca', 'cacert.pem') diff --git a/src/leap/bitmask/bonafide/_srp.py b/src/leap/bitmask/bonafide/_srp.py new file mode 100644 index 0000000..38f657b --- /dev/null +++ b/src/leap/bitmask/bonafide/_srp.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +# _srp.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +SRP Authentication. +""" + +import binascii +import json + +import srp + + +class SRPAuthMechanism(object): + + """ + Implement a protocol-agnostic SRP Authentication mechanism. + """ + + def __init__(self, username, password): + self.username = username + self.srp_user = srp.User(username, password, + srp.SHA256, srp.NG_1024) + _, A = self.srp_user.start_authentication() + self.A = A + self.M = None + self.M2 = None + + def get_handshake_params(self): + return {'login': bytes(self.username), + 'A': binascii.hexlify(self.A)} + + def process_handshake(self, handshake_response): + challenge = json.loads(handshake_response) + self._check_for_errors(challenge) + salt = challenge.get('salt', None) + B = challenge.get('B', None) + unhex_salt, unhex_B = self._unhex_salt_B(salt, B) + self.M = self.srp_user.process_challenge(unhex_salt, unhex_B) + + def get_authentication_params(self): + # It looks A is not used server side + return {'client_auth': binascii.hexlify(self.M), + 'A': binascii.hexlify(self.A)} + + def process_authentication(self, authentication_response): + auth = json.loads(authentication_response) + self._check_for_errors(auth) + uuid = auth.get('id', None) + token = auth.get('token', None) + self.M2 = auth.get('M2', None) + self._check_auth_params(uuid, token, self.M2) + return uuid, token + + def verify_authentication(self): + unhex_M2 = _safe_unhexlify(self.M2) + self.srp_user.verify_session(unhex_M2) + assert self.srp_user.authenticated() + + def _check_for_errors(self, response): + if 'errors' in response: + msg = response['errors']['base'] + raise SRPAuthError(unicode(msg).encode('utf-8')) + + def _unhex_salt_B(self, salt, B): + if salt is None: + raise SRPAuthNoSalt() + if B is None: + raise SRPAuthNoB() + try: + unhex_salt = _safe_unhexlify(salt) + unhex_B = _safe_unhexlify(B) + except (TypeError, ValueError) as e: + raise SRPAuthBadDataFromServer(str(e)) + return unhex_salt, unhex_B + + def _check_auth_params(self, uuid, token, M2): + if not all((uuid, token, M2)): + msg = '%s' % str((M2, uuid, token)) + raise SRPAuthBadDataFromServer(msg) + + +class SRPSignupMechanism(object): + + """ + Implement a protocol-agnostic SRP Registration mechanism. + """ + + def get_signup_params(self, username, password): + salt, verifier = srp.create_salted_verification_key( + bytes(username), bytes(password), + srp.SHA256, srp.NG_1024) + user_data = { + 'user[login]': username, + 'user[password_salt]': binascii.hexlify(salt), + 'user[password_verifier]': binascii.hexlify(verifier)} + return user_data + + def process_signup(self, signup_response): + signup = json.loads(signup_response) + errors = signup.get('errors') + if errors: + msg = 'username ' + errors.get('login')[0] + raise SRPRegistrationError(msg) + else: + username = signup.get('login') + return username + + +def _safe_unhexlify(val): + return binascii.unhexlify(val) \ + if (len(val) % 2 == 0) else binascii.unhexlify('0' + val) + + +class SRPAuthError(Exception): + """ + Base exception for srp authentication errors + """ + + +class SRPAuthNoSalt(SRPAuthError): + message = 'The server didn\'t send the salt parameter' + + +class SRPAuthNoB(SRPAuthError): + message = 'The server didn\'t send the B parameter' + + +class SRPAuthBadDataFromServer(SRPAuthError): + pass + +class SRPRegistrationError(Exception): + pass diff --git a/src/leap/bitmask/bonafide/_version.py b/src/leap/bitmask/bonafide/_version.py new file mode 100644 index 0000000..91fb65c --- /dev/null +++ b/src/leap/bitmask/bonafide/_version.py @@ -0,0 +1,460 @@ + +# This file helps to compute a version number in source trees obtained from +# git-archive tarball (such as those provided by githubs download-from-tag +# feature). Distribution tarballs (built by setup.py sdist) and build +# directories (produced by setup.py build) will contain a much shorter file +# that just contains the computed version number. + +# This file is released into the public domain. Generated by +# versioneer-0.15 (https://github.com/warner/python-versioneer) + +import errno +import os +import re +import subprocess +import sys + + +def get_keywords(): + # these strings will be replaced by git during git-archive. + # setup.py/versioneer.py will grep for the variable names, so they must + # each be defined on a line of their own. _version.py will just call + # get_keywords(). + git_refnames = "$Format:%d$" + git_full = "$Format:%H$" + keywords = {"refnames": git_refnames, "full": git_full} + return keywords + + +class VersioneerConfig: + pass + + +def get_config(): + # these strings are filled in when 'setup.py versioneer' creates + # _version.py + cfg = VersioneerConfig() + cfg.VCS = "git" + cfg.style = "pep440" + cfg.tag_prefix = "None" + cfg.parentdir_prefix = "None" + cfg.versionfile_source = "src/leap/bonafide/_version.py" + cfg.verbose = False + return cfg + + +class NotThisMethod(Exception): + pass + + +LONG_VERSION_PY = {} +HANDLERS = {} + + +def register_vcs_handler(vcs, method): # decorator + def decorate(f): + if vcs not in HANDLERS: + HANDLERS[vcs] = {} + HANDLERS[vcs][method] = f + return f + return decorate + + +def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False): + assert isinstance(commands, list) + p = None + for c in commands: + try: + dispcmd = str([c] + args) + # remember shell=False, so use git.cmd on windows, not just git + p = subprocess.Popen([c] + args, cwd=cwd, stdout=subprocess.PIPE, + stderr=(subprocess.PIPE if hide_stderr + else None)) + break + except EnvironmentError: + e = sys.exc_info()[1] + if e.errno == errno.ENOENT: + continue + if verbose: + print("unable to run %s" % dispcmd) + print(e) + return None + else: + if verbose: + print("unable to find command, tried %s" % (commands,)) + return None + stdout = p.communicate()[0].strip() + if sys.version_info[0] >= 3: + stdout = stdout.decode() + if p.returncode != 0: + if verbose: + print("unable to run %s (error)" % dispcmd) + return None + return stdout + + +def versions_from_parentdir(parentdir_prefix, root, verbose): + # Source tarballs conventionally unpack into a directory that includes + # both the project name and a version string. + dirname = os.path.basename(root) + if not dirname.startswith(parentdir_prefix): + if verbose: + print("guessing rootdir is '%s', but '%s' doesn't start with " + "prefix '%s'" % (root, dirname, parentdir_prefix)) + raise NotThisMethod("rootdir doesn't start with parentdir_prefix") + return {"version": dirname[len(parentdir_prefix):], + "full-revisionid": None, + "dirty": False, "error": None} + + +@register_vcs_handler("git", "get_keywords") +def git_get_keywords(versionfile_abs): + # the code embedded in _version.py can just fetch the value of these + # keywords. When used from setup.py, we don't want to import _version.py, + # so we do it with a regexp instead. This function is not used from + # _version.py. + keywords = {} + try: + f = open(versionfile_abs, "r") + for line in f.readlines(): + if line.strip().startswith("git_refnames ="): + mo = re.search(r'=\s*"(.*)"', line) + if mo: + keywords["refnames"] = mo.group(1) + if line.strip().startswith("git_full ="): + mo = re.search(r'=\s*"(.*)"', line) + if mo: + keywords["full"] = mo.group(1) + f.close() + except EnvironmentError: + pass + return keywords + + +@register_vcs_handler("git", "keywords") +def git_versions_from_keywords(keywords, tag_prefix, verbose): + if not keywords: + raise NotThisMethod("no keywords at all, weird") + refnames = keywords["refnames"].strip() + if refnames.startswith("$Format"): + if verbose: + print("keywords are unexpanded, not using") + raise NotThisMethod("unexpanded keywords, not a git-archive tarball") + refs = set([r.strip() for r in refnames.strip("()").split(",")]) + # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of + # just "foo-1.0". If we see a "tag: " prefix, prefer those. + TAG = "tag: " + tags = set([r[len(TAG):] for r in refs if r.startswith(TAG)]) + if not tags: + # Either we're using git < 1.8.3, or there really are no tags. We use + # a heuristic: assume all version tags have a digit. The old git %d + # expansion behaves like git log --decorate=short and strips out the + # refs/heads/ and refs/tags/ prefixes that would let us distinguish + # between branches and tags. By ignoring refnames without digits, we + # filter out many common branch names like "release" and + # "stabilization", as well as "HEAD" and "master". + tags = set([r for r in refs if re.search(r'\d', r)]) + if verbose: + print("discarding '%s', no digits" % ",".join(refs-tags)) + if verbose: + print("likely tags: %s" % ",".join(sorted(tags))) + for ref in sorted(tags): + # sorting will prefer e.g. "2.0" over "2.0rc1" + if ref.startswith(tag_prefix): + r = ref[len(tag_prefix):] + if verbose: + print("picking %s" % r) + return {"version": r, + "full-revisionid": keywords["full"].strip(), + "dirty": False, "error": None + } + # no suitable tags, so version is "0+unknown", but full hex is still there + if verbose: + print("no suitable tags, using unknown + full revision id") + return {"version": "0+unknown", + "full-revisionid": keywords["full"].strip(), + "dirty": False, "error": "no suitable tags"} + + +@register_vcs_handler("git", "pieces_from_vcs") +def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): + # this runs 'git' from the root of the source tree. This only gets called + # if the git-archive 'subst' keywords were *not* expanded, and + # _version.py hasn't already been rewritten with a short version string, + # meaning we're inside a checked out source tree. + + if not os.path.exists(os.path.join(root, ".git")): + if verbose: + print("no .git in %s" % root) + raise NotThisMethod("no .git directory") + + GITS = ["git"] + if sys.platform == "win32": + GITS = ["git.cmd", "git.exe"] + # if there is a tag, this yields TAG-NUM-gHEX[-dirty] + # if there are no tags, this yields HEX[-dirty] (no NUM) + describe_out = run_command(GITS, ["describe", "--tags", "--dirty", + "--always", "--long"], + cwd=root) + # --long was added in git-1.5.5 + if describe_out is None: + raise NotThisMethod("'git describe' failed") + describe_out = describe_out.strip() + full_out = run_command(GITS, ["rev-parse", "HEAD"], cwd=root) + if full_out is None: + raise NotThisMethod("'git rev-parse' failed") + full_out = full_out.strip() + + pieces = {} + pieces["long"] = full_out + pieces["short"] = full_out[:7] # maybe improved later + pieces["error"] = None + + # parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty] + # TAG might have hyphens. + git_describe = describe_out + + # look for -dirty suffix + dirty = git_describe.endswith("-dirty") + pieces["dirty"] = dirty + if dirty: + git_describe = git_describe[:git_describe.rindex("-dirty")] + + # now we have TAG-NUM-gHEX or HEX + + if "-" in git_describe: + # TAG-NUM-gHEX + mo = re.search(r'^(.+)-(\d+)-g([0-9a-f]+)$', git_describe) + if not mo: + # unparseable. Maybe git-describe is misbehaving? + pieces["error"] = ("unable to parse git-describe output: '%s'" + % describe_out) + return pieces + + # tag + full_tag = mo.group(1) + if not full_tag.startswith(tag_prefix): + if verbose: + fmt = "tag '%s' doesn't start with prefix '%s'" + print(fmt % (full_tag, tag_prefix)) + pieces["error"] = ("tag '%s' doesn't start with prefix '%s'" + % (full_tag, tag_prefix)) + return pieces + pieces["closest-tag"] = full_tag[len(tag_prefix):] + + # distance: number of commits since tag + pieces["distance"] = int(mo.group(2)) + + # commit: short hex revision ID + pieces["short"] = mo.group(3) + + else: + # HEX: no tags + pieces["closest-tag"] = None + count_out = run_command(GITS, ["rev-list", "HEAD", "--count"], + cwd=root) + pieces["distance"] = int(count_out) # total number of commits + + return pieces + + +def plus_or_dot(pieces): + if "+" in pieces.get("closest-tag", ""): + return "." + return "+" + + +def render_pep440(pieces): + # now build up version string, with post-release "local version + # identifier". Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you + # get a tagged build and then dirty it, you'll get TAG+0.gHEX.dirty + + # exceptions: + # 1: no tags. git_describe was just HEX. 0+untagged.DISTANCE.gHEX[.dirty] + + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += plus_or_dot(pieces) + rendered += "%d.g%s" % (pieces["distance"], pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + else: + # exception #1 + rendered = "0+untagged.%d.g%s" % (pieces["distance"], + pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + return rendered + + +def render_pep440_pre(pieces): + # TAG[.post.devDISTANCE] . No -dirty + + # exceptions: + # 1: no tags. 0.post.devDISTANCE + + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"]: + rendered += ".post.dev%d" % pieces["distance"] + else: + # exception #1 + rendered = "0.post.dev%d" % pieces["distance"] + return rendered + + +def render_pep440_post(pieces): + # TAG[.postDISTANCE[.dev0]+gHEX] . The ".dev0" means dirty. Note that + # .dev0 sorts backwards (a dirty tree will appear "older" than the + # corresponding clean one), but you shouldn't be releasing software with + # -dirty anyways. + + # exceptions: + # 1: no tags. 0.postDISTANCE[.dev0] + + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += ".post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + rendered += plus_or_dot(pieces) + rendered += "g%s" % pieces["short"] + else: + # exception #1 + rendered = "0.post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + rendered += "+g%s" % pieces["short"] + return rendered + + +def render_pep440_old(pieces): + # TAG[.postDISTANCE[.dev0]] . The ".dev0" means dirty. + + # exceptions: + # 1: no tags. 0.postDISTANCE[.dev0] + + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += ".post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + else: + # exception #1 + rendered = "0.post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + return rendered + + +def render_git_describe(pieces): + # TAG[-DISTANCE-gHEX][-dirty], like 'git describe --tags --dirty + # --always' + + # exceptions: + # 1: no tags. HEX[-dirty] (note: no 'g' prefix) + + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"]: + rendered += "-%d-g%s" % (pieces["distance"], pieces["short"]) + else: + # exception #1 + rendered = pieces["short"] + if pieces["dirty"]: + rendered += "-dirty" + return rendered + + +def render_git_describe_long(pieces): + # TAG-DISTANCE-gHEX[-dirty], like 'git describe --tags --dirty + # --always -long'. The distance/hash is unconditional. + + # exceptions: + # 1: no tags. HEX[-dirty] (note: no 'g' prefix) + + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + rendered += "-%d-g%s" % (pieces["distance"], pieces["short"]) + else: + # exception #1 + rendered = pieces["short"] + if pieces["dirty"]: + rendered += "-dirty" + return rendered + + +def render(pieces, style): + if pieces["error"]: + return {"version": "unknown", + "full-revisionid": pieces.get("long"), + "dirty": None, + "error": pieces["error"]} + + if not style or style == "default": + style = "pep440" # the default + + if style == "pep440": + rendered = render_pep440(pieces) + elif style == "pep440-pre": + rendered = render_pep440_pre(pieces) + elif style == "pep440-post": + rendered = render_pep440_post(pieces) + elif style == "pep440-old": + rendered = render_pep440_old(pieces) + elif style == "git-describe": + rendered = render_git_describe(pieces) + elif style == "git-describe-long": + rendered = render_git_describe_long(pieces) + else: + raise ValueError("unknown style '%s'" % style) + + return {"version": rendered, "full-revisionid": pieces["long"], + "dirty": pieces["dirty"], "error": None} + + +def get_versions(): + # I am in _version.py, which lives at ROOT/VERSIONFILE_SOURCE. If we have + # __file__, we can work backwards from there to the root. Some + # py2exe/bbfreeze/non-CPython implementations don't do __file__, in which + # case we can only use expanded keywords. + + cfg = get_config() + verbose = cfg.verbose + + try: + return git_versions_from_keywords(get_keywords(), cfg.tag_prefix, + verbose) + except NotThisMethod: + pass + + try: + root = os.path.realpath(__file__) + # versionfile_source is the relative path from the top of the source + # tree (where the .git directory might live) to this file. Invert + # this to find the root from __file__. + for i in cfg.versionfile_source.split('/'): + root = os.path.dirname(root) + except NameError: + return {"version": "0+unknown", "full-revisionid": None, + "dirty": None, + "error": "unable to find root of source tree"} + + try: + pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose) + return render(pieces, cfg.style) + except NotThisMethod: + pass + + try: + if cfg.parentdir_prefix: + return versions_from_parentdir(cfg.parentdir_prefix, root, verbose) + except NotThisMethod: + pass + + return {"version": "0+unknown", "full-revisionid": None, + "dirty": None, + "error": "unable to compute version"} diff --git a/src/leap/bitmask/bonafide/bootstrap.py b/src/leap/bitmask/bonafide/bootstrap.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/bitmask/bonafide/bootstrap.py diff --git a/src/leap/bitmask/bonafide/config.py b/src/leap/bitmask/bonafide/config.py new file mode 100644 index 0000000..ae66a0e --- /dev/null +++ b/src/leap/bitmask/bonafide/config.py @@ -0,0 +1,508 @@ +# -*- coding: utf-8 -*- +# config.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Configuration for a LEAP provider. +""" +import datetime +import json +import os +import sys + +from collections import defaultdict +from urlparse import urlparse + +from twisted.internet import defer, reactor +from twisted.internet.ssl import ClientContextFactory +from twisted.python import log +from twisted.web.client import Agent, downloadPage + +from leap.bonafide._http import httpRequest +from leap.bonafide.provider import Discovery + +from leap.common.check import leap_assert +from leap.common.config import get_path_prefix as common_get_path_prefix +from leap.common.files import mkdir_p +# check_and_fix_urw_only, get_mtime + + +APPNAME = "bonafide" +ENDPOINT = "ipc:///tmp/%s.sock" % APPNAME + + +def get_path_prefix(standalone=False): + return common_get_path_prefix(standalone) + + +_preffix = get_path_prefix() + + +def get_provider_path(domain, config='provider.json'): + """ + Returns relative path for provider configs. + + :param domain: the domain to which this providerconfig belongs to. + :type domain: str + :returns: the path + :rtype: str + """ + # TODO sanitize domain + leap_assert(domain is not None, 'get_provider_path: We need a domain') + return os.path.join('providers', domain, config) + + +def get_ca_cert_path(domain): + # TODO sanitize domain + leap_assert(domain is not None, 'get_provider_path: We need a domain') + return os.path.join('providers', domain, 'keys', 'ca', 'cacert.pem') + + +def get_modification_ts(path): + """ + Gets modification time of a file. + + :param path: the path to get ts from + :type path: str + :returns: modification time + :rtype: datetime object + """ + ts = os.path.getmtime(path) + return datetime.datetime.fromtimestamp(ts) + + +def update_modification_ts(path): + """ + Sets modification time of a file to current time. + + :param path: the path to set ts to. + :type path: str + :returns: modification time + :rtype: datetime object + """ + os.utime(path, None) + return get_modification_ts(path) + + +def is_file(path): + """ + Returns True if the path exists and is a file. + """ + return os.path.isfile(path) + + +def is_empty_file(path): + """ + Returns True if the file at path is empty. + """ + return os.stat(path).st_size is 0 + + +def make_address(user, provider): + """ + Return a full identifier for an user, as a email-like + identifier. + + :param user: the username + :type user: basestring + :param provider: the provider domain + :type provider: basestring + """ + return '%s@%s' % (user, provider) + + +def get_username_and_provider(full_id): + return full_id.split('@') + + +class Provider(object): + # TODO add validation + + SERVICES_MAP = { + 'openvpn': ['eip'], + 'mx': ['soledad', 'smtp']} + + first_bootstrap = defaultdict(None) + ongoing_bootstrap = defaultdict(None) + stuck_bootstrap = defaultdict(None) + + def __init__(self, domain, autoconf=True, basedir=None, + check_certificate=True): + if not basedir: + basedir = os.path.join(_preffix, 'leap') + self._basedir = os.path.expanduser(basedir) + self._domain = domain + self._disco = Discovery('https://%s' % domain) + self._provider_config = None + + is_configured = self.is_configured() + if not is_configured: + check_certificate = False + + if check_certificate: + self.contextFactory = None + else: + # XXX we should do this only for the FIRST provider download. + # For the rest, we should pass the ca cert to the agent. + # That means that RIGHT AFTER DOWNLOADING provider_info, + # we should instantiate a new Agent... + self.contextFactory = WebClientContextFactory() + self._agent = Agent(reactor, self.contextFactory) + + self._load_provider_json() + + if not is_configured and autoconf: + log.msg('provider %s not configured: downloading files...' % + domain) + self.bootstrap() + else: + log.msg('Provider already initialized') + self.first_bootstrap[self._domain] = defer.succeed( + 'already_initialized') + self.ongoing_bootstrap[self._domain] = defer.succeed( + 'already_initialized') + + @property + def domain(self): + return self._domain + + @property + def api_uri(self): + if not self._provider_config: + return 'https://api.%s:4430' % self._domain + return self._provider_config.api_uri + + @property + def version(self): + if not self._provider_config: + return 1 + return int(self._provider_config.api_version) + + def is_configured(self): + provider_json = self._get_provider_json_path() + # XXX check if all the services are there + if not is_file(provider_json): + return False + if not is_file(self._get_ca_cert_path()): + return False + if not self.has_config_for_all_services(): + return False + return True + + def bootstrap(self): + domain = self._domain + log.msg("Bootstrapping provider %s" % domain) + ongoing = self.ongoing_bootstrap.get(domain) + if ongoing: + log.msg('already bootstrapping this provider...') + return + + self.first_bootstrap[self._domain] = defer.Deferred() + + def first_bootstrap_done(ignored): + try: + self.first_bootstrap[domain].callback('got config') + except defer.AlreadyCalledError: + pass + + d = self.maybe_download_provider_info() + d.addCallback(self.maybe_download_ca_cert) + d.addCallback(self.validate_ca_cert) + d.addCallback(first_bootstrap_done) + d.addCallback(self.maybe_download_services_config) + self.ongoing_bootstrap[domain] = d + + def callWhenMainConfigReady(self, cb, *args, **kw): + d = self.first_bootstrap[self._domain] + d.addCallback(lambda _: cb(*args, **kw)) + return d + + def callWhenReady(self, cb, *args, **kw): + d = self.ongoing_bootstrap[self._domain] + d.addCallback(lambda _: cb(*args, **kw)) + return d + + def has_valid_certificate(self): + pass + + def maybe_download_provider_info(self, replace=False): + """ + Download the provider.json info from the main domain. + This SHOULD only be used once with the DOMAIN url. + """ + # TODO handle pre-seeded providers? + # or let client handle that? We could move them to bonafide. + provider_json = self._get_provider_json_path() + if is_file(provider_json) and not replace: + return defer.succeed('provider_info_already_exists') + + folders, f = os.path.split(provider_json) + mkdir_p(folders) + + uri = self._disco.get_provider_info_uri() + met = self._disco.get_provider_info_method() + + d = downloadPage(uri, provider_json, method=met) + d.addCallback(lambda _: self._load_provider_json()) + d.addErrback(log.err) + return d + + def update_provider_info(self): + """ + Get more recent copy of provider.json from the api URL. + """ + pass + + def maybe_download_ca_cert(self, ignored): + """ + :rtype: deferred + """ + path = self._get_ca_cert_path() + if is_file(path): + return defer.succeed('ca_cert_path_already_exists') + + uri = self._get_ca_cert_uri() + mkdir_p(os.path.split(path)[0]) + d = downloadPage(uri, path) + d.addErrback(log.err) + return d + + def validate_ca_cert(self, ignored): + # TODO Need to verify fingerprint against the one in provider.json + expected = self._get_expected_ca_cert_fingerprint() + print "EXPECTED FINGERPRINT:", expected + + def _get_expected_ca_cert_fingerprint(self): + try: + fgp = self._provider_config.ca_cert_fingerprint + except AttributeError: + fgp = None + return fgp + + # Services config files + + def has_fetched_services_config(self): + return os.path.isfile(self._get_configs_path()) + + def maybe_download_services_config(self, ignored): + + # TODO --- currently, some providers (mail.bitmask.net) raise 401 + # UNAUTHENTICATED if we try to get the services + # See: # https://leap.se/code/issues/7906 + + def further_bootstrap_needs_auth(ignored): + log.err('cannot download services config yet, need auth') + pending_deferred = defer.Deferred() + self.stuck_bootstrap[self._domain] = pending_deferred + return pending_deferred + + uri, met, path = self._get_configs_download_params() + + d = downloadPage(uri, path, method=met) + d.addCallback(lambda _: self._load_provider_json()) + d.addCallback( + lambda _: self._get_config_for_all_services(session=None)) + d.addErrback(further_bootstrap_needs_auth) + return d + + def download_services_config_with_auth(self, session): + + def verify_provider_configs(ignored): + self._load_provider_configs() + return True + + def workaround_for_config_fetch(failure): + # FIXME --- configs.json raises 500, see #7914. + # This is a workaround until that's fixed. + log.err(failure) + log.msg( + "COULD NOT VERIFY CONFIGS.JSON, WORKAROUND: DIRECT DOWNLOAD") + + if 'mx' in self._provider_config.services: + soledad_uri = '/1/config/soledad-service.json' + smtp_uri = '/1/config/smtp-service.json' + base = self._disco.netloc + + fetch = self._fetch_provider_configs_unauthenticated + get_path = self._get_service_config_path + + d1 = fetch( + 'https://' + str(base + soledad_uri), get_path('soledad')) + d2 = fetch( + 'https://' + str(base + smtp_uri), get_path('smtp')) + d = defer.gatherResults([d1, d2]) + d.addCallback(lambda _: finish_stuck_after_workaround()) + return d + + def finish_stuck_after_workaround(): + stuck = self.stuck_bootstrap.get(self._domain, None) + if stuck: + stuck.callback('continue!') + + def complete_bootstrapping(ignored): + stuck = self.stuck_bootstrap.get(self._domain, None) + if stuck: + d = self._get_config_for_all_services(session) + d.addCallback(lambda _: stuck.callback('continue!')) + d.addErrback(log.err) + return d + + if not self.has_fetched_services_config(): + self._load_provider_json() + uri, met, path = self._get_configs_download_params() + d = session.fetch_provider_configs(uri, path) + d.addCallback(verify_provider_configs) + d.addCallback(complete_bootstrapping) + d.addErrback(workaround_for_config_fetch) + return d + else: + d = defer.succeed('already downloaded') + d.addCallback(complete_bootstrapping) + return d + + def _get_configs_download_params(self): + uri = self._disco.get_configs_uri() + met = self._disco.get_configs_method() + path = self._get_configs_path() + return uri, met, path + + def offers_service(self, service): + if service not in self.SERVICES_MAP.keys(): + raise RuntimeError('Unknown service: %s' % service) + return service in self._provider_config.services + + def is_service_enabled(self, service): + # TODO implement on some config file + return True + + def has_config_for_service(self, service): + has_file = os.path.isfile + path = self._get_service_config_path + smap = self.SERVICES_MAP + + result = all([has_file(path(subservice)) for + subservice in smap[service]]) + return result + + def has_config_for_all_services(self): + self._load_provider_json() + if not self._provider_config: + return False + all_services = self._provider_config.services + has_all = all( + [self.has_config_for_service(service) for service in + all_services]) + return has_all + + def _get_provider_json_path(self): + domain = self._domain.encode(sys.getfilesystemencoding()) + provider_json_path = os.path.join( + self._basedir, get_provider_path(domain, config='provider.json')) + return provider_json_path + + def _get_configs_path(self): + domain = self._domain.encode(sys.getfilesystemencoding()) + configs_path = os.path.join( + self._basedir, get_provider_path(domain, config='configs.json')) + return configs_path + + def _get_service_config_path(self, service): + domain = self._domain.encode(sys.getfilesystemencoding()) + configs_path = os.path.join( + self._basedir, get_provider_path( + domain, config='%s-service.json' % service)) + return configs_path + + def _get_ca_cert_path(self): + domain = self._domain.encode(sys.getfilesystemencoding()) + cert_path = os.path.join(self._basedir, get_ca_cert_path(domain)) + return cert_path + + def _get_ca_cert_uri(self): + try: + uri = self._provider_config.ca_cert_uri + uri = str(uri) + except Exception: + uri = None + return uri + + def _load_provider_json(self): + path = self._get_provider_json_path() + if not is_file(path): + log.msg("Cannot LOAD provider config path %s" % path) + return + + with open(path, 'r') as config: + self._provider_config = Record(**json.load(config)) + + api_uri = self._provider_config.api_uri + if api_uri: + parsed = urlparse(api_uri) + self._disco.netloc = parsed.netloc + + def _get_config_for_all_services(self, session): + services_dict = self._load_provider_configs() + configs_path = self._get_configs_path() + with open(configs_path) as jsonf: + services_dict = Record(**json.load(jsonf)).services + pending = [] + base = self._disco.get_base_uri() + for service in self._provider_config.services: + if service in self.SERVICES_MAP.keys(): + for subservice in self.SERVICES_MAP[service]: + uri = base + str(services_dict[subservice]) + path = self._get_service_config_path(subservice) + if session: + d = session.fetch_provider_configs(uri, path) + else: + d = self._fetch_provider_configs_unauthenticated( + uri, path) + pending.append(d) + return defer.gatherResults(pending) + + def _load_provider_configs(self): + configs_path = self._get_configs_path() + with open(configs_path) as jsonf: + services_dict = Record(**json.load(jsonf)).services + return services_dict + + def _fetch_provider_configs_unauthenticated(self, uri, path): + log.msg('Downloading config for %s...' % uri) + d = downloadPage(uri, path, method='GET') + return d + + def _http_request(self, *args, **kw): + # XXX pass if-modified-since header + return httpRequest(self._agent, *args, **kw) + + +class Record(object): + def __init__(self, **kw): + self.__dict__.update(kw) + + +class WebClientContextFactory(ClientContextFactory): + def getContext(self, hostname, port): + return ClientContextFactory.getContext(self) + + +if __name__ == '__main__': + + def print_done(): + print '>>> bootstrapping done!!!' + + provider = Provider('cdev.bitmask.net') + provider.callWhenReady(print_done) + reactor.run() diff --git a/src/leap/bitmask/bonafide/cred_srp.py b/src/leap/bitmask/bonafide/cred_srp.py new file mode 100644 index 0000000..9fcb97b --- /dev/null +++ b/src/leap/bitmask/bonafide/cred_srp.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +# srp_cred.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Credential module for authenticating SRP requests against the LEAP platform. +""" + +# ----------------- DOC ------------------------------------------------------ +# See examples of cred modules: +# https://github.com/oubiwann-unsupported/txBrowserID/blob/master/browserid/checker.py +# http://stackoverflow.com/questions/19171686/book-twisted-network-programming-essentials-example-9-1-does-not-work +# ----------------- DOC ------------------------------------------------------ + +from zope.interface import implements, implementer, Interface, Attribute + +from twisted.cred import portal, credentials, error as credError +from twisted.cred.checkers import ICredentialsChecker +from twisted.internet import defer, reactor + +from leap.bonafide.session import Session + + +@implementer(ICredentialsChecker) +class SRPCredentialsChecker(object): + + # TODO need to decide if the credentials that we pass here are per provider + # or not. + # I think it's better to have the credentials passed with the full user_id, + # and here split user/provider. + # XXX then we need to check if the provider is properly configured, to get + # the right api info AND the needed certificates. + # XXX might need to initialize credential checker with a ProviderAPI + + credentialInterfaces = (credentials.IUsernamePassword,) + + def requestAvatarId(self, credentials): + # TODO If we are already authenticated, we should just + # return the session object, somehow. + # XXX If not authenticated (ie, no cached credentials?) + # we pass credentials to srpauth.authenticate method + # another srpauth class should interface with the blocking srpauth + # library, and chain all the calls needed to do the handshake. + # Therefore: + # should keep reference to the srpauth instances somewhere + # TODO If we want to return an anonymous user (useful for getting the + # anon-vpn cert), we should return an empty tuple from here. + + return defer.maybeDeferred(_get_leap_session(credentials)).addCallback( + self._check_srp_auth) + + def _check_srp_auth(session, username): + if session.is_authenticated: + # is ok! --- should add it to some global cache? + return defer.succeed(username) + else: + return defer.fail(credError.UnauthorizedLogin( + "Bad username/password combination")) + + +def _get_leap_session(credentials): + session = Session(credentials) + d = session.authenticate() + d.addCallback(lambda _: session) + return d + + +class ILeapUserAvatar(Interface): + + # TODO add attributes for username, uuid, token, session_id + + def logout(): + """ + Clean up per-login resource allocated to this avatar. + """ + + +@implementer(ILeapUserAvatar) +class LeapUserAvatar(object): + + # TODO initialize with: username, uuid, token, session_id + # TODO initialize provider data (for api) + # TODO how does this relate to LeapSession? maybe we should get one passed? + + def logout(self): + + # TODO reset the (global?) srpauth object. + # https://leap.se/en/docs/design/bonafide#logout + # DELETE API_BASE/logout(.json) + pass + + +class LeapAuthRealm(object): + """ + The realm corresponds to an application domain and is in charge of avatars, + which are network-accessible business logic objects. + """ + + # TODO should be initialized with provider API objects. + + implements(portal.IRealm) + + def requestAvatar(self, avatarId, mind, *interfaces): + + if ILeapUserAvatar in interfaces: + # XXX how should we get the details for the requested avatar? + avatar = LeapUserAvatar() + return ILeapUserAvatar, avatar, avatar.logout + + raise NotImplementedError( + "This realm only supports the ILeapUserAvatar interface.") + + +if __name__ == '__main__': + + # from the browser-id implementation + #import sys + #def _done(res): + #print res + #reactor.stop() + #assertion = sys.argv[1] + #d = defer.Deferred() + #reactor.callLater(0, d.callback, "http://localhost:8081") + #d.addCallback(SRPCredentialsChecker) + #d.addCallback(lambda c: c.requestAvatarId(assertion)) + #d.addBoth(_done) + #reactor.run() + + # XXX move boilerplate to some bitmask-core template. + + leap_realm = LeapAuthRealm() + # XXX should pass a provider mapping to realm too? + + leap_portal = portal.Portal(leap_realm) + + # XXX should we add an offline credentials checker, that's able + # to unlock local soledad sqlcipher backend? + # XXX should pass a provider mapping to credentials checker too? + srp_checker = SRPCredentialsChecker() + leap_portal.registerChecker(srp_checker) + + # XXX tie this to some sample server... + reactor.listenTCP(8000, EchoFactory(leap_portal)) + reactor.run() diff --git a/src/leap/bitmask/bonafide/provider.py b/src/leap/bitmask/bonafide/provider.py new file mode 100644 index 0000000..b44dae2 --- /dev/null +++ b/src/leap/bitmask/bonafide/provider.py @@ -0,0 +1,186 @@ +# -*- coding: utf-8 -*- +# provier.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +LEAP Provider API. +""" + +from copy import deepcopy +import re +from urlparse import urlparse + + +""" +Maximum API version number supported by bonafide +""" +MAX_API_VERSION = 1 + + +class _MetaActionDispatcher(type): + + """ + A metaclass that will create dispatcher methods dynamically for each + action made available by the LEAP provider API. + + The new methods will be created according to the values contained in an + `_actions` dictionary, with the following format:: + + {'action_name': (uri_template, method)} + + where `uri_template` is a string that will be formatted with an arbitrary + number of keyword arguments. + + Any class that uses this one as its metaclass needs to implement two + private methods:: + + _get_uri(self, action_name, **extra_params) + _get_method(self, action_name) + + Beware that currently they cannot be inherited from bases. + """ + + def __new__(meta, name, bases, dct): + + def _generate_action_funs(dct): + _get_uri = dct['_get_uri'] + _get_method = dct['_get_method'] + newdct = deepcopy(dct) + actions = dct['_actions'] + + def create_uri_fun(action_name): + return lambda self, **kw: _get_uri( + self, action_name=action_name, **kw) + + def create_met_fun(action_name): + return lambda self: _get_method( + self, action_name=action_name) + + for action in actions: + uri, method = actions[action] + _action_uri = 'get_%s_uri' % action + _action_met = 'get_%s_method' % action + newdct[_action_uri] = create_uri_fun(action) + newdct[_action_met] = create_met_fun(action) + return newdct + + newdct = _generate_action_funs(dct) + return super(_MetaActionDispatcher, meta).__new__( + meta, name, bases, newdct) + + +class BaseProvider(object): + + def __init__(self, netloc, version=1): + parsed = urlparse(netloc) + if parsed.scheme != 'https': + raise ValueError( + 'ProviderApi needs to be passed a url with https scheme') + self.netloc = parsed.netloc + + self.version = version + if version > MAX_API_VERSION: + self.version = MAX_API_VERSION + + def get_hostname(self): + return urlparse(self._get_base_url()).hostname + + def _get_base_url(self): + return "https://{0}/{1}".format(self.netloc, self.version) + + +class Api(BaseProvider): + """ + An object that has all the information that a client needs to communicate + with the remote methods exposed by the web API of a LEAP provider. + + The actions are described in https://leap.se/bonafide + + By using the _MetaActionDispatcher as a metaclass, the _actions dict will + be translated dynamically into a set of instance methods that will allow + getting the uri and method for each action. + + The keyword arguments specified in the format string will automatically + raise a KeyError if the needed keyword arguments are not passed to the + dynamically created methods. + """ + + # TODO when should the provider-api object be created? + # TODO pass a Provider object to constructor, with autoconf flag. + # TODO make the actions attribute depend on the api version + # TODO missing UPDATE USER RECORD + + __metaclass__ = _MetaActionDispatcher + _actions = { + 'signup': ('users', 'POST'), + 'update_user': ('users/{uid}', 'PUT'), + 'handshake': ('sessions', 'POST'), + 'authenticate': ('sessions/{login}', 'PUT'), + 'logout': ('logout', 'DELETE'), + 'vpn_cert': ('cert', 'POST'), + 'smtp_cert': ('smtp_cert', 'POST'), + } + + # Methods expected by the dispatcher metaclass + + def _get_uri(self, action_name, **extra_params): + resource, _ = self._actions.get(action_name) + uri = '{0}/{1}'.format( + bytes(self._get_base_url()), + bytes(resource)).format(**extra_params) + return uri + + def _get_method(self, action_name): + _, method = self._actions.get(action_name) + return method + + +class Discovery(BaseProvider): + """ + Discover basic information about a provider, including the provided + services. + """ + + __metaclass__ = _MetaActionDispatcher + _actions = { + 'provider_info': ('provider.json', 'GET'), + 'configs': ('1/configs.json', 'GET'), + } + + def _get_base_url(self): + return "https://{0}".format(self.netloc) + + def get_base_uri(self): + return self._get_base_url() + + # Methods expected by the dispatcher metaclass + + def _get_uri(self, action_name, **extra_params): + resource, _ = self._actions.get(action_name) + uri = '{0}/{1}'.format( + bytes(self._get_base_url()), + bytes(resource)).format(**extra_params) + return uri + + def _get_method(self, action_name): + _, method = self._actions.get(action_name) + return method + + +def validate_username(username): + accepted_characters = '^[a-z0-9\-\_\.]*$' + if not re.match(accepted_characters, username): + raise ValueError('Only lowercase letters, digits, . - and _ allowed.') diff --git a/src/leap/bitmask/bonafide/service.py b/src/leap/bitmask/bonafide/service.py new file mode 100644 index 0000000..14585ef --- /dev/null +++ b/src/leap/bitmask/bonafide/service.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# service.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Bonafide Service. +""" +import os +from collections import defaultdict + +from leap.common.config import get_path_prefix +from leap.common.service_hooks import HookableService +from leap.bonafide._protocol import BonafideProtocol + +from twisted.internet import defer +from twisted.python import log + + +_preffix = get_path_prefix() + + +class BonafideService(HookableService): + + def __init__(self, basedir=None): + if not basedir: + basedir = os.path.join(_preffix, 'leap') + self._basedir = os.path.expanduser(basedir) + self._bonafide = BonafideProtocol() + self.service_hooks = defaultdict(list) + + # XXX this is a quick hack to get a ref + # to the latest authenticated user. + self._active_user = None + + def startService(self): + log.msg('Starting Bonafide Service') + super(BonafideService, self).startService() + + # Commands + + def do_authenticate(self, username, password): + + def notify_passphrase_entry(username, password): + data = dict(username=username, password=password) + self.trigger_hook('on_passphrase_entry', **data) + + def notify_bonafide_auth(result): + if not result: + msg = "Authentication hook did not return anything" + log.msg(msg) + raise RuntimeError(msg) + + token, uuid = result + data = dict(username=username, token=token, uuid=uuid, + password=password) + self.trigger_hook('on_bonafide_auth', **data) + + self._active_user = username + return result + + # XXX I still have doubts from where it's best to trigger this. + # We probably should wait for BOTH deferreds and + # handle local and remote authentication success together + # (and fail if either one fails). Going with fire-and-forget for + # now, but needs needs improvement. + + notify_passphrase_entry(username, password) + + d = self._bonafide.do_authenticate(username, password) + d.addCallback(notify_bonafide_auth) + d.addCallback(lambda response: { + 'srp_token': response[0], 'uuid': response[1]}) + return d + + def do_signup(self, username, password): + d = self._bonafide.do_signup(username, password) + d.addCallback(lambda response: {'signup': 'ok', 'user': response}) + return d + + def do_logout(self, username): + if not username: + username = self._active_user + + def reset_active(passthrough): + self._active_user = None + return passthrough + + d = self._bonafide.do_logout(username) + d.addCallback(reset_active) + d.addCallback(lambda response: {'logout': 'ok'}) + return d + + def do_get_smtp_cert(self, username=None): + if not username: + username = self._active_user + if not username: + return defer.fail( + RuntimeError('No active user, cannot get SMTP cert.')) + + d = self._bonafide.do_get_smtp_cert(username) + d.addCallback(lambda response: (username, response)) + return d + + def do_get_active_user(self): + user = self._active_user or '<none>' + info = {'user': user} + return defer.succeed(info) diff --git a/src/leap/bitmask/bonafide/services/TODO b/src/leap/bitmask/bonafide/services/TODO new file mode 100644 index 0000000..97dc639 --- /dev/null +++ b/src/leap/bitmask/bonafide/services/TODO @@ -0,0 +1 @@ +# XXX this should be discoverable through the config. diff --git a/src/leap/bitmask/bonafide/services/__init__.py b/src/leap/bitmask/bonafide/services/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/bitmask/bonafide/services/__init__.py diff --git a/src/leap/bitmask/bonafide/services/eip.py b/src/leap/bitmask/bonafide/services/eip.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/bitmask/bonafide/services/eip.py diff --git a/src/leap/bitmask/bonafide/services/mail.py b/src/leap/bitmask/bonafide/services/mail.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/bitmask/bonafide/services/mail.py diff --git a/src/leap/bitmask/bonafide/services/soledad.py b/src/leap/bitmask/bonafide/services/soledad.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/bitmask/bonafide/services/soledad.py diff --git a/src/leap/bitmask/bonafide/session.py b/src/leap/bitmask/bonafide/session.py new file mode 100644 index 0000000..4180041 --- /dev/null +++ b/src/leap/bitmask/bonafide/session.py @@ -0,0 +1,219 @@ +# -*- coding: utf-8 -*- +# session.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +LEAP Session management. +""" +from twisted.internet import defer, reactor +from twisted.python import log + +from leap.bonafide import _srp +from leap.bonafide import provider +from leap.bonafide._http import httpRequest, cookieAgentFactory + +OK = 'ok' + + +def _auth_required(func): + """ + Decorate a method so that it will not be called if the instance + attribute `is_authenticated` does not evaluate to True. + """ + def decorated(*args, **kwargs): + instance = args[0] + allowed = getattr(instance, 'is_authenticated') + if not allowed: + raise RuntimeError('This method requires authentication') + return func(*args, **kwargs) + return decorated + + +class Session(object): + + def __init__(self, credentials, api, provider_cert): + # TODO check if an anonymous credentials is passed. + # TODO move provider_cert to api object. + # On creation, it should be able to retrieve all the info it needs + # (calling bootstrap). + # TODO could get a "provider" object instead. + # this provider can have an api attribute, + # and a "autoconfig" attribute passed on initialization. + # TODO get a file-descriptor for password if not in credentials + # TODO merge self._request with config.Provider._http_request ? + + self.username = credentials.username + self.password = credentials.password + self._provider_cert = provider_cert + self._api = api + self._initialize_session() + + def _initialize_session(self): + self._agent = cookieAgentFactory(self._provider_cert) + username = self.username or '' + password = self.password or '' + self._srp_auth = _srp.SRPAuthMechanism(username, password) + self._srp_signup = _srp.SRPSignupMechanism() + self._token = None + self._uuid = None + + # Session + + @property + def token(self): + return self._token + + @property + def uuid(self): + return self._uuid + + @property + def is_authenticated(self): + return self._srp_auth.srp_user.authenticated() + + @defer.inlineCallbacks + def authenticate(self): + uri = self._api.get_handshake_uri() + met = self._api.get_handshake_method() + log.msg("%s to %s" % (met, uri)) + params = self._srp_auth.get_handshake_params() + + handshake = yield self._request(self._agent, uri, values=params, + method=met) + + self._srp_auth.process_handshake(handshake) + uri = self._api.get_authenticate_uri(login=self.username) + met = self._api.get_authenticate_method() + + log.msg("%s to %s" % (met, uri)) + params = self._srp_auth.get_authentication_params() + + auth = yield self._request(self._agent, uri, values=params, + method=met) + + uuid, token = self._srp_auth.process_authentication(auth) + self._srp_auth.verify_authentication() + + self._uuid = uuid + self._token = token + defer.returnValue(OK) + + @_auth_required + @defer.inlineCallbacks + def logout(self): + uri = self._api.get_logout_uri() + met = self._api.get_logout_method() + auth = yield self._request(self._agent, uri, method=met) + print 'AUTH', auth + print 'resetting user/pass' + self.username = None + self.password = None + self._initialize_session() + defer.returnValue(OK) + + # User certificates + + def get_vpn_cert(self): + # TODO pass it to the provider object so that it can save it in the + # right path. + uri = self._api.get_vpn_cert_uri() + met = self._api.get_vpn_cert_method() + return self._request(self._agent, uri, method=met) + + @_auth_required + def get_smtp_cert(self): + # TODO pass it to the provider object so that it can save it in the + # right path. + uri = self._api.get_smtp_cert_uri() + met = self._api.get_smtp_cert_method() + print met, "to", uri + return self._request(self._agent, uri, method=met) + + def _request(self, *args, **kw): + kw['token'] = self._token + return httpRequest(*args, **kw) + + # User management + + @defer.inlineCallbacks + def signup(self, username, password): + # XXX should check that it_IS_NOT_authenticated + provider.validate_username(username) + uri = self._api.get_signup_uri() + met = self._api.get_signup_method() + params = self._srp_signup.get_signup_params( + username, password) + + signup = yield self._request(self._agent, uri, values=params, + method=met) + registered_user = self._srp_signup.process_signup(signup) + self.username = username + self.password = password + defer.returnValue((OK, registered_user)) + + @_auth_required + def update_user_record(self): + # FIXME to be implemented + pass + + # Authentication-protected configuration + + @defer.inlineCallbacks + def fetch_provider_configs(self, uri, path): + config = yield self._request(self._agent, uri) + with open(path, 'w') as cf: + cf.write(config) + defer.returnValue('ok') + + +if __name__ == "__main__": + import os + import sys + from twisted.cred.credentials import UsernamePassword + + if len(sys.argv) != 4: + print "Usage:", sys.argv[0], "provider", "username", "password" + sys.exit() + _provider, username, password = sys.argv[1], sys.argv[2], sys.argv[3] + api = provider.Api('https://api.%s:4430' % _provider) + credentials = UsernamePassword(username, password) + cdev_pem = os.path.expanduser( + '~/.config/leap/providers/%s/keys/ca/cacert.pem' % _provider) + session = Session(credentials, api, cdev_pem) + + def print_result(result): + print result + return result + + def cbShutDown(ignored): + reactor.stop() + + def auth_eb(failure): + print "[ERROR!]", failure.getErrorMessage() + log.err(failure) + + d = session.authenticate() + d.addCallback(print_result) + d.addErrback(auth_eb) + + d.addCallback(lambda _: session.get_smtp_cert()) + #d.addCallback(lambda _: session.get_vpn_cert()) + d.addCallback(print_result) + d.addErrback(auth_eb) + + d.addCallback(lambda _: session.logout()) + d.addErrback(auth_eb) + d.addBoth(cbShutDown) + reactor.run() diff --git a/src/leap/bitmask/bonafide/ssh_service.py b/src/leap/bitmask/bonafide/ssh_service.py new file mode 100644 index 0000000..3777b10 --- /dev/null +++ b/src/leap/bitmask/bonafide/ssh_service.py @@ -0,0 +1,2 @@ +# TODO expose bonafide protocol through ssh, accessible through a hidden +# service diff --git a/src/leap/bitmask/bonafide/tests/__init__.py b/src/leap/bitmask/bonafide/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/bitmask/bonafide/tests/__init__.py diff --git a/src/leap/bitmask/keymanager/__init__.py b/src/leap/bitmask/keymanager/__init__.py new file mode 100644 index 0000000..0b8a5b3 --- /dev/null +++ b/src/leap/bitmask/keymanager/__init__.py @@ -0,0 +1,842 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Key Manager is a Nicknym agent for LEAP client. +""" +# let's do a little sanity check to see if we're using the wrong gnupg +import fileinput +import os +import sys +import tempfile +import json +import urllib + +from leap.common import ca_bundle +from twisted.web import client +from twisted.web._responses import NOT_FOUND + +from ._version import get_versions + +try: + from gnupg.gnupg import GPGUtilities + assert(GPGUtilities) # pyflakes happy + from gnupg import __version__ as _gnupg_version + if '-' in _gnupg_version: + # avoid Parsing it as LegacyVersion, get just + # the release numbers: + _gnupg_version = _gnupg_version.split('-')[0] + from pkg_resources import parse_version + # We need to make sure that we're not colliding with the infamous + # python-gnupg + assert(parse_version(_gnupg_version) >= parse_version('1.4.0')) + +except (ImportError, AssertionError): + print "*******" + print "Ooops! It looks like there is a conflict in the installed version " + print "of gnupg." + print "GNUPG_VERSION:", _gnupg_version + print + print "Disclaimer: Ideally, we would need to work a patch and propose the " + print "merge to upstream. But until then do: " + print + print "% pip uninstall python-gnupg" + print "% pip install gnupg" + print "*******" + sys.exit(1) + +import logging + +from twisted.internet import defer +from urlparse import urlparse + +from leap.common.check import leap_assert +from leap.common.http import HTTPClient +from leap.common.events import emit_async, catalog +from leap.common.decorators import memoized_method + +from leap.keymanager.errors import ( + KeyNotFound, + KeyNotValidUpgrade, + InvalidSignature +) +from leap.keymanager.validation import ValidationLevels, can_upgrade +from leap.keymanager.openpgp import OpenPGPScheme + +__version__ = get_versions()['version'] +del get_versions + +logger = logging.getLogger(__name__) + + +# +# The Key Manager +# + +class KeyManager(object): + + # + # server's key storage constants + # + + OPENPGP_KEY = 'openpgp' + PUBKEY_KEY = "user[public_key]" + + def __init__(self, address, nickserver_uri, soledad, token=None, + ca_cert_path=None, api_uri=None, api_version=None, uid=None, + gpgbinary=None): + """ + Initialize a Key Manager for user's C{address} with provider's + nickserver reachable in C{nickserver_uri}. + + :param address: The email address of the user of this Key Manager. + :type address: str + :param nickserver_uri: The URI of the nickserver. + :type nickserver_uri: str + :param soledad: A Soledad instance for local storage of keys. + :type soledad: leap.soledad.Soledad + :param token: The token for interacting with the webapp API. + :type token: str + :param ca_cert_path: The path to the CA certificate. + :type ca_cert_path: str + :param api_uri: The URI of the webapp API. + :type api_uri: str + :param api_version: The version of the webapp API. + :type api_version: str + :param uid: The user's UID. + :type uid: str + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} + """ + self._address = address + self._nickserver_uri = nickserver_uri + self._soledad = soledad + self._token = token + self.ca_cert_path = ca_cert_path + self.api_uri = api_uri + self.api_version = api_version + self.uid = uid + self._openpgp = OpenPGPScheme(soledad, gpgbinary=gpgbinary) + self._combined_ca_bundle = self._create_combined_bundle_file() + self._async_client = HTTPClient(self._combined_ca_bundle) + self._async_client_pinned = HTTPClient(self._ca_cert_path) + + # + # destructor + # + + def __del__(self): + try: + created_tmp_combined_ca_bundle = self._combined_ca_bundle not in \ + [ca_bundle.where(), self._ca_cert_path] + if created_tmp_combined_ca_bundle: + os.remove(self._combined_ca_bundle) + except OSError: + pass + + # + # utilities + # + + def _create_combined_bundle_file(self): + leap_ca_bundle = ca_bundle.where() + + if self._ca_cert_path == leap_ca_bundle: + return self._ca_cert_path # don't merge file with itself + elif not self._ca_cert_path: + return leap_ca_bundle + + tmp_file = tempfile.NamedTemporaryFile(delete=False) + + with open(tmp_file.name, 'w') as fout: + fin = fileinput.input(files=(leap_ca_bundle, self._ca_cert_path)) + for line in fin: + fout.write(line) + fin.close() + + return tmp_file.name + + @defer.inlineCallbacks + def _get_key_from_nicknym(self, address): + """ + Send a GET request to C{uri} containing C{data}. + + :param address: The URI of the request. + :type address: str + + :return: A deferred that will be fired with GET content as json (dict) + :rtype: Deferred + """ + try: + uri = self._nickserver_uri + '?address=' + address + content = yield self._fetch_and_handle_404_from_nicknym(uri, address) + json_content = json.loads(content) + + except KeyNotFound: + raise + except IOError as e: + logger.warning("HTTP error retrieving key: %r" % (e,)) + logger.warning("%s" % (content,)) + raise KeyNotFound(e.message), None, sys.exc_info()[2] + except ValueError as v: + logger.warning("Invalid JSON data from key: %s" % (uri,)) + raise KeyNotFound(v.message + ' - ' + uri), None, sys.exc_info()[2] + + except Exception as e: + logger.warning("Error retrieving key: %r" % (e,)) + raise KeyNotFound(e.message), None, sys.exc_info()[2] + # Responses are now text/plain, although it's json anyway, but + # this will fail when it shouldn't + # leap_assert( + # res.headers['content-type'].startswith('application/json'), + # 'Content-type is not JSON.') + defer.returnValue(json_content) + + def _fetch_and_handle_404_from_nicknym(self, uri, address): + """ + Send a GET request to C{uri} containing C{data}. + + :param uri: The URI of the request. + :type uri: str + :param address: The email corresponding to the key. + :type address: str + + :return: A deferred that will be fired with GET content as json (dict) + :rtype: Deferred + """ + def check_404(response): + if response.code == NOT_FOUND: + message = '%s: %s key not found.' % (response.code, address) + logger.warning(message) + raise KeyNotFound(message), None, sys.exc_info()[2] + return response + + d = self._async_client_pinned.request(str(uri), 'GET', callback=check_404) + d.addCallback(client.readBody) + return d + + @defer.inlineCallbacks + def _get_with_combined_ca_bundle(self, uri, data=None): + """ + Send a GET request to C{uri} containing C{data}. + + Instead of using the ca_cert provided on construction time, this + version also uses the default certificates shipped with leap.common + + :param uri: The URI of the request. + :type uri: str + :param data: The body of the request. + :type data: dict, str or file + + :return: A deferred that will be fired with the GET response + :rtype: Deferred + """ + try: + content = yield self._async_client.request(str(uri), 'GET') + except Exception as e: + logger.warning("There was a problem fetching key: %s" % (e,)) + raise KeyNotFound(uri) + if not content: + raise KeyNotFound(uri) + defer.returnValue(content) + + @defer.inlineCallbacks + def _put(self, uri, data=None): + """ + Send a PUT request to C{uri} containing C{data}. + + The request will be sent using the configured CA certificate path to + verify the server certificate and the configured session id for + authentication. + + :param uri: The URI of the request. + :type uri: str + :param data: The body of the request. + :type data: dict, str or file + + :return: A deferred that will be fired when PUT request finishes + :rtype: Deferred + """ + leap_assert( + self._token is not None, + 'We need a token to interact with webapp!') + if type(data) == dict: + data = urllib.urlencode(data) + headers = {'Authorization': [str('Token token=%s' % self._token)]} + headers['Content-Type'] = ['application/x-www-form-urlencoded'] + try: + res = yield self._async_client_pinned.request(str(uri), 'PUT', + body=str(data), + headers=headers) + except Exception as e: + logger.warning("Error uploading key: %r" % (e,)) + raise e + if 'error' in res: + # FIXME: That's a workaround for 500, + # we need to implement a readBody to assert response code + logger.warning("Error uploading key: %r" % (res,)) + raise Exception(res) + + @memoized_method(invalidation=300) + @defer.inlineCallbacks + def _fetch_keys_from_server(self, address): + """ + Fetch keys bound to address from nickserver and insert them in + local database. + + :param address: The address bound to the keys. + :type address: str + + :return: A Deferred which fires when the key is in the storage, + or which fails with KeyNotFound if the key was not found on + nickserver. + :rtype: Deferred + + """ + # request keys from the nickserver + server_keys = yield self._get_key_from_nicknym(address) + + # insert keys in local database + if self.OPENPGP_KEY in server_keys: + # nicknym server is authoritative for its own domain, + # for other domains the key might come from key servers. + validation_level = ValidationLevels.Weak_Chain + _, domain = _split_email(address) + if (domain == _get_domain(self._nickserver_uri)): + validation_level = ValidationLevels.Provider_Trust + + yield self.put_raw_key( + server_keys['openpgp'], + address=address, + validation=validation_level) + + # + # key management + # + + def send_key(self): + """ + Send user's key to provider. + + Public key bound to user's is sent to provider, which will sign it and + replace any prior keys for the same address in its database. + + :return: A Deferred which fires when the key is sent, or which fails + with KeyNotFound if the key was not found in local database. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + def send(pubkey): + data = { + self.PUBKEY_KEY: pubkey.key_data + } + uri = "%s/%s/users/%s.json" % ( + self._api_uri, + self._api_version, + self._uid) + d = self._put(uri, data) + d.addCallback(lambda _: + emit_async(catalog.KEYMANAGER_DONE_UPLOADING_KEYS, + self._address)) + return d + + d = self.get_key( + self._address, private=False, fetch_remote=False) + d.addCallback(send) + return d + + def get_key(self, address, private=False, fetch_remote=True): + """ + Return a key bound to address. + + First, search for the key in local storage. If it is not available, + then try to fetch from nickserver. + + :param address: The address bound to the key. + :type address: str + :param private: Look for a private key instead of a public one? + :type private: bool + :param fetch_remote: If key not found in local storage try to fetch + from nickserver + :type fetch_remote: bool + + :return: A Deferred which fires with an EncryptionKey bound to address, + or which fails with KeyNotFound if no key was found neither + locally or in keyserver or fail with KeyVersionError if the + key has a format not supported by this version of KeyManager + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + logger.debug("getting key for %s" % (address,)) + + emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address) + + def key_found(key): + emit_async(catalog.KEYMANAGER_KEY_FOUND, address) + return key + + def key_not_found(failure): + if not failure.check(KeyNotFound): + return failure + + emit_async(catalog.KEYMANAGER_KEY_NOT_FOUND, address) + + # we will only try to fetch a key from nickserver if fetch_remote + # is True and the key is not private. + if fetch_remote is False or private is True: + return failure + + emit_async(catalog.KEYMANAGER_LOOKING_FOR_KEY, address) + d = self._fetch_keys_from_server(address) + d.addCallback( + lambda _: self._openpgp.get_key(address, private=False)) + d.addCallback(key_found) + return d + + # return key if it exists in local database + d = self._openpgp.get_key(address, private=private) + d.addCallbacks(key_found, key_not_found) + return d + + def get_all_keys(self, private=False): + """ + Return all keys stored in local database. + + :param private: Include private keys + :type private: bool + + :return: A Deferred which fires with a list of all keys in local db. + :rtype: Deferred + """ + return self._openpgp.get_all_keys(private) + + def gen_key(self): + """ + Generate a key bound to the user's address. + + :return: A Deferred which fires with the generated EncryptionKey. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + + def signal_finished(key): + emit_async( + catalog.KEYMANAGER_FINISHED_KEY_GENERATION, self._address) + return key + + emit_async(catalog.KEYMANAGER_STARTED_KEY_GENERATION, self._address) + + d = self._openpgp.gen_key(self._address) + d.addCallback(signal_finished) + return d + + # + # Setters/getters + # + + def _get_token(self): + return self._token + + def _set_token(self, token): + self._token = token + + token = property( + _get_token, _set_token, doc='The session token.') + + def _get_ca_cert_path(self): + return self._ca_cert_path + + def _set_ca_cert_path(self, ca_cert_path): + self._ca_cert_path = ca_cert_path + + ca_cert_path = property( + _get_ca_cert_path, _set_ca_cert_path, + doc='The path to the CA certificate.') + + def _get_api_uri(self): + return self._api_uri + + def _set_api_uri(self, api_uri): + self._api_uri = api_uri + + api_uri = property( + _get_api_uri, _set_api_uri, doc='The webapp API URI.') + + def _get_api_version(self): + return self._api_version + + def _set_api_version(self, api_version): + self._api_version = api_version + + api_version = property( + _get_api_version, _set_api_version, doc='The webapp API version.') + + def _get_uid(self): + return self._uid + + def _set_uid(self, uid): + self._uid = uid + + uid = property( + _get_uid, _set_uid, doc='The uid of the user.') + + # + # encrypt/decrypt and sign/verify API + # + + def encrypt(self, data, address, passphrase=None, sign=None, + cipher_algo='AES256', fetch_remote=True): + """ + Encrypt data with the public key bound to address and sign with with + the private key bound to sign address. + + :param data: The data to be encrypted. + :type data: str + :param address: The address to encrypt it for. + :type address: str + :param passphrase: The passphrase for the secret key used for the + signature. + :type passphrase: str + :param sign: The address to be used for signature. + :type sign: str + :param cipher_algo: The cipher algorithm to use. + :type cipher_algo: str + :param fetch_remote: If key is not found in local storage try to fetch + from nickserver + :type fetch_remote: bool + + :return: A Deferred which fires with the encrypted data as str, or + which fails with KeyNotFound if no keys were found neither + locally or in keyserver or fails with KeyVersionError if the + key format is not supported or fails with EncryptError if + failed encrypting for some reason. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + + @defer.inlineCallbacks + def encrypt(keys): + pubkey, signkey = keys + encrypted = yield self._openpgp.encrypt( + data, pubkey, passphrase, sign=signkey, + cipher_algo=cipher_algo) + if not pubkey.encr_used: + pubkey.encr_used = True + yield self._openpgp.put_key(pubkey) + defer.returnValue(encrypted) + + dpub = self.get_key(address, private=False, + fetch_remote=fetch_remote) + dpriv = defer.succeed(None) + if sign is not None: + dpriv = self.get_key(sign, private=True) + d = defer.gatherResults([dpub, dpriv], consumeErrors=True) + d.addCallbacks(encrypt, self._extract_first_error) + return d + + def decrypt(self, data, address, passphrase=None, verify=None, + fetch_remote=True): + """ + Decrypt data using private key from address and verify with public key + bound to verify address. + + :param data: The data to be decrypted. + :type data: str + :param address: The address to whom data was encrypted. + :type address: str + :param passphrase: The passphrase for the secret key used for + decryption. + :type passphrase: str + :param verify: The address to be used for signature. + :type verify: str + :param fetch_remote: If key for verify not found in local storage try + to fetch from nickserver + :type fetch_remote: bool + + :return: A Deferred which fires with: + * (decripted str, signing key) if validation works + * (decripted str, KeyNotFound) if signing key not found + * (decripted str, InvalidSignature) if signature is invalid + * KeyNotFound failure if private key not found + * DecryptError failure if decription failed + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + + @defer.inlineCallbacks + def decrypt(keys): + pubkey, privkey = keys + decrypted, signed = yield self._openpgp.decrypt( + data, privkey, passphrase=passphrase, verify=pubkey) + if pubkey is None: + signature = KeyNotFound(verify) + elif signed: + signature = pubkey + if not pubkey.sign_used: + pubkey.sign_used = True + yield self._openpgp.put_key(pubkey) + defer.returnValue((decrypted, signature)) + else: + signature = InvalidSignature( + 'Failed to verify signature with key %s' % + (pubkey.fingerprint,)) + defer.returnValue((decrypted, signature)) + + dpriv = self.get_key(address, private=True) + dpub = defer.succeed(None) + if verify is not None: + dpub = self.get_key(verify, private=False, + fetch_remote=fetch_remote) + dpub.addErrback(lambda f: None if f.check(KeyNotFound) else f) + d = defer.gatherResults([dpub, dpriv], consumeErrors=True) + d.addCallbacks(decrypt, self._extract_first_error) + return d + + def _extract_first_error(self, failure): + return failure.value.subFailure + + def sign(self, data, address, digest_algo='SHA512', clearsign=False, + detach=True, binary=False): + """ + Sign data with private key bound to address. + + :param data: The data to be signed. + :type data: str + :param address: The address to be used to sign. + :type address: EncryptionKey + :param digest_algo: The hash digest to use. + :type digest_algo: str + :param clearsign: If True, create a cleartext signature. + :type clearsign: bool + :param detach: If True, create a detached signature. + :type detach: bool + :param binary: If True, do not ascii armour the output. + :type binary: bool + + :return: A Deferred which fires with the signed data as str or fails + with KeyNotFound if no key was found neither locally or in + keyserver or fails with SignFailed if there was any error + signing. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + + def sign(privkey): + return self._openpgp.sign( + data, privkey, digest_algo=digest_algo, clearsign=clearsign, + detach=detach, binary=binary) + + d = self.get_key(address, private=True) + d.addCallback(sign) + return d + + def verify(self, data, address, detached_sig=None, + fetch_remote=True): + """ + Verify signed data with private key bound to address, eventually using + detached_sig. + + :param data: The data to be verified. + :type data: str + :param address: The address to be used to verify. + :type address: EncryptionKey + :param detached_sig: A detached signature. If given, C{data} is + verified using this detached signature. + :type detached_sig: str + :param fetch_remote: If key for verify not found in local storage try + to fetch from nickserver + :type fetch_remote: bool + + :return: A Deferred which fires with the signing EncryptionKey if + signature verifies, or which fails with InvalidSignature if + signature don't verifies or fails with KeyNotFound if no key + was found neither locally or in keyserver. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + + def verify(pubkey): + signed = self._openpgp.verify( + data, pubkey, detached_sig=detached_sig) + if signed: + if not pubkey.sign_used: + pubkey.sign_used = True + d = self._openpgp.put_key(pubkey) + d.addCallback(lambda _: pubkey) + return d + return pubkey + else: + raise InvalidSignature( + 'Failed to verify signature with key %s' % + (pubkey.fingerprint,)) + + d = self.get_key(address, private=False, + fetch_remote=fetch_remote) + d.addCallback(verify) + return d + + def delete_key(self, key): + """ + Remove key from storage. + + :param key: The key to be removed. + :type key: EncryptionKey + + :return: A Deferred which fires when the key is deleted, or which fails + KeyNotFound if the key was not found on local storage. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + return self._openpgp.delete_key(key) + + def put_key(self, key): + """ + Put key bound to address in local storage. + + :param key: The key to be stored + :type key: EncryptionKey + + :return: A Deferred which fires when the key is in the storage, or + which fails with KeyNotValidUpdate if a key with the same + uid exists and the new one is not a valid update for it. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + + def old_key_not_found(failure): + if failure.check(KeyNotFound): + return None + else: + return failure + + def check_upgrade(old_key): + if key.private or can_upgrade(key, old_key): + return self._openpgp.put_key(key) + else: + raise KeyNotValidUpgrade( + "Key %s can not be upgraded by new key %s" + % (old_key.fingerprint, key.fingerprint)) + + d = self._openpgp.get_key(key.address, private=key.private) + d.addErrback(old_key_not_found) + d.addCallback(check_upgrade) + return d + + def put_raw_key(self, key, address, + validation=ValidationLevels.Weak_Chain): + """ + Put raw key bound to address in local storage. + + :param key: The ascii key to be stored + :type key: str + :param address: address for which this key will be active + :type address: str + :param validation: validation level for this key + (default: 'Weak_Chain') + :type validation: ValidationLevels + + :return: A Deferred which fires when the key is in the storage, or + which fails with KeyAddressMismatch if address doesn't match + any uid on the key or fails with KeyNotFound if no OpenPGP + material was found in key or fails with KeyNotValidUpdate if a + key with the same uid exists and the new one is not a valid + update for it. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + pubkey, privkey = self._openpgp.parse_key(key, address) + + if pubkey is None: + return defer.fail(KeyNotFound(key)) + + pubkey.validation = validation + d = self.put_key(pubkey) + if privkey is not None: + d.addCallback(lambda _: self.put_key(privkey)) + return d + + @defer.inlineCallbacks + def fetch_key(self, address, uri, validation=ValidationLevels.Weak_Chain): + """ + Fetch a public key bound to address from the network and put it in + local storage. + + :param address: The email address of the key. + :type address: str + :param uri: The URI of the key. + :type uri: str + :param validation: validation level for this key + (default: 'Weak_Chain') + :type validation: ValidationLevels + + :return: A Deferred which fires when the key is in the storage, or + which fails with KeyNotFound: if not valid key on uri or fails + with KeyAddressMismatch if address doesn't match any uid on + the key or fails with KeyNotValidUpdate if a key with the same + uid exists and the new one is not a valid update for it. + :rtype: Deferred + + :raise UnsupportedKeyTypeError: if invalid key type + """ + + logger.info("Fetch key for %s from %s" % (address, uri)) + ascii_content = yield self._get_with_combined_ca_bundle(uri) + + # XXX parse binary keys + pubkey, _ = self._openpgp.parse_key(ascii_content, address) + if pubkey is None: + raise KeyNotFound(uri) + + pubkey.validation = validation + yield self.put_key(pubkey) + + +def _split_email(address): + """ + Split username and domain from an email address + + :param address: an email address + :type address: str + + :return: username and domain from the email address + :rtype: (str, str) + """ + if address.count("@") != 1: + return None + return address.split("@") + + +def _get_domain(url): + """ + Get the domain from an url + + :param url: an url + :type url: str + + :return: the domain part of the url + :rtype: str + """ + return urlparse(url).hostname diff --git a/src/leap/bitmask/keymanager/_version.py b/src/leap/bitmask/keymanager/_version.py new file mode 100644 index 0000000..b28c697 --- /dev/null +++ b/src/leap/bitmask/keymanager/_version.py @@ -0,0 +1,484 @@ + +# This file helps to compute a version number in source trees obtained from +# git-archive tarball (such as those provided by githubs download-from-tag +# feature). Distribution tarballs (built by setup.py sdist) and build +# directories (produced by setup.py build) will contain a much shorter file +# that just contains the computed version number. + +# This file is released into the public domain. Generated by +# versioneer-0.16 (https://github.com/warner/python-versioneer) + +"""Git implementation of _version.py.""" + +import errno +import os +import re +import subprocess +import sys + + +def get_keywords(): + """Get the keywords needed to look up the version information.""" + # these strings will be replaced by git during git-archive. + # setup.py/versioneer.py will grep for the variable names, so they must + # each be defined on a line of their own. _version.py will just call + # get_keywords(). + git_refnames = "$Format:%d$" + git_full = "$Format:%H$" + keywords = {"refnames": git_refnames, "full": git_full} + return keywords + + +class VersioneerConfig: + """Container for Versioneer configuration parameters.""" + + +def get_config(): + """Create, populate and return the VersioneerConfig() object.""" + # these strings are filled in when 'setup.py versioneer' creates + # _version.py + cfg = VersioneerConfig() + cfg.VCS = "git" + cfg.style = "pep440" + cfg.tag_prefix = "" + cfg.parentdir_prefix = "None" + cfg.versionfile_source = "src/leap/keymanager/_version.py" + cfg.verbose = False + return cfg + + +class NotThisMethod(Exception): + """Exception raised if a method is not valid for the current scenario.""" + + +LONG_VERSION_PY = {} +HANDLERS = {} + + +def register_vcs_handler(vcs, method): # decorator + """Decorator to mark a method as the handler for a particular VCS.""" + def decorate(f): + """Store f in HANDLERS[vcs][method].""" + if vcs not in HANDLERS: + HANDLERS[vcs] = {} + HANDLERS[vcs][method] = f + return f + return decorate + + +def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False): + """Call the given command(s).""" + assert isinstance(commands, list) + p = None + for c in commands: + try: + dispcmd = str([c] + args) + # remember shell=False, so use git.cmd on windows, not just git + p = subprocess.Popen([c] + args, cwd=cwd, stdout=subprocess.PIPE, + stderr=(subprocess.PIPE if hide_stderr + else None)) + break + except EnvironmentError: + e = sys.exc_info()[1] + if e.errno == errno.ENOENT: + continue + if verbose: + print("unable to run %s" % dispcmd) + print(e) + return None + else: + if verbose: + print("unable to find command, tried %s" % (commands,)) + return None + stdout = p.communicate()[0].strip() + if sys.version_info[0] >= 3: + stdout = stdout.decode() + if p.returncode != 0: + if verbose: + print("unable to run %s (error)" % dispcmd) + return None + return stdout + + +def versions_from_parentdir(parentdir_prefix, root, verbose): + """Try to determine the version from the parent directory name. + + Source tarballs conventionally unpack into a directory that includes + both the project name and a version string. + """ + dirname = os.path.basename(root) + if not dirname.startswith(parentdir_prefix): + if verbose: + print("guessing rootdir is '%s', but '%s' doesn't start with " + "prefix '%s'" % (root, dirname, parentdir_prefix)) + raise NotThisMethod("rootdir doesn't start with parentdir_prefix") + return {"version": dirname[len(parentdir_prefix):], + "full-revisionid": None, + "dirty": False, "error": None} + + +@register_vcs_handler("git", "get_keywords") +def git_get_keywords(versionfile_abs): + """Extract version information from the given file.""" + # the code embedded in _version.py can just fetch the value of these + # keywords. When used from setup.py, we don't want to import _version.py, + # so we do it with a regexp instead. This function is not used from + # _version.py. + keywords = {} + try: + f = open(versionfile_abs, "r") + for line in f.readlines(): + if line.strip().startswith("git_refnames ="): + mo = re.search(r'=\s*"(.*)"', line) + if mo: + keywords["refnames"] = mo.group(1) + if line.strip().startswith("git_full ="): + mo = re.search(r'=\s*"(.*)"', line) + if mo: + keywords["full"] = mo.group(1) + f.close() + except EnvironmentError: + pass + return keywords + + +@register_vcs_handler("git", "keywords") +def git_versions_from_keywords(keywords, tag_prefix, verbose): + """Get version information from git keywords.""" + if not keywords: + raise NotThisMethod("no keywords at all, weird") + refnames = keywords["refnames"].strip() + if refnames.startswith("$Format"): + if verbose: + print("keywords are unexpanded, not using") + raise NotThisMethod("unexpanded keywords, not a git-archive tarball") + refs = set([r.strip() for r in refnames.strip("()").split(",")]) + # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of + # just "foo-1.0". If we see a "tag: " prefix, prefer those. + TAG = "tag: " + tags = set([r[len(TAG):] for r in refs if r.startswith(TAG)]) + if not tags: + # Either we're using git < 1.8.3, or there really are no tags. We use + # a heuristic: assume all version tags have a digit. The old git %d + # expansion behaves like git log --decorate=short and strips out the + # refs/heads/ and refs/tags/ prefixes that would let us distinguish + # between branches and tags. By ignoring refnames without digits, we + # filter out many common branch names like "release" and + # "stabilization", as well as "HEAD" and "master". + tags = set([r for r in refs if re.search(r'\d', r)]) + if verbose: + print("discarding '%s', no digits" % ",".join(refs-tags)) + if verbose: + print("likely tags: %s" % ",".join(sorted(tags))) + for ref in sorted(tags): + # sorting will prefer e.g. "2.0" over "2.0rc1" + if ref.startswith(tag_prefix): + r = ref[len(tag_prefix):] + if verbose: + print("picking %s" % r) + return {"version": r, + "full-revisionid": keywords["full"].strip(), + "dirty": False, "error": None + } + # no suitable tags, so version is "0+unknown", but full hex is still there + if verbose: + print("no suitable tags, using unknown + full revision id") + return {"version": "0+unknown", + "full-revisionid": keywords["full"].strip(), + "dirty": False, "error": "no suitable tags"} + + +@register_vcs_handler("git", "pieces_from_vcs") +def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): + """Get version from 'git describe' in the root of the source tree. + + This only gets called if the git-archive 'subst' keywords were *not* + expanded, and _version.py hasn't already been rewritten with a short + version string, meaning we're inside a checked out source tree. + """ + if not os.path.exists(os.path.join(root, ".git")): + if verbose: + print("no .git in %s" % root) + raise NotThisMethod("no .git directory") + + GITS = ["git"] + if sys.platform == "win32": + GITS = ["git.cmd", "git.exe"] + # if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty] + # if there isn't one, this yields HEX[-dirty] (no NUM) + describe_out = run_command(GITS, ["describe", "--tags", "--dirty", + "--always", "--long", + "--match", "%s*" % tag_prefix], + cwd=root) + # --long was added in git-1.5.5 + if describe_out is None: + raise NotThisMethod("'git describe' failed") + describe_out = describe_out.strip() + full_out = run_command(GITS, ["rev-parse", "HEAD"], cwd=root) + if full_out is None: + raise NotThisMethod("'git rev-parse' failed") + full_out = full_out.strip() + + pieces = {} + pieces["long"] = full_out + pieces["short"] = full_out[:7] # maybe improved later + pieces["error"] = None + + # parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty] + # TAG might have hyphens. + git_describe = describe_out + + # look for -dirty suffix + dirty = git_describe.endswith("-dirty") + pieces["dirty"] = dirty + if dirty: + git_describe = git_describe[:git_describe.rindex("-dirty")] + + # now we have TAG-NUM-gHEX or HEX + + if "-" in git_describe: + # TAG-NUM-gHEX + mo = re.search(r'^(.+)-(\d+)-g([0-9a-f]+)$', git_describe) + if not mo: + # unparseable. Maybe git-describe is misbehaving? + pieces["error"] = ("unable to parse git-describe output: '%s'" + % describe_out) + return pieces + + # tag + full_tag = mo.group(1) + if not full_tag.startswith(tag_prefix): + if verbose: + fmt = "tag '%s' doesn't start with prefix '%s'" + print(fmt % (full_tag, tag_prefix)) + pieces["error"] = ("tag '%s' doesn't start with prefix '%s'" + % (full_tag, tag_prefix)) + return pieces + pieces["closest-tag"] = full_tag[len(tag_prefix):] + + # distance: number of commits since tag + pieces["distance"] = int(mo.group(2)) + + # commit: short hex revision ID + pieces["short"] = mo.group(3) + + else: + # HEX: no tags + pieces["closest-tag"] = None + count_out = run_command(GITS, ["rev-list", "HEAD", "--count"], + cwd=root) + pieces["distance"] = int(count_out) # total number of commits + + return pieces + + +def plus_or_dot(pieces): + """Return a + if we don't already have one, else return a .""" + if "+" in pieces.get("closest-tag", ""): + return "." + return "+" + + +def render_pep440(pieces): + """Build up version string, with post-release "local version identifier". + + Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you + get a tagged build and then dirty it, you'll get TAG+0.gHEX.dirty + + Exceptions: + 1: no tags. git_describe was just HEX. 0+untagged.DISTANCE.gHEX[.dirty] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += plus_or_dot(pieces) + rendered += "%d.g%s" % (pieces["distance"], pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + else: + # exception #1 + rendered = "0+untagged.%d.g%s" % (pieces["distance"], + pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + return rendered + + +def render_pep440_pre(pieces): + """TAG[.post.devDISTANCE] -- No -dirty. + + Exceptions: + 1: no tags. 0.post.devDISTANCE + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"]: + rendered += ".post.dev%d" % pieces["distance"] + else: + # exception #1 + rendered = "0.post.dev%d" % pieces["distance"] + return rendered + + +def render_pep440_post(pieces): + """TAG[.postDISTANCE[.dev0]+gHEX] . + + The ".dev0" means dirty. Note that .dev0 sorts backwards + (a dirty tree will appear "older" than the corresponding clean one), + but you shouldn't be releasing software with -dirty anyways. + + Exceptions: + 1: no tags. 0.postDISTANCE[.dev0] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += ".post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + rendered += plus_or_dot(pieces) + rendered += "g%s" % pieces["short"] + else: + # exception #1 + rendered = "0.post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + rendered += "+g%s" % pieces["short"] + return rendered + + +def render_pep440_old(pieces): + """TAG[.postDISTANCE[.dev0]] . + + The ".dev0" means dirty. + + Eexceptions: + 1: no tags. 0.postDISTANCE[.dev0] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += ".post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + else: + # exception #1 + rendered = "0.post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + return rendered + + +def render_git_describe(pieces): + """TAG[-DISTANCE-gHEX][-dirty]. + + Like 'git describe --tags --dirty --always'. + + Exceptions: + 1: no tags. HEX[-dirty] (note: no 'g' prefix) + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"]: + rendered += "-%d-g%s" % (pieces["distance"], pieces["short"]) + else: + # exception #1 + rendered = pieces["short"] + if pieces["dirty"]: + rendered += "-dirty" + return rendered + + +def render_git_describe_long(pieces): + """TAG-DISTANCE-gHEX[-dirty]. + + Like 'git describe --tags --dirty --always -long'. + The distance/hash is unconditional. + + Exceptions: + 1: no tags. HEX[-dirty] (note: no 'g' prefix) + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + rendered += "-%d-g%s" % (pieces["distance"], pieces["short"]) + else: + # exception #1 + rendered = pieces["short"] + if pieces["dirty"]: + rendered += "-dirty" + return rendered + + +def render(pieces, style): + """Render the given version pieces into the requested style.""" + if pieces["error"]: + return {"version": "unknown", + "full-revisionid": pieces.get("long"), + "dirty": None, + "error": pieces["error"]} + + if not style or style == "default": + style = "pep440" # the default + + if style == "pep440": + rendered = render_pep440(pieces) + elif style == "pep440-pre": + rendered = render_pep440_pre(pieces) + elif style == "pep440-post": + rendered = render_pep440_post(pieces) + elif style == "pep440-old": + rendered = render_pep440_old(pieces) + elif style == "git-describe": + rendered = render_git_describe(pieces) + elif style == "git-describe-long": + rendered = render_git_describe_long(pieces) + else: + raise ValueError("unknown style '%s'" % style) + + return {"version": rendered, "full-revisionid": pieces["long"], + "dirty": pieces["dirty"], "error": None} + + +def get_versions(): + """Get version information or return default if unable to do so.""" + # I am in _version.py, which lives at ROOT/VERSIONFILE_SOURCE. If we have + # __file__, we can work backwards from there to the root. Some + # py2exe/bbfreeze/non-CPython implementations don't do __file__, in which + # case we can only use expanded keywords. + + cfg = get_config() + verbose = cfg.verbose + + try: + return git_versions_from_keywords(get_keywords(), cfg.tag_prefix, + verbose) + except NotThisMethod: + pass + + try: + root = os.path.realpath(__file__) + # versionfile_source is the relative path from the top of the source + # tree (where the .git directory might live) to this file. Invert + # this to find the root from __file__. + for i in cfg.versionfile_source.split('/'): + root = os.path.dirname(root) + except NameError: + return {"version": "0+unknown", "full-revisionid": None, + "dirty": None, + "error": "unable to find root of source tree"} + + try: + pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose) + return render(pieces, cfg.style) + except NotThisMethod: + pass + + try: + if cfg.parentdir_prefix: + return versions_from_parentdir(cfg.parentdir_prefix, root, verbose) + except NotThisMethod: + pass + + return {"version": "0+unknown", "full-revisionid": None, + "dirty": None, + "error": "unable to compute version"} diff --git a/src/leap/bitmask/keymanager/documents.py b/src/leap/bitmask/keymanager/documents.py new file mode 100644 index 0000000..2ed5376 --- /dev/null +++ b/src/leap/bitmask/keymanager/documents.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# documents.py +# Copyright (C) 2013-2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Soledad documents +""" +from twisted.internet import defer +from leap.common.check import leap_assert + +# +# Dictionary keys used for storing cryptographic keys. +# + +KEY_VERSION_KEY = 'version' +KEY_UIDS_KEY = 'uids' +KEY_ADDRESS_KEY = 'address' +KEY_TYPE_KEY = 'type' +KEY_FINGERPRINT_KEY = 'fingerprint' +KEY_DATA_KEY = 'key_data' +KEY_PRIVATE_KEY = 'private' +KEY_LENGTH_KEY = 'length' +KEY_EXPIRY_DATE_KEY = 'expiry_date' +KEY_LAST_AUDITED_AT_KEY = 'last_audited_at' +KEY_REFRESHED_AT_KEY = 'refreshed_at' +KEY_VALIDATION_KEY = 'validation' +KEY_ENCR_USED_KEY = 'encr_used' +KEY_SIGN_USED_KEY = 'sign_used' +KEY_TAGS_KEY = 'tags' + + +# +# Key storage constants +# + +KEYMANAGER_KEY_TAG = 'keymanager-key' +KEYMANAGER_ACTIVE_TAG = 'keymanager-active' +KEYMANAGER_ACTIVE_TYPE = '-active' + +# Version of the Soledad Document schema, +# it should be bumped each time the document format changes +KEYMANAGER_DOC_VERSION = 1 + + +# +# key indexing constants. +# + +TAGS_PRIVATE_INDEX = 'by-tags-private' +TYPE_FINGERPRINT_PRIVATE_INDEX = 'by-type-fingerprint-private' +TYPE_ADDRESS_PRIVATE_INDEX = 'by-type-address-private' +INDEXES = { + TAGS_PRIVATE_INDEX: [ + KEY_TAGS_KEY, + 'bool(%s)' % KEY_PRIVATE_KEY, + ], + TYPE_FINGERPRINT_PRIVATE_INDEX: [ + KEY_TYPE_KEY, + KEY_FINGERPRINT_KEY, + 'bool(%s)' % KEY_PRIVATE_KEY, + ], + TYPE_ADDRESS_PRIVATE_INDEX: [ + KEY_TYPE_KEY, + KEY_ADDRESS_KEY, + 'bool(%s)' % KEY_PRIVATE_KEY, + ] +} + + +@defer.inlineCallbacks +def init_indexes(soledad): + """ + Initialize the database indexes. + """ + leap_assert(soledad is not None, + "Cannot init indexes with null soledad") + + indexes = yield soledad.list_indexes() + db_indexes = dict(indexes) + # Loop through the indexes we expect to find. + for name, expression in INDEXES.items(): + if name not in db_indexes: + # The index does not yet exist. + yield soledad.create_index(name, *expression) + elif expression != db_indexes[name]: + # The index exists but the definition is not what expected, + # so we delete it and add the proper index expression. + yield soledad.delete_index(name) + yield soledad.create_index(name, *expression) diff --git a/src/leap/bitmask/keymanager/errors.py b/src/leap/bitmask/keymanager/errors.py new file mode 100644 index 0000000..dfff393 --- /dev/null +++ b/src/leap/bitmask/keymanager/errors.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# errors.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Errors and exceptions used by the Key Manager. +""" + + +class KeyNotFound(Exception): + """ + Raised when key was no found on keyserver. + """ + pass + + +class KeyVersionError(KeyNotFound): + """ + Raised when key was found in the keyring but the version is not supported. + + It will usually mean that it was created by a newer version of KeyManager. + """ + pass + + +class KeyAlreadyExists(Exception): + """ + Raised when attempted to create a key that already exists. + """ + pass + + +class KeyAttributesDiffer(Exception): + """ + Raised when trying to delete a key but the stored key differs from the key + passed to the delete_key() method. + """ + pass + + +class NoPasswordGiven(Exception): + """ + Raised when trying to perform some action that needs a password without + providing one. + """ + pass + + +class InvalidSignature(Exception): + """ + Raised when signature could not be verified. + """ + pass + + +class EncryptError(Exception): + """ + Raised upon failures of encryption. + """ + pass + + +class DecryptError(Exception): + """ + Raised upon failures of decryption. + """ + pass + + +class GPGError(Exception): + """ + Raised upon failures of encryption/decryption. + """ + pass + + +class SignFailed(Exception): + """ + Raised when failed to sign. + """ + pass + + +class KeyAddressMismatch(Exception): + """ + A mismatch between addresses. + """ + + +class KeyFingerprintMismatch(Exception): + """ + A mismatch between fingerprints. + """ + + +class KeyNotValidUpgrade(Exception): + """ + Already existing key can not be upgraded with the new key + """ + + +class UnsupportedKeyTypeError(Exception): + """ + Invalid key type + """ diff --git a/src/leap/bitmask/keymanager/keys.py b/src/leap/bitmask/keymanager/keys.py new file mode 100644 index 0000000..91ecf3a --- /dev/null +++ b/src/leap/bitmask/keymanager/keys.py @@ -0,0 +1,290 @@ +# -*- coding: utf-8 -*- +# keys.py +# Copyright (C) 2013-2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Abstact key type and encryption scheme representations. +""" + + +import json +import logging +import re +import time + +from datetime import datetime + +from leap.keymanager import errors +from leap.keymanager.wrapper import TempGPGWrapper +from leap.keymanager.validation import ValidationLevels +from leap.keymanager import documents as doc + +logger = logging.getLogger(__name__) + + +# +# Key handling utilities +# + +def is_address(address): + """ + Return whether the given C{address} is in the form user@provider. + + :param address: The address to be tested. + :type address: str + :return: Whether C{address} is in the form user@provider. + :rtype: bool + """ + return bool(re.match('[\w.-]+@[\w.-]+', address)) + + +def build_key_from_dict(key, active=None): + """ + Build an OpenPGPKey key based on info in C{kdict}. + + :param key: Dictionary with key data. + :type key: dict + :param active: Dictionary with active data. + :type active: dict + :return: An instance of the key. + :rtype: C{kClass} + """ + address = None + validation = ValidationLevels.Weak_Chain + last_audited_at = None + encr_used = False + sign_used = False + + if active: + address = active[doc.KEY_ADDRESS_KEY] + try: + validation = ValidationLevels.get(active[doc.KEY_VALIDATION_KEY]) + except ValueError: + logger.error("Not valid validation level (%s) for key %s", + (active[doc.KEY_VALIDATION_KEY], + active[doc.KEY_FINGERPRINT_KEY])) + last_audited_at = _to_datetime(active[doc.KEY_LAST_AUDITED_AT_KEY]) + encr_used = active[doc.KEY_ENCR_USED_KEY] + sign_used = active[doc.KEY_SIGN_USED_KEY] + + expiry_date = _to_datetime(key[doc.KEY_EXPIRY_DATE_KEY]) + refreshed_at = _to_datetime(key[doc.KEY_REFRESHED_AT_KEY]) + + return OpenPGPKey( + address=address, + uids=key[doc.KEY_UIDS_KEY], + fingerprint=key[doc.KEY_FINGERPRINT_KEY], + key_data=key[doc.KEY_DATA_KEY], + private=key[doc.KEY_PRIVATE_KEY], + length=key[doc.KEY_LENGTH_KEY], + expiry_date=expiry_date, + last_audited_at=last_audited_at, + refreshed_at=refreshed_at, + validation=validation, + encr_used=encr_used, + sign_used=sign_used, + ) + + +def _to_datetime(unix_time): + if unix_time != 0: + return datetime.fromtimestamp(unix_time) + else: + return None + + +def _to_unix_time(date): + if date is not None: + return int(time.mktime(date.timetuple())) + else: + return 0 + + +class OpenPGPKey(object): + """ + Base class for OpenPGP keys. + """ + + __slots__ = ('address', 'uids', 'fingerprint', 'key_data', + 'private', 'length', 'expiry_date', 'validation', + 'last_audited_at', 'refreshed_at', + 'encr_used', 'sign_used', '_index', '_gpgbinary') + + def __init__(self, address=None, gpgbinary=None, uids=[], fingerprint="", + key_data="", private=False, length=0, expiry_date=None, + validation=ValidationLevels.Weak_Chain, last_audited_at=None, + refreshed_at=None, encr_used=False, sign_used=False): + self._gpgbinary = gpgbinary + self.address = address + if not uids and address: + self.uids = [address] + else: + self.uids = uids + self.fingerprint = fingerprint + self.key_data = key_data + self.private = private + self.length = length + self.expiry_date = expiry_date + + self.validation = validation + self.last_audited_at = last_audited_at + self.refreshed_at = refreshed_at + self.encr_used = encr_used + self.sign_used = sign_used + self._index = len(self.__slots__) + + @property + def signatures(self): + """ + Get the key signatures + + :return: the key IDs that have signed the key + :rtype: list(str) + """ + with TempGPGWrapper(keys=[self], gpgbinary=self._gpgbinary) as gpg: + res = gpg.list_sigs(self.fingerprint) + for uid, sigs in res.sigs.iteritems(): + if parse_address(uid) in self.uids: + return sigs + + return [] + + def merge(self, newkey): + if newkey.fingerprint != self.fingerprint: + logger.critical( + "Can't put a key whith the same key_id and different " + "fingerprint: %s, %s" + % (newkey.fingerprint, self.fingerprint)) + raise errors.KeyFingerprintMismatch(newkey.fingerprint) + + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: + gpg.import_keys(self.key_data) + gpg.import_keys(newkey.key_data) + gpgkey = gpg.list_keys(secret=newkey.private).pop() + + if gpgkey['expires']: + self.expiry_date = datetime.fromtimestamp( + int(gpgkey['expires'])) + else: + self.expiry_date = None + + self.uids = [] + for uid in gpgkey['uids']: + self.uids.append(parse_address(uid)) + + self.length = int(gpgkey['length']) + self.key_data = gpg.export_keys(gpgkey['fingerprint'], + secret=self.private) + + if newkey.validation > self.validation: + self.validation = newkey.validation + if newkey.last_audited_at > self.last_audited_at: + self.validation = newkey.last_audited_at + self.encr_used = newkey.encr_used or self.encr_used + self.sign_used = newkey.sign_used or self.sign_used + self.refreshed_at = datetime.now() + + def get_json(self): + """ + Return a JSON string describing this key. + + :return: The JSON string describing this key. + :rtype: str + """ + expiry_date = _to_unix_time(self.expiry_date) + refreshed_at = _to_unix_time(self.refreshed_at) + + return json.dumps({ + doc.KEY_UIDS_KEY: self.uids, + doc.KEY_TYPE_KEY: self.__class__.__name__, + doc.KEY_FINGERPRINT_KEY: self.fingerprint, + doc.KEY_DATA_KEY: self.key_data, + doc.KEY_PRIVATE_KEY: self.private, + doc.KEY_LENGTH_KEY: self.length, + doc.KEY_EXPIRY_DATE_KEY: expiry_date, + doc.KEY_REFRESHED_AT_KEY: refreshed_at, + doc.KEY_VERSION_KEY: doc.KEYMANAGER_DOC_VERSION, + doc.KEY_TAGS_KEY: [doc.KEYMANAGER_KEY_TAG], + }) + + def get_active_json(self): + """ + Return a JSON string describing this key. + + :return: The JSON string describing this key. + :rtype: str + """ + last_audited_at = _to_unix_time(self.last_audited_at) + + return json.dumps({ + doc.KEY_ADDRESS_KEY: self.address, + doc.KEY_TYPE_KEY: (self.__class__.__name__ + + doc.KEYMANAGER_ACTIVE_TYPE), + doc.KEY_FINGERPRINT_KEY: self.fingerprint, + doc.KEY_PRIVATE_KEY: self.private, + doc.KEY_VALIDATION_KEY: str(self.validation), + doc.KEY_LAST_AUDITED_AT_KEY: last_audited_at, + doc.KEY_ENCR_USED_KEY: self.encr_used, + doc.KEY_SIGN_USED_KEY: self.sign_used, + doc.KEY_VERSION_KEY: doc.KEYMANAGER_DOC_VERSION, + doc.KEY_TAGS_KEY: [doc.KEYMANAGER_ACTIVE_TAG], + }) + + def next(self): + if self._index == 0: + self._index = len(self.__slots__) + raise StopIteration + + self._index -= 1 + key = self.__slots__[self._index] + + if key.startswith('_'): + return self.next() + + value = getattr(self, key) + if key == "validation": + value = str(value) + elif key in ["expiry_date", "last_audited_at", "refreshed_at"]: + value = str(value) + return key, value + + def __iter__(self): + return self + + def __repr__(self): + """ + Representation of this class + """ + return u"<%s 0x%s (%s - %s)>" % ( + self.__class__.__name__, + self.fingerprint, + self.address, + "priv" if self.private else "publ") + + +def parse_address(address): + """ + Remove name, '<', '>' and the identity suffix after the '+' until the '@' + e.g.: test_user+something@provider.com becomes test_user@provider.com + since the key belongs to the identity without the '+' suffix. + + :type address: str + :rtype: str + """ + mail_regex = '(.*<)?([\w.-]+)(\+.*)?(@[\w.-]+)(>.*)?' + match = re.match(mail_regex, address) + if match is None: + return None + return ''.join(match.group(2, 4)) diff --git a/src/leap/bitmask/keymanager/migrator.py b/src/leap/bitmask/keymanager/migrator.py new file mode 100644 index 0000000..c73da2e --- /dev/null +++ b/src/leap/bitmask/keymanager/migrator.py @@ -0,0 +1,167 @@ +# -*- coding: utf-8 -*- +# migrator.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Document migrator +""" +# XXX: versioning has being added 12/2015 when keymanager was not +# much in use in the wild. We can probably drop support for +# keys without version at some point. + + +from collections import namedtuple +from twisted.internet.defer import gatherResults, succeed + +from leap.keymanager import documents as doc +from leap.keymanager.validation import ValidationLevels + + +KEY_ID_KEY = 'key_id' + +KeyDocs = namedtuple("KeyDocs", ['key', 'active']) + + +class KeyDocumentsMigrator(object): + """ + Migrate old KeyManager Soledad Documents to the newest schema + """ + + def __init__(self, soledad): + self._soledad = soledad + + def migrate(self): + deferred_public = self._get_docs(private=False) + deferred_public.addCallback(self._migrate_docs) + + deferred_private = self._get_docs(private=True) + deferred_private.addCallback(self._migrate_docs) + + return gatherResults([deferred_public, deferred_private]) + + def _get_docs(self, private=False): + private_value = '1' if private else '0' + + deferred_keys = self._soledad.get_from_index( + doc.TAGS_PRIVATE_INDEX, + doc.KEYMANAGER_KEY_TAG, + private_value) + deferred_active = self._soledad.get_from_index( + doc.TAGS_PRIVATE_INDEX, + doc.KEYMANAGER_ACTIVE_TAG, + private_value) + return gatherResults([deferred_keys, deferred_active]) + + def _migrate_docs(self, (key_docs, active_docs)): + def update_keys(keys): + deferreds = [] + for key_id in keys: + key = keys[key_id].key + actives = keys[key_id].active + + d = self._migrate_actives(key, actives) + deferreds.append(d) + + d = self._migrate_key(key) + deferreds.append(d) + return gatherResults(deferreds) + + d = self._buildKeyDict(key_docs, active_docs) + d.addCallback(lambda keydict: self._filter_outdated(keydict)) + d.addCallback(update_keys) + + def _buildKeyDict(self, keys, actives): + keydict = { + fp2id(key.content[doc.KEY_FINGERPRINT_KEY]): KeyDocs(key, []) + for key in keys} + + deferreds = [] + for active in actives: + if KEY_ID_KEY in active.content: + key_id = active.content[KEY_ID_KEY] + if key_id not in keydict: + d = self._soledad.delete_doc(active) + deferreds.append(d) + continue + keydict[key_id].active.append(active) + + d = gatherResults(deferreds) + d.addCallback(lambda _: keydict) + return d + + def _filter_outdated(self, keydict): + outdated = {} + for key_id, docs in keydict.items(): + if ((docs.key and doc.KEY_VERSION_KEY not in docs.key.content) or + docs.active): + outdated[key_id] = docs + return outdated + + def _migrate_actives(self, key, actives): + if not key: + deferreds = [] + for active in actives: + d = self._soledad.delete_doc(active) + deferreds.append(d) + return gatherResults(deferreds) + + validation = str(ValidationLevels.Weak_Chain) + last_audited = 0 + encr_used = False + sign_used = False + fingerprint = key.content[doc.KEY_FINGERPRINT_KEY] + if len(actives) == 1 and doc.KEY_VERSION_KEY not in key.content: + # we can preserve the validation of the key if there is only one + # active address for the key + validation = key.content[doc.KEY_VALIDATION_KEY] + last_audited = key.content[doc.KEY_LAST_AUDITED_AT_KEY] + encr_used = key.content[doc.KEY_ENCR_USED_KEY] + sign_used = key.content[doc.KEY_SIGN_USED_KEY] + + deferreds = [] + for active in actives: + if doc.KEY_VERSION_KEY in active.content: + continue + + active.content[doc.KEY_VERSION_KEY] = doc.KEYMANAGER_DOC_VERSION + active.content[doc.KEY_FINGERPRINT_KEY] = fingerprint + active.content[doc.KEY_VALIDATION_KEY] = validation + active.content[doc.KEY_LAST_AUDITED_AT_KEY] = last_audited + active.content[doc.KEY_ENCR_USED_KEY] = encr_used + active.content[doc.KEY_SIGN_USED_KEY] = sign_used + del active.content[KEY_ID_KEY] + d = self._soledad.put_doc(active) + deferreds.append(d) + return gatherResults(deferreds) + + def _migrate_key(self, key): + if not key or doc.KEY_VERSION_KEY in key.content: + return succeed(None) + + key.content[doc.KEY_VERSION_KEY] = doc.KEYMANAGER_DOC_VERSION + key.content[doc.KEY_UIDS_KEY] = key.content[doc.KEY_ADDRESS_KEY] + del key.content[doc.KEY_ADDRESS_KEY] + del key.content[KEY_ID_KEY] + del key.content[doc.KEY_VALIDATION_KEY] + del key.content[doc.KEY_LAST_AUDITED_AT_KEY] + del key.content[doc.KEY_ENCR_USED_KEY] + del key.content[doc.KEY_SIGN_USED_KEY] + return self._soledad.put_doc(key) + + +def fp2id(fingerprint): + KEY_ID_LENGTH = 16 + return fingerprint[-KEY_ID_LENGTH:] diff --git a/src/leap/bitmask/keymanager/openpgp.py b/src/leap/bitmask/keymanager/openpgp.py new file mode 100644 index 0000000..31c13df --- /dev/null +++ b/src/leap/bitmask/keymanager/openpgp.py @@ -0,0 +1,881 @@ +# -*- coding: utf-8 -*- +# openpgp.py +# Copyright (C) 2013-2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Infrastructure for using OpenPGP keys in Key Manager. +""" +import logging +import os +import re +import tempfile +import traceback +import io + + +from datetime import datetime +from multiprocessing import cpu_count +from gnupg.gnupg import GPGUtilities +from twisted.internet import defer +from twisted.internet.threads import deferToThread + +from leap.common.check import leap_assert, leap_assert_type, leap_check +from leap.keymanager import errors +from leap.keymanager.wrapper import TempGPGWrapper +from leap.keymanager.keys import ( + OpenPGPKey, + is_address, + parse_address, + build_key_from_dict, +) +from leap.keymanager.documents import ( + init_indexes, + TAGS_PRIVATE_INDEX, + TYPE_FINGERPRINT_PRIVATE_INDEX, + TYPE_ADDRESS_PRIVATE_INDEX, + KEY_UIDS_KEY, + KEY_FINGERPRINT_KEY, + KEY_PRIVATE_KEY, + KEY_REFRESHED_AT_KEY, + KEY_SIGN_USED_KEY, + KEY_ENCR_USED_KEY, + KEY_ADDRESS_KEY, + KEY_TYPE_KEY, + KEY_VERSION_KEY, + KEYMANAGER_DOC_VERSION, + KEYMANAGER_ACTIVE_TYPE, + KEYMANAGER_KEY_TAG, + KEYMANAGER_ACTIVE_TAG, +) + + +logger = logging.getLogger(__name__) + + +# +# A temporary GPG keyring wrapped to provide OpenPGP functionality. +# + +# This function will be used to call blocking GPG functions outside +# of Twisted reactor and match the concurrent calls to the amount of CPU cores +cpu_core_semaphore = defer.DeferredSemaphore(cpu_count()) + + +def from_thread(func, *args, **kwargs): + call = lambda: deferToThread(func, *args, **kwargs) + return cpu_core_semaphore.run(call) + + +# +# The OpenPGP wrapper +# + +class OpenPGPScheme(object): + """ + A wrapper for OpenPGP keys management and use (encryption, decyption, + signing and verification). + """ + + # type used on the soledad documents + KEY_TYPE = OpenPGPKey.__name__ + ACTIVE_TYPE = KEY_TYPE + KEYMANAGER_ACTIVE_TYPE + + def __init__(self, soledad, gpgbinary=None): + """ + Initialize the OpenPGP wrapper. + + :param soledad: A Soledad instance for key storage. + :type soledad: leap.soledad.Soledad + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} + """ + self._soledad = soledad + self._gpgbinary = gpgbinary + self.deferred_init = init_indexes(soledad) + self.deferred_init.addCallback(self._migrate_documents_schema) + self._wait_indexes("get_key", "put_key", "get_all_keys") + + def _migrate_documents_schema(self, _): + from leap.keymanager.migrator import KeyDocumentsMigrator + migrator = KeyDocumentsMigrator(self._soledad) + return migrator.migrate() + + def _wait_indexes(self, *methods): + """ + Methods that need to wait for the indexes to be ready. + + Heavily based on + http://blogs.fluidinfo.com/terry/2009/05/11/a-mixin-class-allowing-python-__init__-methods-to-work-with-twisted-deferreds/ + + :param methods: methods that need to wait for the indexes to be ready + :type methods: tuple(str) + """ + self.waiting = [] + self.stored = {} + + def restore(_): + for method in self.stored: + setattr(self, method, self.stored[method]) + for d in self.waiting: + d.callback(None) + + def makeWrapper(method): + def wrapper(*args, **kw): + d = defer.Deferred() + d.addCallback(lambda _: self.stored[method](*args, **kw)) + self.waiting.append(d) + return d + return wrapper + + for method in methods: + self.stored[method] = getattr(self, method) + setattr(self, method, makeWrapper(method)) + + self.deferred_init.addCallback(restore) + + # + # Keys management + # + + def gen_key(self, address): + """ + Generate an OpenPGP keypair bound to C{address}. + + :param address: The address bound to the key. + :type address: str + + :return: A Deferred which fires with the key bound to address, or fails + with KeyAlreadyExists if key already exists in local database. + :rtype: Deferred + """ + # make sure the key does not already exist + leap_assert(is_address(address), 'Not an user address: %s' % address) + + @defer.inlineCallbacks + def _gen_key(_): + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: + # TODO: inspect result, or use decorator + params = gpg.gen_key_input( + key_type='RSA', + key_length=4096, + name_real=address, + name_email=address, + name_comment='') + logger.info("About to generate keys... " + "This might take SOME time.") + yield from_thread(gpg.gen_key, params) + logger.info("Keys for %s have been successfully " + "generated." % (address,)) + pubkeys = gpg.list_keys() + + # assert for new key characteristics + leap_assert( + len(pubkeys) is 1, # a unitary keyring! + 'Keyring has wrong number of keys: %d.' % len(pubkeys)) + key = gpg.list_keys(secret=True).pop() + leap_assert( + len(key['uids']) is 1, # with just one uid! + 'Wrong number of uids for key: %d.' % len(key['uids'])) + uid_match = False + for uid in key['uids']: + if re.match('.*<%s>$' % address, uid) is not None: + uid_match = True + break + leap_assert(uid_match, 'Key not correctly bound to address.') + + # insert both public and private keys in storage + deferreds = [] + for secret in [True, False]: + key = gpg.list_keys(secret=secret).pop() + openpgp_key = self._build_key_from_gpg( + key, + gpg.export_keys(key['fingerprint'], secret=secret), + address) + d = self.put_key(openpgp_key) + deferreds.append(d) + yield defer.gatherResults(deferreds) + + def key_already_exists(_): + raise errors.KeyAlreadyExists(address) + + d = self.get_key(address) + d.addCallbacks(key_already_exists, _gen_key) + d.addCallback(lambda _: self.get_key(address, private=True)) + return d + + def get_key(self, address, private=False): + """ + Get key bound to C{address} from local storage. + + :param address: The address bound to the key. + :type address: str + :param private: Look for a private key instead of a public one? + :type private: bool + + :return: A Deferred which fires with the OpenPGPKey bound to address, + or which fails with KeyNotFound if the key was not found on + local storage. + :rtype: Deferred + """ + address = parse_address(address) + + def build_key((keydoc, activedoc)): + if keydoc is None: + raise errors.KeyNotFound(address) + leap_assert( + address in keydoc.content[KEY_UIDS_KEY], + 'Wrong address in key %s. Expected %s, found %s.' + % (keydoc.content[KEY_FINGERPRINT_KEY], address, + keydoc.content[KEY_UIDS_KEY])) + key = build_key_from_dict(keydoc.content, activedoc.content) + key._gpgbinary = self._gpgbinary + return key + + d = self._get_key_doc(address, private) + d.addCallback(build_key) + return d + + @defer.inlineCallbacks + def get_all_keys(self, private=False): + """ + Return all keys stored in local database. + + :param private: Include private keys + :type private: bool + + :return: A Deferred which fires with a list of all keys in local db. + :rtype: Deferred + """ + HAS_ACTIVE = "has_active" + + active_docs = yield self._soledad.get_from_index( + TAGS_PRIVATE_INDEX, + KEYMANAGER_ACTIVE_TAG, + '1' if private else '0') + key_docs = yield self._soledad.get_from_index( + TAGS_PRIVATE_INDEX, + KEYMANAGER_KEY_TAG, + '1' if private else '0') + + keys = [] + fp = lambda doc: doc.content[KEY_FINGERPRINT_KEY] + for active in active_docs: + fp_keys = filter(lambda k: fp(k) == fp(active), key_docs) + + if len(fp_keys) == 0: + yield self._soledad.delete_doc(active) + continue + elif len(fp_keys) == 1: + key = fp_keys[0] + else: + key = yield self._repair_key_docs(fp_keys) + key.content[HAS_ACTIVE] = True + keys.append(build_key_from_dict(key.content, active.content)) + + unactive_keys = filter(lambda k: HAS_ACTIVE not in k.content, key_docs) + keys += map(lambda k: build_key_from_dict(k.content), unactive_keys) + defer.returnValue(keys) + + def parse_key(self, key_data, address=None): + """ + Parses a key (or key pair) data and returns + the OpenPGPKey keys. + + :param key_data: the key data to be parsed. + :type key_data: str or unicode + :param address: Active address for the key. + :type address: str + + :returns: the public key and private key (if applies) for that data. + :rtype: (public, private) -> tuple(OpenPGPKey, OpenPGPKey) + the tuple may have one or both components None + """ + leap_assert_type(key_data, (str, unicode)) + # TODO: add more checks for correct key data. + leap_assert(key_data is not None, 'Data does not represent a key.') + + priv_info, privkey = process_key( + key_data, self._gpgbinary, secret=True) + pub_info, pubkey = process_key( + key_data, self._gpgbinary, secret=False) + + if not pubkey: + return (None, None) + + openpgp_privkey = None + if privkey: + # build private key + openpgp_privkey = self._build_key_from_gpg(priv_info, privkey, + address) + leap_check(pub_info['fingerprint'] == priv_info['fingerprint'], + 'Fingerprints for public and private key differ.', + errors.KeyFingerprintMismatch) + # build public key + openpgp_pubkey = self._build_key_from_gpg(pub_info, pubkey, address) + + return (openpgp_pubkey, openpgp_privkey) + + def put_raw_key(self, key_data, address): + """ + Put key contained in C{key_data} in local storage. + + :param key_data: The key data to be stored. + :type key_data: str or unicode + :param address: address for which this key will be active + :type address: str + + :return: A Deferred which fires when the OpenPGPKey is in the storage. + :rtype: Deferred + """ + leap_assert_type(key_data, (str, unicode)) + + openpgp_privkey = None + try: + openpgp_pubkey, openpgp_privkey = self.parse_key( + key_data, address) + except (errors.KeyAddressMismatch, errors.KeyFingerprintMismatch) as e: + return defer.fail(e) + + def put_key(_, key): + return self.put_key(key) + + d = defer.succeed(None) + if openpgp_pubkey is not None: + d.addCallback(put_key, openpgp_pubkey) + if openpgp_privkey is not None: + d.addCallback(put_key, openpgp_privkey) + return d + + def put_key(self, key): + """ + Put C{key} in local storage. + + :param key: The key to be stored. + :type key: OpenPGPKey + + :return: A Deferred which fires when the key is in the storage. + :rtype: Deferred + """ + def merge_and_put((keydoc, activedoc)): + if not keydoc: + return put_new_key(activedoc) + + active_content = None + if activedoc: + active_content = activedoc.content + oldkey = build_key_from_dict(keydoc.content, active_content) + + key.merge(oldkey) + keydoc.set_json(key.get_json()) + d = self._soledad.put_doc(keydoc) + d.addCallback(put_active, activedoc) + return d + + def put_new_key(activedoc): + deferreds = [] + if activedoc: + d = self._soledad.delete_doc(activedoc) + deferreds.append(d) + for json in [key.get_json(), key.get_active_json()]: + d = self._soledad.create_doc_from_json(json) + deferreds.append(d) + return defer.gatherResults(deferreds) + + def put_active(_, activedoc): + active_json = key.get_active_json() + if activedoc: + activedoc.set_json(active_json) + d = self._soledad.put_doc(activedoc) + else: + d = self._soledad.create_doc_from_json(active_json) + return d + + def get_active_doc(keydoc): + d = self._get_active_doc_from_address(key.address, key.private) + d.addCallback(lambda activedoc: (keydoc, activedoc)) + return d + + d = self._get_key_doc_from_fingerprint(key.fingerprint, key.private) + d.addCallback(get_active_doc) + d.addCallback(merge_and_put) + return d + + def _get_key_doc(self, address, private=False): + """ + Get the document with a key (public, by default) bound to C{address}. + + If C{private} is True, looks for a private key instead of a public. + + :param address: The address bound to the key. + :type address: str + :param private: Whether to look for a private key. + :type private: bool + + :return: A Deferred which fires with a touple of two SoledadDocument + (keydoc, activedoc) or None if it does not exist. + :rtype: Deferred + """ + def get_key_from_active_doc(activedoc): + if not activedoc: + return (None, None) + fingerprint = activedoc.content[KEY_FINGERPRINT_KEY] + d = self._get_key_doc_from_fingerprint(fingerprint, private) + d.addCallback(delete_active_if_no_key, activedoc) + return d + + def delete_active_if_no_key(keydoc, activedoc): + if not keydoc: + d = self._soledad.delete_doc(activedoc) + d.addCallback(lambda _: (None, None)) + return d + return (keydoc, activedoc) + + d = self._get_active_doc_from_address(address, private) + d.addCallback(get_key_from_active_doc) + return d + + def _build_key_from_gpg(self, key, key_data, address=None): + """ + Build an OpenPGPKey for C{address} based on C{key} from + local gpg storage. + + GPG key data has to be queried independently in this + wrapper, so we receive it in C{key_data}. + + :param address: Active address for the key. + :type address: str + :param key: Key obtained from GPG storage. + :type key: dict + :param key_data: Key data obtained from GPG storage. + :type key_data: str + :return: An instance of the key. + :rtype: OpenPGPKey + """ + return build_gpg_key(key, key_data, address, self._gpgbinary) + + def delete_key(self, key): + """ + Remove C{key} from storage. + + :param key: The key to be removed. + :type key: EncryptionKey + + :return: A Deferred which fires when the key is deleted, or which + fails with KeyNotFound if the key was not found on local + storage. + :rtype: Deferred + """ + leap_assert_type(key, OpenPGPKey) + + def delete_docs(activedocs): + deferreds = [] + for doc in activedocs: + d = self._soledad.delete_doc(doc) + deferreds.append(d) + return defer.gatherResults(deferreds) + + def get_key_docs(_): + return self._soledad.get_from_index( + TYPE_FINGERPRINT_PRIVATE_INDEX, + self.KEY_TYPE, + key.fingerprint, + '1' if key.private else '0') + + def delete_key(docs): + if len(docs) == 0: + raise errors.KeyNotFound(key) + elif len(docs) > 1: + logger.warning("There is more than one key for fingerprint %s" + % key.fingerprint) + + has_deleted = False + deferreds = [] + for doc in docs: + if doc.content['fingerprint'] == key.fingerprint: + d = self._soledad.delete_doc(doc) + deferreds.append(d) + has_deleted = True + if not has_deleted: + raise errors.KeyNotFound(key) + return defer.gatherResults(deferreds) + + d = self._soledad.get_from_index( + TYPE_FINGERPRINT_PRIVATE_INDEX, + self.ACTIVE_TYPE, + key.fingerprint, + '1' if key.private else '0') + d.addCallback(delete_docs) + d.addCallback(get_key_docs) + d.addCallback(delete_key) + return d + + # + # Data encryption, decryption, signing and verifying + # + + @staticmethod + def _assert_gpg_result_ok(result): + """ + Check if GPG result is 'ok' and log stderr outputs. + + :param result: GPG results, which have a field calld 'ok' that states + whether the gpg operation was successful or not. + :type result: object + + :raise GPGError: Raised when the gpg operation was not successful. + """ + stderr = getattr(result, 'stderr', None) + if stderr: + logger.debug("%s" % (stderr,)) + if getattr(result, 'ok', None) is not True: + raise errors.GPGError( + 'Failed to encrypt/decrypt: %s' % stderr) + + @defer.inlineCallbacks + def encrypt(self, data, pubkey, passphrase=None, sign=None, + cipher_algo='AES256'): + """ + Encrypt C{data} using public @{pubkey} and sign with C{sign} key. + + :param data: The data to be encrypted. + :type data: str + :param pubkey: The key used to encrypt. + :type pubkey: OpenPGPKey + :param sign: The key used for signing. + :type sign: OpenPGPKey + :param cipher_algo: The cipher algorithm to use. + :type cipher_algo: str + + :return: A Deferred that will be fired with the encrypted data. + :rtype: defer.Deferred + + :raise EncryptError: Raised if failed encrypting for some reason. + """ + leap_assert_type(pubkey, OpenPGPKey) + leap_assert(pubkey.private is False, 'Key is not public.') + keys = [pubkey] + if sign is not None: + leap_assert_type(sign, OpenPGPKey) + leap_assert(sign.private is True) + keys.append(sign) + with TempGPGWrapper(keys, self._gpgbinary) as gpg: + result = yield from_thread( + gpg.encrypt, + data, pubkey.fingerprint, + default_key=sign.fingerprint if sign else None, + passphrase=passphrase, symmetric=False, + cipher_algo=cipher_algo) + # Here we cannot assert for correctness of sig because the sig is + # in the ciphertext. + # result.ok - (bool) indicates if the operation succeeded + # result.data - (bool) contains the result of the operation + try: + self._assert_gpg_result_ok(result) + defer.returnValue(result.data) + except errors.GPGError as e: + logger.warning('Failed to encrypt: %s.' % str(e)) + raise errors.EncryptError() + + @defer.inlineCallbacks + def decrypt(self, data, privkey, passphrase=None, verify=None): + """ + Decrypt C{data} using private @{privkey} and verify with C{verify} key. + + :param data: The data to be decrypted. + :type data: str + :param privkey: The key used to decrypt. + :type privkey: OpenPGPKey + :param passphrase: The passphrase for the secret key used for + decryption. + :type passphrase: str + :param verify: The key used to verify a signature. + :type verify: OpenPGPKey + + :return: Deferred that will fire with the decrypted data and + if signature verifies (unicode, bool) + :rtype: Deferred + + :raise DecryptError: Raised if failed decrypting for some reason. + """ + leap_assert(privkey.private is True, 'Key is not private.') + keys = [privkey] + if verify is not None: + leap_assert_type(verify, OpenPGPKey) + leap_assert(verify.private is False) + keys.append(verify) + with TempGPGWrapper(keys, self._gpgbinary) as gpg: + try: + result = yield from_thread(gpg.decrypt, + data, passphrase=passphrase, + always_trust=True) + self._assert_gpg_result_ok(result) + + # verify signature + sign_valid = False + if (verify is not None and + result.valid is True and + verify.fingerprint == result.pubkey_fingerprint): + sign_valid = True + + defer.returnValue((result.data, sign_valid)) + except errors.GPGError as e: + logger.warning('Failed to decrypt: %s.' % str(e)) + raise errors.DecryptError(str(e)) + + def is_encrypted(self, data): + """ + Return whether C{data} was asymmetrically encrypted using OpenPGP. + + :param data: The data we want to know about. + :type data: str + + :return: Whether C{data} was encrypted using this wrapper. + :rtype: bool + """ + with TempGPGWrapper(gpgbinary=self._gpgbinary) as gpg: + gpgutil = GPGUtilities(gpg) + return gpgutil.is_encrypted_asym(data) + + def sign(self, data, privkey, digest_algo='SHA512', clearsign=False, + detach=True, binary=False): + """ + Sign C{data} with C{privkey}. + + :param data: The data to be signed. + :type data: str + + :param privkey: The private key to be used to sign. + :type privkey: OpenPGPKey + :param digest_algo: The hash digest to use. + :type digest_algo: str + :param clearsign: If True, create a cleartext signature. + :type clearsign: bool + :param detach: If True, create a detached signature. + :type detach: bool + :param binary: If True, do not ascii armour the output. + :type binary: bool + + :return: The ascii-armored signed data. + :rtype: str + """ + leap_assert_type(privkey, OpenPGPKey) + leap_assert(privkey.private is True) + + # result.fingerprint - contains the fingerprint of the key used to + # sign. + with TempGPGWrapper(privkey, self._gpgbinary) as gpg: + result = gpg.sign(data, default_key=privkey.fingerprint, + digest_algo=digest_algo, clearsign=clearsign, + detach=detach, binary=binary) + rfprint = privkey.fingerprint + privkey = gpg.list_keys(secret=True).pop() + kfprint = privkey['fingerprint'] + if result.fingerprint is None: + raise errors.SignFailed( + 'Failed to sign with key %s: %s' % + (privkey['fingerprint'], result.stderr)) + leap_assert( + result.fingerprint == kfprint, + 'Signature and private key fingerprints mismatch: ' + '%s != %s' % (rfprint, kfprint)) + return result.data + + def verify(self, data, pubkey, detached_sig=None): + """ + Verify signed C{data} with C{pubkey}, eventually using + C{detached_sig}. + + :param data: The data to be verified. + :type data: str + :param pubkey: The public key to be used on verification. + :type pubkey: OpenPGPKey + :param detached_sig: A detached signature. If given, C{data} is + verified against this detached signature. + :type detached_sig: str + + :return: signature matches + :rtype: bool + """ + leap_assert_type(pubkey, OpenPGPKey) + leap_assert(pubkey.private is False) + with TempGPGWrapper(pubkey, self._gpgbinary) as gpg: + result = None + if detached_sig is None: + result = gpg.verify(data) + else: + # to verify using a detached sig we have to use + # gpg.verify_file(), which receives the data as a binary + # stream and the name of a file containing the signature. + sf, sfname = tempfile.mkstemp() + with os.fdopen(sf, 'w') as sfd: + sfd.write(detached_sig) + result = gpg.verify_file(io.BytesIO(data), sig_file=sfname) + os.unlink(sfname) + gpgpubkey = gpg.list_keys().pop() + valid = result.valid + rfprint = result.fingerprint + kfprint = gpgpubkey['fingerprint'] + return valid and rfprint == kfprint + + def _get_active_doc_from_address(self, address, private): + d = self._soledad.get_from_index( + TYPE_ADDRESS_PRIVATE_INDEX, + self.ACTIVE_TYPE, + address, + '1' if private else '0') + d.addCallback(self._repair_and_get_doc, self._repair_active_docs) + d.addCallback(self._check_version) + return d + + def _get_key_doc_from_fingerprint(self, fingerprint, private): + d = self._soledad.get_from_index( + TYPE_FINGERPRINT_PRIVATE_INDEX, + self.KEY_TYPE, + fingerprint, + '1' if private else '0') + d.addCallback(self._repair_and_get_doc, self._repair_key_docs) + d.addCallback(self._check_version) + return d + + def _repair_and_get_doc(self, doclist, repair_func): + if len(doclist) is 0: + return None + elif len(doclist) > 1: + return repair_func(doclist) + return doclist[0] + + def _check_version(self, doc): + if doc is not None: + version = doc.content[KEY_VERSION_KEY] + if version > KEYMANAGER_DOC_VERSION: + raise errors.KeyVersionError(str(version)) + return doc + + def _repair_key_docs(self, doclist): + """ + If there is more than one key for a key id try to self-repair it + + :return: a Deferred that will be fired with the valid key doc once all + the deletions are completed + :rtype: Deferred + """ + def log_key_doc(doc): + logger.error("\t%s: %s" % (doc.content[KEY_UIDS_KEY], + doc.content[KEY_FINGERPRINT_KEY])) + + def cmp_key(d1, d2): + return cmp(d1.content[KEY_REFRESHED_AT_KEY], + d2.content[KEY_REFRESHED_AT_KEY]) + + return self._repair_docs(doclist, cmp_key, log_key_doc) + + @defer.inlineCallbacks + def _repair_active_docs(self, doclist): + """ + If there is more than one active doc for an address try to self-repair + it + + :return: a Deferred that will be fired with the valid active doc once + all the deletions are completed + :rtype: Deferred + """ + keys = {} + for doc in doclist: + fp = doc.content[KEY_FINGERPRINT_KEY] + private = doc.content[KEY_PRIVATE_KEY] + try: + key = yield self._get_key_doc_from_fingerprint(fp, private) + keys[fp] = key + except Exception: + pass + + def log_active_doc(doc): + logger.error("\t%s: %s" % (doc.content[KEY_ADDRESS_KEY], + doc.content[KEY_FINGERPRINT_KEY])) + + def cmp_active(d1, d2): + # XXX: for private keys it will be nice to check which key is known + # by the nicknym server and keep this one. But this needs a + # refactor that might not be worth it. + used1 = (d1.content[KEY_SIGN_USED_KEY] + + d1.content[KEY_ENCR_USED_KEY]) + used2 = (d2.content[KEY_SIGN_USED_KEY] + + d2.content[KEY_ENCR_USED_KEY]) + res = cmp(used1, used2) + if res != 0: + return res + + key1 = keys[d1.content[KEY_FINGERPRINT_KEY]] + key2 = keys[d2.content[KEY_FINGERPRINT_KEY]] + return cmp(key1.content[KEY_REFRESHED_AT_KEY], + key2.content[KEY_REFRESHED_AT_KEY]) + + doc = yield self._repair_docs(doclist, cmp_active, log_active_doc) + defer.returnValue(doc) + + def _repair_docs(self, doclist, cmp_func, log_func): + logger.error("BUG ---------------------------------------------------") + logger.error("There is more than one doc of type %s:" + % (doclist[0].content[KEY_TYPE_KEY],)) + + doclist.sort(cmp=cmp_func, reverse=True) + log_func(doclist[0]) + deferreds = [] + for doc in doclist[1:]: + log_func(doc) + d = self._soledad.delete_doc(doc) + deferreds.append(d) + + logger.error("") + logger.error(traceback.extract_stack()) + logger.error("BUG (please report above info) ------------------------") + d = defer.gatherResults(deferreds, consumeErrors=True) + d.addCallback(lambda _: doclist[0]) + return d + + +def process_key(key_data, gpgbinary, secret=False): + with TempGPGWrapper(gpgbinary=gpgbinary) as gpg: + try: + gpg.import_keys(key_data) + info = gpg.list_keys(secret=secret).pop() + key = gpg.export_keys(info['fingerprint'], secret=secret) + except IndexError: + info = {} + key = None + return info, key + + +def build_gpg_key(key_info, key_data, address=None, gpgbinary=None): + expiry_date = None + if key_info['expires']: + expiry_date = datetime.fromtimestamp(int(key_info['expires'])) + uids = [] + for uid in key_info['uids']: + uids.append(parse_address(uid)) + if address and address not in uids: + raise errors.KeyAddressMismatch("UIDs %s found, but expected %s" + % (str(uids), address)) + + return OpenPGPKey( + address=address, + uids=uids, + gpgbinary=gpgbinary, + fingerprint=key_info['fingerprint'], + key_data=key_data, + private=True if key_info['type'] == 'sec' else False, + length=int(key_info['length']), + expiry_date=expiry_date, + refreshed_at=datetime.now()) diff --git a/src/leap/bitmask/keymanager/validation.py b/src/leap/bitmask/keymanager/validation.py new file mode 100644 index 0000000..16a897e --- /dev/null +++ b/src/leap/bitmask/keymanager/validation.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Validation levels implementation for key managment. + +See: + https://leap.se/en/docs/design/transitional-key-validation +""" + + +from datetime import datetime + + +class ValidationLevel(object): + """ + A validation level + + Meant to be used to compare levels or get its string representation. + """ + def __init__(self, name, value): + self.name = name + self.value = value + + def __cmp__(self, other): + return cmp(self.value, other.value) + + def __str__(self): + return self.name + + def __repr__(self): + return "<ValidationLevel: %s (%d)>" % (self.name, self.value) + + +class _ValidationLevels(object): + """ + Handler class to manage validation levels. It should have only one global + instance 'ValidationLevels'. + + The levels are attributes of the instance and can be used like: + ValidationLevels.Weak_Chain + ValidationLevels.get("Weak_Chain") + """ + _level_names = ("Weak_Chain", + "Provider_Trust", + "Provider_Endorsement", + "Third_Party_Endorsement", + "Third_Party_Consensus", + "Historically_Auditing", + "Known_Key", + "Fingerprint") + + def __init__(self): + for name in self._level_names: + setattr(self, name, + ValidationLevel(name, self._level_names.index(name))) + + def get(self, name): + """ + Get the ValidationLevel of a name + + :param name: name of the level + :type name: str + :rtype: ValidationLevel + """ + return getattr(self, name) + + def __iter__(self): + return iter(self._level_names) + + +ValidationLevels = _ValidationLevels() + + +def can_upgrade(new_key, old_key): + """ + :type new_key: EncryptionKey + :type old_key: EncryptionKey + :rtype: bool + """ + # First contact + if old_key is None: + return True + + # An update of the same key + if new_key.fingerprint == old_key.fingerprint: + return True + + # Manually verified fingerprint + if new_key.validation == ValidationLevels.Fingerprint: + return True + + # Expired key and higher validation level + if (old_key.expiry_date is not None and + old_key.expiry_date < datetime.now() and + new_key.validation >= old_key.validation): + return True + + # No expiration date and higher validation level + if (old_key.expiry_date is None and + new_key.validation > old_key.validation): + return True + + # Not successfully used and strict high validation level + if (not (old_key.sign_used and old_key.encr_used) and + new_key.validation > old_key.validation): + return True + + # New key signed by the old key + # XXX: signatures are using key-ids instead of fingerprints + key_id = old_key.fingerprint[-16:] + if key_id in new_key.signatures: + return True + + return False diff --git a/src/leap/bitmask/keymanager/wrapper.py b/src/leap/bitmask/keymanager/wrapper.py new file mode 100644 index 0000000..4f36cec --- /dev/null +++ b/src/leap/bitmask/keymanager/wrapper.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +# wrapper.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +GPG wrapper for temporary keyrings +""" +import os +import shutil +import tempfile +from gnupg import GPG + +from leap.common.check import leap_assert + + +class TempGPGWrapper(object): + """ + A context manager that wraps a temporary GPG keyring which only contains + the keys given at object creation. + """ + + def __init__(self, keys=None, gpgbinary=None): + """ + Create an empty temporary keyring and import any given C{keys} into + it. + + :param keys: OpenPGP key, or list of. + :type keys: OpenPGPKey or list of OpenPGPKeys + :param gpgbinary: Name for GnuPG binary executable. + :type gpgbinary: C{str} + """ + self._gpg = None + self._gpgbinary = gpgbinary + if not keys: + keys = list() + if not isinstance(keys, list): + keys = [keys] + self._keys = keys + + def __enter__(self): + """ + Build and return a GPG keyring containing the keys given on + object creation. + + :return: A GPG instance containing the keys given on object creation. + :rtype: gnupg.GPG + """ + self._build_keyring() + return self._gpg + + def __exit__(self, exc_type, exc_value, traceback): + """ + Ensure the gpg is properly destroyed. + """ + # TODO handle exceptions and log here + self._destroy_keyring() + + def _build_keyring(self): + """ + Create a GPG keyring containing the keys given on object creation. + + :return: A GPG instance containing the keys given on object creation. + :rtype: gnupg.GPG + """ + privkeys = [key for key in self._keys if key and key.private is True] + publkeys = [key for key in self._keys if key and key.private is False] + # here we filter out public keys that have a correspondent + # private key in the list because the private key_data by + # itself is enough to also have the public key in the keyring, + # and we want to count the keys afterwards. + + privfps = map(lambda privkey: privkey.fingerprint, privkeys) + publkeys = filter( + lambda pubkey: pubkey.fingerprint not in privfps, publkeys) + + listkeys = lambda: self._gpg.list_keys() + listsecretkeys = lambda: self._gpg.list_keys(secret=True) + + self._gpg = GPG(binary=self._gpgbinary, + homedir=tempfile.mkdtemp()) + leap_assert(len(listkeys()) is 0, 'Keyring not empty.') + + # import keys into the keyring: + # concatenating ascii-armored keys, which is correctly + # understood by GPG. + + self._gpg.import_keys("".join( + [x.key_data for x in publkeys + privkeys])) + + # assert the number of keys in the keyring + leap_assert( + len(listkeys()) == len(publkeys) + len(privkeys), + 'Wrong number of public keys in keyring: %d, should be %d)' % + (len(listkeys()), len(publkeys) + len(privkeys))) + leap_assert( + len(listsecretkeys()) == len(privkeys), + 'Wrong number of private keys in keyring: %d, should be %d)' % + (len(listsecretkeys()), len(privkeys))) + + def _destroy_keyring(self): + """ + Securely erase the keyring. + """ + # TODO: implement some kind of wiping of data or a more + # secure way that + # does not write to disk. + + try: + for secret in [True, False]: + for key in self._gpg.list_keys(secret=secret): + self._gpg.delete_keys( + key['fingerprint'], + secret=secret) + leap_assert(len(self._gpg.list_keys()) is 0, 'Keyring not empty!') + + except: + raise + + finally: + leap_assert(self._gpg.homedir != os.path.expanduser('~/.gnupg'), + "watch out! Tried to remove default gnupg home!") + shutil.rmtree(self._gpg.homedir) diff --git a/tests/keymanager/common.py b/tests/keymanager/common.py new file mode 100644 index 0000000..8eb5d4e --- /dev/null +++ b/tests/keymanager/common.py @@ -0,0 +1,326 @@ +# -*- coding: utf-8 -*- +# test_keymanager.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Base classes for the Key Manager tests. +""" + +import distutils.spawn +import os.path + +from twisted.internet.defer import gatherResults +from twisted.trial import unittest + +from leap.common.testing.basetest import BaseLeapTest +from leap.soledad.client import Soledad +from leap.keymanager import KeyManager + +PATH = os.path.dirname(os.path.realpath(__file__)) + +ADDRESS = 'leap@leap.se' +ADDRESS_2 = 'anotheruser@leap.se' + + +class KeyManagerWithSoledadTestCase(unittest.TestCase, BaseLeapTest): + + def setUp(self): + self.gpg_binary_path = self._find_gpg() + + self._soledad = Soledad( + u"leap@leap.se", + u"123456", + secrets_path=self.tempdir + "/secret.gpg", + local_db_path=self.tempdir + "/soledad.u1db", + server_url='', + cert_file=None, + auth_token=None, + syncable=False + ) + + def tearDown(self): + km = self._key_manager() + + # wait for the indexes to be ready for the tear down + d = km._openpgp.deferred_init + d.addCallback(lambda _: self.delete_all_keys(km)) + d.addCallback(lambda _: self._soledad.close()) + return d + + def delete_all_keys(self, km): + def delete_keys(keys): + deferreds = [] + for key in keys: + d = km._openpgp.delete_key(key) + deferreds.append(d) + return gatherResults(deferreds) + + def check_deleted(_, private): + d = km.get_all_keys(private=private) + d.addCallback(lambda keys: self.assertEqual(keys, [])) + return d + + deferreds = [] + for private in [True, False]: + d = km.get_all_keys(private=private) + d.addCallback(delete_keys) + d.addCallback(check_deleted, private) + deferreds.append(d) + return gatherResults(deferreds) + + def _key_manager(self, user=ADDRESS, url='', token=None, + ca_cert_path=None): + return KeyManager(user, url, self._soledad, token=token, + gpgbinary=self.gpg_binary_path, + ca_cert_path=ca_cert_path) + + def _find_gpg(self): + gpg_path = distutils.spawn.find_executable('gpg') + if gpg_path is not None: + return os.path.realpath(gpg_path) + else: + return "/usr/bin/gpg" + + def get_public_binary_key(self): + with open(PATH + '/fixtures/public_key.bin', 'r') as binary_public_key: + return binary_public_key.read() + + def get_private_binary_key(self): + with open( + PATH + '/fixtures/private_key.bin', 'r') as binary_private_key: + return binary_private_key.read() + + +# key 24D18DDF: public key "Leap Test Key <leap@leap.se>" +KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF" +PUBLIC_KEY = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1.4.10 (GNU/Linux) + +mQINBFC9+dkBEADNRfwV23TWEoGc/x0wWH1P7PlXt8MnC2Z1kKaKKmfnglVrpOiz +iLWoiU58sfZ0L5vHkzXHXCBf6Eiy/EtUIvdiWAn+yASJ1mk5jZTBKO/WMAHD8wTO +zpMsFmWyg3xc4DkmFa9KQ5EVU0o/nqPeyQxNMQN7px5pPwrJtJFmPxnxm+aDkPYx +irDmz/4DeDNqXliazGJKw7efqBdlwTHkl9Akw2gwy178pmsKwHHEMOBOFFvX61AT +huKqHYmlCGSliwbrJppTG7jc1/ls3itrK+CWTg4txREkSpEVmfcASvw/ZqLbjgfs +d/INMwXnR9U81O8+7LT6yw/ca4ppcFoJD7/XJbkRiML6+bJ4Dakiy6i727BzV17g +wI1zqNvm5rAhtALKfACha6YO43aJzairO4II1wxVHvRDHZn2IuKDDephQ3Ii7/vb +hUOf6XCSmchkAcpKXUOvbxm1yfB1LRa64mMc2RcZxf4mW7KQkulBsdV5QG2276lv +U2UUy2IutXcGP5nXC+f6sJJGJeEToKJ57yiO/VWJFjKN8SvP+7AYsQSqINUuEf6H +T5gCPCraGMkTUTPXrREvu7NOohU78q6zZNaL3GW8ai7eSeANSuQ8Vzffx7Wd8Y7i +Pw9sYj0SMFs1UgjbuL6pO5ueHh+qyumbtAq2K0Bci0kqOcU4E9fNtdiovQARAQAB +tBxMZWFwIFRlc3QgS2V5IDxsZWFwQGxlYXAuc2U+iQI3BBMBCAAhBQJQvfnZAhsD +BQsJCAcDBRUKCQgLBRYCAwEAAh4BAheAAAoJEC9FXigk0Y3fT7EQAKH3IuRniOpb +T/DDIgwwjz3oxB/W0DDMyPXowlhSOuM0rgGfntBpBb3boezEXwL86NPQxNGGruF5 +hkmecSiuPSvOmQlqlS95NGQp6hNG0YaKColh+Q5NTspFXCAkFch9oqUje0LdxfSP +QfV9UpeEvGyPmk1I9EJV/YDmZ4+Djge1d7qhVZInz4Rx1NrSyF/Tc2EC0VpjQFsU +Y9Kb2YBBR7ivG6DBc8ty0jJXi7B4WjkFcUEJviQpMF2dCLdonCehYs1PqsN1N7j+ +eFjQd+hqVMJgYuSGKjvuAEfClM6MQw7+FmFwMyLgK/Ew/DttHEDCri77SPSkOGSI +txCzhTg6798f6mJr7WcXmHX1w1Vcib5FfZ8vTDFVhz/XgAgArdhPo9V6/1dgSSiB +KPQ/spsco6u5imdOhckERE0lnAYvVT6KE81TKuhF/b23u7x+Wdew6kK0EQhYA7wy +7LmlaNXc7rMBQJ9Z60CJ4JDtatBWZ0kNrt2VfdDHVdqBTOpl0CraNUjWE5YMDasr +K2dF5IX8D3uuYtpZnxqg0KzyLg0tzL0tvOL1C2iudgZUISZNPKbS0z0v+afuAAnx +2pTC3uezbh2Jt8SWTLhll4i0P4Ps5kZ6HQUO56O+/Z1cWovX+mQekYFmERySDR9n +3k1uAwLilJmRmepGmvYbB8HloV8HqwgguQINBFC9+dkBEAC0I/xn1uborMgDvBtf +H0sEhwnXBC849/32zic6udB6/3Efk9nzbSpL3FSOuXITZsZgCHPkKarnoQ2ztMcS +sh1ke1C5gQGms75UVmM/nS+2YI4vY8OX/GC/on2vUyncqdH+bR6xH5hx4NbWpfTs +iQHmz5C6zzS/kuabGdZyKRaZHt23WQ7JX/4zpjqbC99DjHcP9BSk7tJ8wI4bkMYD +uFVQdT9O6HwyKGYwUU4sAQRAj7XCTGvVbT0dpgJwH4RmrEtJoHAx4Whg8mJ710E0 +GCmzf2jqkNuOw76ivgk27Kge+Hw00jmJjQhHY0yVbiaoJwcRrPKzaSjEVNgrpgP3 +lXPRGQArgESsIOTeVVHQ8fhK2YtTeCY9rIiO+L0OX2xo9HK7hfHZZWL6rqymXdyS +fhzh/f6IPyHFWnvj7Brl7DR8heMikygcJqv+ed2yx7iLyCUJ10g12I48+aEj1aLe +dP7lna32iY8/Z0SHQLNH6PXO9SlPcq2aFUgKqE75A/0FMk7CunzU1OWr2ZtTLNO1 +WT/13LfOhhuEq9jTyTosn0WxBjJKq18lnhzCXlaw6EAtbA7CUwsD3CTPR56aAXFK +3I7KXOVAqggrvMe5Tpdg5drfYpI8hZovL5aAgb+7Y5ta10TcJdUhS5K3kFAWe/td +U0cmWUMDP1UMSQ5Jg6JIQVWhSwARAQABiQIfBBgBCAAJBQJQvfnZAhsMAAoJEC9F +Xigk0Y3fRwsP/i0ElYCyxeLpWJTwo1iCLkMKz2yX1lFVa9nT1BVTPOQwr/IAc5OX +NdtbJ14fUsKL5pWgW8OmrXtwZm1y4euI1RPWWubG01ouzwnGzv26UcuHeqC5orZj +cOnKtL40y8VGMm8LoicVkRJH8blPORCnaLjdOtmA3rx/v2EXrJpSa3AhOy0ZSRXk +ZSrK68AVNwamHRoBSYyo0AtaXnkPX4+tmO8X8BPfj125IljubvwZPIW9VWR9UqCE +VPfDR1XKegVb6VStIywF7kmrknM1C5qUY28rdZYWgKorw01hBGV4jTW0cqde3N51 +XT1jnIAa+NoXUM9uQoGYMiwrL7vNsLlyyiW5ayDyV92H/rIuiqhFgbJsHTlsm7I8 +oGheR784BagAA1NIKD1qEO9T6Kz9lzlDaeWS5AUKeXrb7ZJLI1TTCIZx5/DxjLqM +Tt/RFBpVo9geZQrvLUqLAMwdaUvDXC2c6DaCPXTh65oCZj/hqzlJHH+RoTWWzKI+ +BjXxgUWF9EmZUBrg68DSmI+9wuDFsjZ51BcqvJwxyfxtTaWhdoYqH/UQS+D1FP3/ +diZHHlzwVwPICzM9ooNTgbrcDzyxRkIVqsVwBq7EtzcvgYUyX53yG25Giy6YQaQ2 +ZtQ/VymwFL3XdUWV6B/hU4PVAFvO3qlOtdJ6TpE+nEWgcWjCv5g7RjXX +=MuOY +-----END PGP PUBLIC KEY BLOCK----- +""" +PRIVATE_KEY = """ +-----BEGIN PGP PRIVATE KEY BLOCK----- +Version: GnuPG v1.4.10 (GNU/Linux) + +lQcYBFC9+dkBEADNRfwV23TWEoGc/x0wWH1P7PlXt8MnC2Z1kKaKKmfnglVrpOiz +iLWoiU58sfZ0L5vHkzXHXCBf6Eiy/EtUIvdiWAn+yASJ1mk5jZTBKO/WMAHD8wTO +zpMsFmWyg3xc4DkmFa9KQ5EVU0o/nqPeyQxNMQN7px5pPwrJtJFmPxnxm+aDkPYx +irDmz/4DeDNqXliazGJKw7efqBdlwTHkl9Akw2gwy178pmsKwHHEMOBOFFvX61AT +huKqHYmlCGSliwbrJppTG7jc1/ls3itrK+CWTg4txREkSpEVmfcASvw/ZqLbjgfs +d/INMwXnR9U81O8+7LT6yw/ca4ppcFoJD7/XJbkRiML6+bJ4Dakiy6i727BzV17g +wI1zqNvm5rAhtALKfACha6YO43aJzairO4II1wxVHvRDHZn2IuKDDephQ3Ii7/vb +hUOf6XCSmchkAcpKXUOvbxm1yfB1LRa64mMc2RcZxf4mW7KQkulBsdV5QG2276lv +U2UUy2IutXcGP5nXC+f6sJJGJeEToKJ57yiO/VWJFjKN8SvP+7AYsQSqINUuEf6H +T5gCPCraGMkTUTPXrREvu7NOohU78q6zZNaL3GW8ai7eSeANSuQ8Vzffx7Wd8Y7i +Pw9sYj0SMFs1UgjbuL6pO5ueHh+qyumbtAq2K0Bci0kqOcU4E9fNtdiovQARAQAB +AA/+JHtlL39G1wsH9R6UEfUQJGXR9MiIiwZoKcnRB2o8+DS+OLjg0JOh8XehtuCs +E/8oGQKtQqa5bEIstX7IZoYmYFiUQi9LOzIblmp2vxOm+HKkxa4JszWci2/ZmC3t +KtaA4adl9XVnshoQ7pijuCMUKB3naBEOAxd8s9d/JeReGIYkJErdrnVfNk5N71Ds +FmH5Ll3XtEDvgBUQP3nkA6QFjpsaB94FHjL3gDwum/cxzj6pCglcvHOzEhfY0Ddb +J967FozQTaf2JW3O+w3LOqtcKWpq87B7+O61tVidQPSSuzPjCtFF0D2LC9R/Hpky +KTMQ6CaKja4MPhjwywd4QPcHGYSqjMpflvJqi+kYIt8psUK/YswWjnr3r4fbuqVY +VhtiHvnBHQjz135lUqWvEz4hM3Xpnxydx7aRlv5NlevK8+YIO5oFbWbGNTWsPZI5 +jpoFBpSsnR1Q5tnvtNHauvoWV+XN2qAOBTG+/nEbDYH6Ak3aaE9jrpTdYh0CotYF +q7csANsDy3JvkAzeU6WnYpsHHaAjqOGyiZGsLej1UcXPFMosE/aUo4WQhiS8Zx2c +zOVKOi/X5vQ2GdNT9Qolz8AriwzsvFR+bxPzyd8V6ALwDsoXvwEYinYBKK8j0OPv +OOihSR6HVsuP9NUZNU9ewiGzte/+/r6pNXHvR7wTQ8EWLcEIAN6Zyrb0bHZTIlxt +VWur/Ht2mIZrBaO50qmM5RD3T5oXzWXi/pjLrIpBMfeZR9DWfwQwjYzwqi7pxtYx +nJvbMuY505rfnMoYxb4J+cpRXV8MS7Dr1vjjLVUC9KiwSbM3gg6emfd2yuA93ihv +Pe3mffzLIiQa4mRE3wtGcioC43nWuV2K2e1KjxeFg07JhrezA/1Cak505ab/tmvP +4YmjR5c44+yL/YcQ3HdFgs4mV+nVbptRXvRcPpolJsgxPccGNdvHhsoR4gwXMS3F +RRPD2z6x8xeN73Q4KH3bm01swQdwFBZbWVfmUGLxvN7leCdfs9+iFJyqHiCIB6Iv +mQfp8F0IAOwSo8JhWN+V1dwML4EkIrM8wUb4yecNLkyR6TpPH/qXx4PxVMC+vy6x +sCtjeHIwKE+9vqnlhd5zOYh7qYXEJtYwdeDDmDbL8oks1LFfd+FyAuZXY33DLwn0 +cRYsr2OEZmaajqUB3NVmj3H4uJBN9+paFHyFSXrH68K1Fk2o3n+RSf2EiX+eICwI +L6rqoF5sSVUghBWdNegV7qfy4anwTQwrIMGjgU5S6PKW0Dr/3iO5z3qQpGPAj5OW +ATqPWkDICLbObPxD5cJlyyNE2wCA9VVc6/1d6w4EVwSq9h3/WTpATEreXXxTGptd +LNiTA1nmakBYNO2Iyo3djhaqBdWjk+EIAKtVEnJH9FAVwWOvaj1RoZMA5DnDMo7e +SnhrCXl8AL7Z1WInEaybasTJXn1uQ8xY52Ua4b8cbuEKRKzw/70NesFRoMLYoHTO +dyeszvhoDHberpGRTciVmpMu7Hyi33rM31K9epA4ib6QbbCHnxkWOZB+Bhgj1hJ8 +xb4RBYWiWpAYcg0+DAC3w9gfxQhtUlZPIbmbrBmrVkO2GVGUj8kH6k4UV6kUHEGY +HQWQR0HcbKcXW81ZXCCD0l7ROuEWQtTe5Jw7dJ4/QFuqZnPutXVRNOZqpl6eRShw +7X2/a29VXBpmHA95a88rSQsL+qm7Fb3prqRmuMCtrUZgFz7HLSTuUMR867QcTGVh +cCBUZXN0IEtleSA8bGVhcEBsZWFwLnNlPokCNwQTAQgAIQUCUL352QIbAwULCQgH +AwUVCgkICwUWAgMBAAIeAQIXgAAKCRAvRV4oJNGN30+xEACh9yLkZ4jqW0/wwyIM +MI896MQf1tAwzMj16MJYUjrjNK4Bn57QaQW926HsxF8C/OjT0MTRhq7heYZJnnEo +rj0rzpkJapUveTRkKeoTRtGGigqJYfkOTU7KRVwgJBXIfaKlI3tC3cX0j0H1fVKX +hLxsj5pNSPRCVf2A5mePg44HtXe6oVWSJ8+EcdTa0shf03NhAtFaY0BbFGPSm9mA +QUe4rxugwXPLctIyV4uweFo5BXFBCb4kKTBdnQi3aJwnoWLNT6rDdTe4/nhY0Hfo +alTCYGLkhio77gBHwpTOjEMO/hZhcDMi4CvxMPw7bRxAwq4u+0j0pDhkiLcQs4U4 +Ou/fH+pia+1nF5h19cNVXIm+RX2fL0wxVYc/14AIAK3YT6PVev9XYEkogSj0P7Kb +HKOruYpnToXJBERNJZwGL1U+ihPNUyroRf29t7u8flnXsOpCtBEIWAO8Muy5pWjV +3O6zAUCfWetAieCQ7WrQVmdJDa7dlX3Qx1XagUzqZdAq2jVI1hOWDA2rKytnReSF +/A97rmLaWZ8aoNCs8i4NLcy9Lbzi9QtornYGVCEmTTym0tM9L/mn7gAJ8dqUwt7n +s24dibfElky4ZZeItD+D7OZGeh0FDuejvv2dXFqL1/pkHpGBZhEckg0fZ95NbgMC +4pSZkZnqRpr2GwfB5aFfB6sIIJ0HGARQvfnZARAAtCP8Z9bm6KzIA7wbXx9LBIcJ +1wQvOPf99s4nOrnQev9xH5PZ820qS9xUjrlyE2bGYAhz5Cmq56ENs7THErIdZHtQ +uYEBprO+VFZjP50vtmCOL2PDl/xgv6J9r1Mp3KnR/m0esR+YceDW1qX07IkB5s+Q +us80v5LmmxnWcikWmR7dt1kOyV/+M6Y6mwvfQ4x3D/QUpO7SfMCOG5DGA7hVUHU/ +Tuh8MihmMFFOLAEEQI+1wkxr1W09HaYCcB+EZqxLSaBwMeFoYPJie9dBNBgps39o +6pDbjsO+or4JNuyoHvh8NNI5iY0IR2NMlW4mqCcHEazys2koxFTYK6YD95Vz0RkA +K4BErCDk3lVR0PH4StmLU3gmPayIjvi9Dl9saPRyu4Xx2WVi+q6spl3ckn4c4f3+ +iD8hxVp74+wa5ew0fIXjIpMoHCar/nndsse4i8glCddINdiOPPmhI9Wi3nT+5Z2t +9omPP2dEh0CzR+j1zvUpT3KtmhVICqhO+QP9BTJOwrp81NTlq9mbUyzTtVk/9dy3 +zoYbhKvY08k6LJ9FsQYySqtfJZ4cwl5WsOhALWwOwlMLA9wkz0eemgFxStyOylzl +QKoIK7zHuU6XYOXa32KSPIWaLy+WgIG/u2ObWtdE3CXVIUuSt5BQFnv7XVNHJllD +Az9VDEkOSYOiSEFVoUsAEQEAAQAP/1AagnZQZyzHDEgw4QELAspYHCWLXE5aZInX +wTUJhK31IgIXNn9bJ0hFiSpQR2xeMs9oYtRuPOu0P8oOFMn4/z374fkjZy8QVY3e +PlL+3EUeqYtkMwlGNmVw5a/NbNuNfm5Darb7pEfbYd1gPcni4MAYw7R2SG/57GbC +9gucvspHIfOSfBNLBthDzmK8xEKe1yD2eimfc2T7IRYb6hmkYfeds5GsqvGI6mwI +85h4uUHWRc5JOlhVM6yX8hSWx0L60Z3DZLChmc8maWnFXd7C8eQ6P1azJJbW71Ih +7CoK0XW4LE82vlQurSRFgTwfl7wFYszW2bOzCuhHDDtYnwH86Nsu0DC78ZVRnvxn +E8Ke/AJgrdhIOo4UAyR+aZD2+2mKd7/waOUTUrUtTzc7i8N3YXGi/EIaNReBXaq+ +ZNOp24BlFzRp+FCF/pptDW9HjPdiV09x0DgICmeZS4Gq/4vFFIahWctg52NGebT0 +Idxngjj+xDtLaZlLQoOz0n5ByjO/Wi0ANmMv1sMKCHhGvdaSws2/PbMR2r4caj8m +KXpIgdinM/wUzHJ5pZyF2U/qejsRj8Kw8KH/tfX4JCLhiaP/mgeTuWGDHeZQERAT +xPmRFHaLP9/ZhvGNh6okIYtrKjWTLGoXvKLHcrKNisBLSq+P2WeFrlme1vjvJMo/ +jPwLT5o9CADQmcbKZ+QQ1ZM9v99iDZol7SAMZX43JC019sx6GK0u6xouJBcLfeB4 +OXacTgmSYdTa9RM9fbfVpti01tJ84LV2SyL/VJq/enJF4XQPSynT/tFTn1PAor6o +tEAAd8fjKdJ6LnD5wb92SPHfQfXqI84rFEO8rUNIE/1ErT6DYifDzVCbfD2KZdoF +cOSp7TpD77sY1bs74ocBX5ejKtd+aH99D78bJSMM4pSDZsIEwnomkBHTziubPwJb +OwnATy0LmSMAWOw5rKbsh5nfwCiUTM20xp0t5JeXd+wPVWbpWqI2EnkCEN+RJr9i +7dp/ymDQ+Yt5wrsN3NwoyiexPOG91WQVCADdErHsnglVZZq9Z8Wx7KwecGCUurJ2 +H6lKudv5YOxPnAzqZS5HbpZd/nRTMZh2rdXCr5m2YOuewyYjvM757AkmUpM09zJX +MQ1S67/UX2y8/74TcRF97Ncx9HeELs92innBRXoFitnNguvcO6Esx4BTe1OdU6qR +ER3zAmVf22Le9ciXbu24DN4mleOH+OmBx7X2PqJSYW9GAMTsRB081R6EWKH7romQ +waxFrZ4DJzZ9ltyosEJn5F32StyLrFxpcrdLUoEaclZCv2qka7sZvi0EvovDVEBU +e10jOx9AOwf8Gj2ufhquQ6qgVYCzbP+YrodtkFrXRS3IsljIchj1M2ffB/0bfoUs +rtER9pLvYzCjBPg8IfGLw0o754Qbhh/ReplCRTusP/fQMybvCvfxreS3oyEriu/G +GufRomjewZ8EMHDIgUsLcYo2UHZsfF7tcazgxMGmMvazp4r8vpgrvW/8fIN/6Adu +tF+WjWDTvJLFJCe6O+BFJOWrssNrrra1zGtLC1s8s+Wfpe+bGPL5zpHeebGTwH1U +22eqgJArlEKxrfarz7W5+uHZJHSjF/K9ZvunLGD0n9GOPMpji3UO3zeM8IYoWn7E +/EWK1XbjnssNemeeTZ+sDh+qrD7BOi+vCX1IyBxbfqnQfJZvmcPWpruy1UsO+aIC +0GY8Jr3OL69dDQ21jueJAh8EGAEIAAkFAlC9+dkCGwwACgkQL0VeKCTRjd9HCw/+ +LQSVgLLF4ulYlPCjWIIuQwrPbJfWUVVr2dPUFVM85DCv8gBzk5c121snXh9Swovm +laBbw6ate3BmbXLh64jVE9Za5sbTWi7PCcbO/bpRy4d6oLmitmNw6cq0vjTLxUYy +bwuiJxWREkfxuU85EKdouN062YDevH+/YResmlJrcCE7LRlJFeRlKsrrwBU3BqYd +GgFJjKjQC1peeQ9fj62Y7xfwE9+PXbkiWO5u/Bk8hb1VZH1SoIRU98NHVcp6BVvp +VK0jLAXuSauSczULmpRjbyt1lhaAqivDTWEEZXiNNbRyp17c3nVdPWOcgBr42hdQ +z25CgZgyLCsvu82wuXLKJblrIPJX3Yf+si6KqEWBsmwdOWybsjygaF5HvzgFqAAD +U0goPWoQ71PorP2XOUNp5ZLkBQp5etvtkksjVNMIhnHn8PGMuoxO39EUGlWj2B5l +Cu8tSosAzB1pS8NcLZzoNoI9dOHrmgJmP+GrOUkcf5GhNZbMoj4GNfGBRYX0SZlQ +GuDrwNKYj73C4MWyNnnUFyq8nDHJ/G1NpaF2hiof9RBL4PUU/f92JkceXPBXA8gL +Mz2ig1OButwPPLFGQhWqxXAGrsS3Ny+BhTJfnfIbbkaLLphBpDZm1D9XKbAUvdd1 +RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc= +=JTFu +-----END PGP PRIVATE KEY BLOCK----- +""" + +# key 7FEE575A: public key "anotheruser <anotheruser@leap.se>" +PUBLIC_KEY_2 = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1.4.10 (GNU/Linux) + +mI0EUYwJXgEEAMbTKHuPJ5/Gk34l9Z06f+0WCXTDXdte1UBoDtZ1erAbudgC4MOR +gquKqoj3Hhw0/ILqJ88GcOJmKK/bEoIAuKaqlzDF7UAYpOsPZZYmtRfPC2pTCnXq +Z1vdeqLwTbUspqXflkCkFtfhGKMq5rH8GV5a3tXZkRWZhdNwhVXZagC3ABEBAAG0 +IWFub3RoZXJ1c2VyIDxhbm90aGVydXNlckBsZWFwLnNlPoi4BBMBAgAiBQJRjAle +AhsDBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRB/nfpof+5XWotuA/4tLN4E +gUr7IfLy2HkHAxzw7A4rqfMN92DIM9mZrDGaWRrOn3aVF7VU1UG7MDkHfPvp/cFw +ezoCw4s4IoHVc/pVlOkcHSyt4/Rfh248tYEJmFCJXGHpkK83VIKYJAithNccJ6Q4 +JE/o06Mtf4uh/cA1HUL4a4ceqUhtpLJULLeKo7iNBFGMCV4BBADsyQI7GR0wSAxz +VayLjuPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQt +Z/hwcLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63 +yuRe94WenT1RJd6xU1aaUff4rKizuQARAQABiJ8EGAECAAkFAlGMCV4CGwwACgkQ +f536aH/uV1rPZQQAqCzRysOlu8ez7PuiBD4SebgRqWlxa1TF1ujzfLmuPivROZ2X +Kw5aQstxgGSjoB7tac49s0huh4X8XK+BtJBfU84JS8Jc2satlfwoyZ35LH6sDZck +I+RS/3we6zpMfHs3vvp9xgca6ZupQxivGtxlJs294TpJorx+mFFqbV17AzQ= +=Thdu +-----END PGP PUBLIC KEY BLOCK----- +""" + +PRIVATE_KEY_2 = """ +-----BEGIN PGP PRIVATE KEY BLOCK----- +Version: GnuPG v1.4.10 (GNU/Linux) + +lQHYBFGMCV4BBADG0yh7jyefxpN+JfWdOn/tFgl0w13bXtVAaA7WdXqwG7nYAuDD +kYKriqqI9x4cNPyC6ifPBnDiZiiv2xKCALimqpcwxe1AGKTrD2WWJrUXzwtqUwp1 +6mdb3Xqi8E21LKal35ZApBbX4RijKuax/BleWt7V2ZEVmYXTcIVV2WoAtwARAQAB +AAP7BLuSAx7tOohnimEs74ks8l/L6dOcsFQZj2bqs4AoY3jFe7bV0tHr4llypb/8 +H3/DYvpf6DWnCjyUS1tTnXSW8JXtx01BUKaAufSmMNg9blKV6GGHlT/Whe9uVyks +7XHk/+9mebVMNJ/kNlqq2k+uWqJohzC8WWLRK+d1tBeqDsECANZmzltPaqUsGV5X +C3zszE3tUBgptV/mKnBtopKi+VH+t7K6fudGcG+bAcZDUoH/QVde52mIIjjIdLje +uajJuHUCAO1mqh+vPoGv4eBLV7iBo3XrunyGXiys4a39eomhxTy3YktQanjjx+ty +GltAGCs5PbWGO6/IRjjvd46wh53kzvsCAO0J97gsWhzLuFnkxFAJSPk7RRlyl7lI +1XS/x0Og6j9XHCyY1OYkfBm0to3UlCfkgirzCYlTYObCofzdKFIPDmSqHbQhYW5v +dGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iLgEEwECACIFAlGMCV4CGwMG +CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEH+d+mh/7ldai24D/i0s3gSBSvsh +8vLYeQcDHPDsDiup8w33YMgz2ZmsMZpZGs6fdpUXtVTVQbswOQd8++n9wXB7OgLD +izgigdVz+lWU6RwdLK3j9F+Hbjy1gQmYUIlcYemQrzdUgpgkCK2E1xwnpDgkT+jT +oy1/i6H9wDUdQvhrhx6pSG2kslQst4qjnQHYBFGMCV4BBADsyQI7GR0wSAxzVayL +juPzgT+bjbFeymIhjuxKIEwnIKwYkovztW+4bbOcQs785k3Lp6RzvigTpQQtZ/hw +cLOqZbZw8t/24+D+Pq9mMP2uUvCFFqLlVvA6D3vKSQ/XNN+YB919WQ04jh63yuRe +94WenT1RJd6xU1aaUff4rKizuQARAQABAAP9EyElqJ3dq3EErXwwT4mMnbd1SrVC +rUJrNWQZL59mm5oigS00uIyR0SvusOr+UzTtd8ysRuwHy5d/LAZsbjQStaOMBILx +77TJveOel0a1QK0YSMF2ywZMCKvquvjli4hAtWYz/EwfuzQN3t23jc5ny+GqmqD2 +3FUxLJosFUfLNmECAO9KhVmJi+L9dswIs+2Dkjd1eiRQzNOEVffvYkGYZyKxNiXF +UA5kvyZcB4iAN9sWCybE4WHZ9jd4myGB0MPDGxkCAP1RsXJbbuD6zS7BXe5gwunO +2q4q7ptdSl/sJYQuTe1KNP5d/uGsvlcFfsYjpsopasPjFBIncc/2QThMKlhoEaEB +/0mVAxpT6SrEvUbJ18z7kna24SgMPr3OnPMxPGfvNLJY/Xv/A17YfoqjmByCvsKE +JCDjopXtmbcrZyoEZbEht9mko4ifBBgBAgAJBQJRjAleAhsMAAoJEH+d+mh/7lda +z2UEAKgs0crDpbvHs+z7ogQ+Enm4EalpcWtUxdbo83y5rj4r0TmdlysOWkLLcYBk +o6Ae7WnOPbNIboeF/FyvgbSQX1POCUvCXNrGrZX8KMmd+Sx+rA2XJCPkUv98Hus6 +THx7N776fcYHGumbqUMYrxrcZSbNveE6SaK8fphRam1dewM0 +=a5gs +-----END PGP PRIVATE KEY BLOCK----- +""" diff --git a/tests/keymanager/fixtures/private_key.bin b/tests/keymanager/fixtures/private_key.bin Binary files differnew file mode 100644 index 0000000..ab17431 --- /dev/null +++ b/tests/keymanager/fixtures/private_key.bin diff --git a/tests/keymanager/fixtures/public_key.bin b/tests/keymanager/fixtures/public_key.bin Binary files differnew file mode 100644 index 0000000..ab17431 --- /dev/null +++ b/tests/keymanager/fixtures/public_key.bin diff --git a/tests/keymanager/test_keymanager.py b/tests/keymanager/test_keymanager.py new file mode 100644 index 0000000..b4ab805 --- /dev/null +++ b/tests/keymanager/test_keymanager.py @@ -0,0 +1,609 @@ +# -*- coding: utf-8 -*- +# test_keymanager.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Tests for the Key Manager. +""" + +from os import path +import json +import urllib +from datetime import datetime +import tempfile +import pkg_resources +from leap.common import ca_bundle +from mock import Mock, MagicMock, patch +from twisted.internet import defer +from twisted.trial import unittest +from twisted.web._responses import NOT_FOUND + +from leap.keymanager import client + +from leap.keymanager import errors +from leap.keymanager.keys import ( + OpenPGPKey, + is_address, + build_key_from_dict, +) +from leap.keymanager.validation import ValidationLevels + +from common import ( + KeyManagerWithSoledadTestCase, + ADDRESS, + ADDRESS_2, + KEY_FINGERPRINT, + PUBLIC_KEY, + PUBLIC_KEY_2, + PRIVATE_KEY, + PRIVATE_KEY_2, +) + + +NICKSERVER_URI = "http://leap.se/" +REMOTE_KEY_URL = "http://site.domain/key" +INVALID_MAIL_ADDRESS = "notexistingemail@example.org" + + +class KeyManagerUtilTestCase(unittest.TestCase): + + def test_is_address(self): + self.assertTrue( + is_address('user@leap.se'), + 'Incorrect address detection.') + self.assertFalse( + is_address('userleap.se'), + 'Incorrect address detection.') + self.assertFalse( + is_address('user@'), + 'Incorrect address detection.') + self.assertFalse( + is_address('@leap.se'), + 'Incorrect address detection.') + + def test_build_key_from_dict(self): + kdict = { + 'uids': [ADDRESS], + 'fingerprint': KEY_FINGERPRINT, + 'key_data': PUBLIC_KEY, + 'private': False, + 'length': 4096, + 'expiry_date': 0, + 'refreshed_at': 1311239602, + } + adict = { + 'address': ADDRESS, + 'private': False, + 'last_audited_at': 0, + 'validation': str(ValidationLevels.Weak_Chain), + 'encr_used': False, + 'sign_used': True, + } + key = build_key_from_dict(kdict, adict) + self.assertEqual( + kdict['uids'], key.uids, + 'Wrong data in key.') + self.assertEqual( + kdict['fingerprint'], key.fingerprint, + 'Wrong data in key.') + self.assertEqual( + kdict['key_data'], key.key_data, + 'Wrong data in key.') + self.assertEqual( + kdict['private'], key.private, + 'Wrong data in key.') + self.assertEqual( + kdict['length'], key.length, + 'Wrong data in key.') + self.assertEqual( + None, key.expiry_date, + 'Wrong data in key.') + self.assertEqual( + None, key.last_audited_at, + 'Wrong data in key.') + self.assertEqual( + datetime.fromtimestamp(kdict['refreshed_at']), key.refreshed_at, + 'Wrong data in key.') + self.assertEqual( + adict['address'], key.address, + 'Wrong data in key.') + self.assertEqual( + ValidationLevels.get(adict['validation']), key.validation, + 'Wrong data in key.') + self.assertEqual( + adict['encr_used'], key.encr_used, + 'Wrong data in key.') + self.assertEqual( + adict['sign_used'], key.sign_used, + 'Wrong data in key.') + + +class KeyManagerKeyManagementTestCase(KeyManagerWithSoledadTestCase): + + @defer.inlineCallbacks + def _test_gen_key(self): + km = self._key_manager() + key = yield km.gen_key() + self.assertIsInstance(key, OpenPGPKey) + self.assertEqual( + 'leap@leap.se', key.address, 'Wrong address bound to key.') + self.assertEqual( + 4096, key.length, 'Wrong key length.') + + @defer.inlineCallbacks + def test_get_all_keys_in_db(self): + km = self._key_manager() + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + # get public keys + keys = yield km.get_all_keys(False) + self.assertEqual(len(keys), 1, 'Wrong number of keys') + self.assertTrue(ADDRESS in keys[0].uids) + self.assertFalse(keys[0].private) + # get private keys + keys = yield km.get_all_keys(True) + self.assertEqual(len(keys), 1, 'Wrong number of keys') + self.assertTrue(ADDRESS in keys[0].uids) + self.assertTrue(keys[0].private) + + @defer.inlineCallbacks + def test_get_public_key(self): + km = self._key_manager() + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + # get the key + key = yield km.get_key(ADDRESS, private=False, fetch_remote=False) + self.assertTrue(key is not None) + self.assertTrue(ADDRESS in key.uids) + self.assertEqual( + key.fingerprint.lower(), KEY_FINGERPRINT.lower()) + self.assertFalse(key.private) + + @defer.inlineCallbacks + def test_get_public_key_with_binary_private_key(self): + km = self._key_manager() + yield km._openpgp.put_raw_key(self.get_private_binary_key(), ADDRESS) + # get the key + key = yield km.get_key(ADDRESS, private=False, fetch_remote=False) + self.assertTrue(key is not None) + self.assertTrue(ADDRESS in key.uids) + self.assertEqual( + key.fingerprint.lower(), KEY_FINGERPRINT.lower()) + self.assertFalse(key.private) + + @defer.inlineCallbacks + def test_get_private_key(self): + km = self._key_manager() + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + # get the key + key = yield km.get_key(ADDRESS, private=True, fetch_remote=False) + self.assertTrue(key is not None) + self.assertTrue(ADDRESS in key.uids) + self.assertEqual( + key.fingerprint.lower(), KEY_FINGERPRINT.lower()) + self.assertTrue(key.private) + + def test_send_key_raises_key_not_found(self): + km = self._key_manager() + d = km.send_key() + return self.assertFailure(d, errors.KeyNotFound) + + @defer.inlineCallbacks + def test_send_key(self): + """ + Test that request is well formed when sending keys to server. + """ + token = "mytoken" + km = self._key_manager(token=token) + yield km._openpgp.put_raw_key(PUBLIC_KEY, ADDRESS) + km._async_client_pinned.request = Mock(return_value=defer.succeed('')) + # the following data will be used on the send + km.ca_cert_path = 'capath' + km.session_id = 'sessionid' + km.uid = 'myuid' + km.api_uri = 'apiuri' + km.api_version = 'apiver' + yield km.send_key() + # setup expected args + pubkey = yield km.get_key(km._address) + data = urllib.urlencode({ + km.PUBKEY_KEY: pubkey.key_data, + }) + headers = {'Authorization': [str('Token token=%s' % token)]} + headers['Content-Type'] = ['application/x-www-form-urlencoded'] + url = '%s/%s/users/%s.json' % ('apiuri', 'apiver', 'myuid') + km._async_client_pinned.request.assert_called_once_with( + str(url), 'PUT', body=str(data), + headers=headers + ) + + def test_fetch_keys_from_server(self): + """ + Test that the request is well formed when fetching keys from server. + """ + km = self._key_manager(url=NICKSERVER_URI) + expected_url = NICKSERVER_URI + '?address=' + ADDRESS_2 + + def verify_the_call(_): + used_kwargs = km._async_client_pinned.request.call_args[1] + km._async_client_pinned.request.assert_called_once_with( + expected_url, 'GET', **used_kwargs) + + d = self._fetch_key(km, ADDRESS_2, PUBLIC_KEY_2) + d.addCallback(verify_the_call) + return d + + def test_key_not_found_is_raised_if_key_search_responds_404(self): + """ + Test if key search request comes back with a 404 response then + KeyNotFound is raised, with corresponding error message. + """ + km = self._key_manager(url=NICKSERVER_URI) + client.readBody = Mock(return_value=defer.succeed(None)) + km._async_client_pinned.request = Mock( + return_value=defer.succeed(None)) + url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS + + d = km._fetch_and_handle_404_from_nicknym(url, INVALID_MAIL_ADDRESS) + + def check_key_not_found_is_raised_if_404(_): + used_kwargs = km._async_client_pinned.request.call_args[1] + check_404_callback = used_kwargs['callback'] + fake_response = Mock() + fake_response.code = NOT_FOUND + with self.assertRaisesRegexp( + errors.KeyNotFound, + '404: %s key not found.' % INVALID_MAIL_ADDRESS): + check_404_callback(fake_response) + + d.addCallback(check_key_not_found_is_raised_if_404) + return d + + def test_non_existing_key_from_nicknym_is_relayed(self): + """ + Test if key search requests throws KeyNotFound, the same error is + raised. + """ + km = self._key_manager(url=NICKSERVER_URI) + key_not_found_exception = errors.KeyNotFound('some message') + km._async_client_pinned.request = Mock( + side_effect=key_not_found_exception) + + def assert_key_not_found_raised(error): + self.assertEqual(error.value, key_not_found_exception) + + d = km._get_key_from_nicknym(INVALID_MAIL_ADDRESS) + d.addErrback(assert_key_not_found_raised) + + @defer.inlineCallbacks + def test_get_key_fetches_from_server(self): + """ + Test that getting a key successfuly fetches from server. + """ + km = self._key_manager(url=NICKSERVER_URI) + + key = yield self._fetch_key(km, ADDRESS, PUBLIC_KEY) + self.assertIsInstance(key, OpenPGPKey) + self.assertTrue(ADDRESS in key.uids) + self.assertEqual(key.validation, ValidationLevels.Provider_Trust) + + @defer.inlineCallbacks + def test_get_key_fetches_other_domain(self): + """ + Test that getting a key successfuly fetches from server. + """ + km = self._key_manager(url=NICKSERVER_URI) + + key = yield self._fetch_key(km, ADDRESS_OTHER, PUBLIC_KEY_OTHER) + self.assertIsInstance(key, OpenPGPKey) + self.assertTrue(ADDRESS_OTHER in key.uids) + self.assertEqual(key.validation, ValidationLevels.Weak_Chain) + + def _fetch_key(self, km, address, key): + """ + :returns: a Deferred that will fire with the OpenPGPKey + """ + data = json.dumps({'address': address, 'openpgp': key}) + + client.readBody = Mock(return_value=defer.succeed(data)) + + # mock the fetcher so it returns the key for ADDRESS_2 + km._async_client_pinned.request = Mock( + return_value=defer.succeed(None)) + km.ca_cert_path = 'cacertpath' + # try to key get without fetching from server + d_fail = km.get_key(address, fetch_remote=False) + d = self.assertFailure(d_fail, errors.KeyNotFound) + # try to get key fetching from server. + d.addCallback(lambda _: km.get_key(address)) + return d + + @defer.inlineCallbacks + def test_put_key_ascii(self): + """ + Test that putting ascii key works + """ + km = self._key_manager(url=NICKSERVER_URI) + + yield km.put_raw_key(PUBLIC_KEY, ADDRESS) + key = yield km.get_key(ADDRESS) + self.assertIsInstance(key, OpenPGPKey) + self.assertTrue(ADDRESS in key.uids) + + @defer.inlineCallbacks + def test_put_key_binary(self): + """ + Test that putting binary key works + """ + km = self._key_manager(url=NICKSERVER_URI) + + yield km.put_raw_key(self.get_public_binary_key(), ADDRESS) + key = yield km.get_key(ADDRESS) + + self.assertIsInstance(key, OpenPGPKey) + self.assertTrue(ADDRESS in key.uids) + + @defer.inlineCallbacks + def test_fetch_uri_ascii_key(self): + """ + Test that fetch key downloads the ascii key and gets included in + the local storage + """ + km = self._key_manager() + + km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY)) + + yield km.fetch_key(ADDRESS, "http://site.domain/key") + key = yield km.get_key(ADDRESS) + self.assertEqual(KEY_FINGERPRINT, key.fingerprint) + + @defer.inlineCallbacks + def test_fetch_uri_binary_key(self): + """ + Test that fetch key downloads the binary key and gets included in + the local storage + """ + km = self._key_manager() + + km._async_client.request = Mock( + return_value=defer.succeed(self.get_public_binary_key())) + + yield km.fetch_key(ADDRESS, "http://site.domain/key") + key = yield km.get_key(ADDRESS) + self.assertEqual(KEY_FINGERPRINT, key.fingerprint) + + def test_fetch_uri_empty_key(self): + """ + Test that fetch key raises KeyNotFound if no key in the url + """ + km = self._key_manager() + + km._async_client.request = Mock(return_value=defer.succeed("")) + d = km.fetch_key(ADDRESS, "http://site.domain/key") + return self.assertFailure(d, errors.KeyNotFound) + + def test_fetch_uri_address_differ(self): + """ + Test that fetch key raises KeyAttributesDiffer if the address + don't match + """ + km = self._key_manager() + + km._async_client.request = Mock(return_value=defer.succeed(PUBLIC_KEY)) + d = km.fetch_key(ADDRESS_2, "http://site.domain/key") + return self.assertFailure(d, errors.KeyAddressMismatch) + + def _mock_get_response(self, km, body): + km._async_client.request = MagicMock(return_value=defer.succeed(body)) + + return km._async_client.request + + @defer.inlineCallbacks + def test_fetch_key_uses_ca_bundle_if_none_specified(self): + ca_cert_path = None + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') + + @defer.inlineCallbacks + def test_fetch_key_uses_ca_bundle_if_empty_string_specified(self): + ca_cert_path = '' + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') + + @defer.inlineCallbacks + def test_fetch_key_use_default_ca_bundle_if_set_as_ca_cert_path(self): + ca_cert_path = ca_bundle.where() + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL) + + get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') + + @defer.inlineCallbacks + def test_fetch_uses_combined_ca_bundle_otherwise(self): + with tempfile.NamedTemporaryFile() as tmp_input, \ + tempfile.NamedTemporaryFile(delete=False) as tmp_output: + ca_content = pkg_resources.resource_string('leap.common.testing', + 'cacert.pem') + ca_cert_path = tmp_input.name + self._dump_to_file(ca_cert_path, ca_content) + + with patch('leap.keymanager.tempfile.NamedTemporaryFile') as mock: + mock.return_value = tmp_output + km = self._key_manager(ca_cert_path=ca_cert_path) + get_mock = self._mock_get_response(km, PUBLIC_KEY_OTHER) + + yield km.fetch_key(ADDRESS_OTHER, REMOTE_KEY_URL) + + # assert that combined bundle file is passed to get call + get_mock.assert_called_once_with(REMOTE_KEY_URL, 'GET') + + # assert that files got appended + expected = self._slurp_file(ca_bundle.where()) + ca_content + self.assertEqual(expected, self._slurp_file(tmp_output.name)) + + del km # force km out of scope + self.assertFalse(path.exists(tmp_output.name)) + + def _dump_to_file(self, filename, content): + with open(filename, 'w') as out: + out.write(content) + + def _slurp_file(self, filename): + with open(filename) as f: + content = f.read() + return content + + @defer.inlineCallbacks + def test_decrypt_updates_sign_used_for_signer(self): + # given + km = self._key_manager() + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) + encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2, + fetch_remote=False) + yield km.decrypt( + encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False) + + # when + key = yield km.get_key(ADDRESS_2, fetch_remote=False) + + # then + self.assertEqual(True, key.sign_used) + + @defer.inlineCallbacks + def test_decrypt_does_not_update_sign_used_for_recipient(self): + # given + km = self._key_manager() + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) + encdata = yield km.encrypt('data', ADDRESS, sign=ADDRESS_2, + fetch_remote=False) + yield km.decrypt( + encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False) + + # when + key = yield km.get_key( + ADDRESS, private=False, fetch_remote=False) + + # then + self.assertEqual(False, key.sign_used) + + +class KeyManagerCryptoTestCase(KeyManagerWithSoledadTestCase): + + RAW_DATA = 'data' + + @defer.inlineCallbacks + def test_keymanager_openpgp_encrypt_decrypt(self): + km = self._key_manager() + # put raw private key + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) + # encrypt + encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2, + fetch_remote=False) + self.assertNotEqual(self.RAW_DATA, encdata) + # decrypt + rawdata, signingkey = yield km.decrypt( + encdata, ADDRESS, verify=ADDRESS_2, fetch_remote=False) + self.assertEqual(self.RAW_DATA, rawdata) + key = yield km.get_key(ADDRESS_2, private=False, fetch_remote=False) + self.assertEqual(signingkey.fingerprint, key.fingerprint) + + @defer.inlineCallbacks + def test_keymanager_openpgp_encrypt_decrypt_wrong_sign(self): + km = self._key_manager() + # put raw keys + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + yield km._openpgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) + # encrypt + encdata = yield km.encrypt(self.RAW_DATA, ADDRESS, sign=ADDRESS_2, + fetch_remote=False) + self.assertNotEqual(self.RAW_DATA, encdata) + # verify + rawdata, signingkey = yield km.decrypt( + encdata, ADDRESS, verify=ADDRESS, fetch_remote=False) + self.assertEqual(self.RAW_DATA, rawdata) + self.assertTrue(isinstance(signingkey, errors.InvalidSignature)) + + @defer.inlineCallbacks + def test_keymanager_openpgp_sign_verify(self): + km = self._key_manager() + # put raw private keys + yield km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + signdata = yield km.sign(self.RAW_DATA, ADDRESS, detach=False) + self.assertNotEqual(self.RAW_DATA, signdata) + # verify + signingkey = yield km.verify(signdata, ADDRESS, fetch_remote=False) + key = yield km.get_key(ADDRESS, private=False, fetch_remote=False) + self.assertEqual(signingkey.fingerprint, key.fingerprint) + + def test_keymanager_encrypt_key_not_found(self): + km = self._key_manager() + d = km._openpgp.put_raw_key(PRIVATE_KEY, ADDRESS) + d.addCallback( + lambda _: km.encrypt(self.RAW_DATA, ADDRESS_2, sign=ADDRESS, + fetch_remote=False)) + return self.assertFailure(d, errors.KeyNotFound) + +if __name__ == "__main__": + import unittest + unittest.main() + +# key 0F91B402: someone@somedomain.org +# 9420 EC7B 6DCB 867F 5592 E6D1 7504 C974 0F91 B402 +ADDRESS_OTHER = "someone@somedomain.org" +PUBLIC_KEY_OTHER = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1 + +mQENBFUZFLwBCADRzTstykRAV3aWysLAV4O3DXdpXhV3Cww8Pfc6m1bVxAT2ifcL +kLWEaIkOB48SYIHbYzqOi1/h5abJf+5n4uhaIks+FsjsXYo1XOiYpVCNf7+xLnUM +jkmglKT5sASr61QDcFMqWfGTJ8iUTNVCJZ2k14QJ4Vss/ntnV9uB7Ef7wU7RZvxr +wINH/0LfKPsGE9l2qNpKUAAmg2bHn9YdsHj1sqlW7eZpwvefYrQej4KBaL2oq3vt +QQOdXGFqWYMe3cX+bQ1DAMG3ttTF6EGkY97BK7A18I/RJiLujWCEAkMzFr5SK9KU +AOMj6MpjfTOE+GfUKsu7/gGt42eMBFsIOvsZABEBAAG0IFNvbWVvbmUgPHNvbWVv +bmVAc29tZWRvbWFpbi5vcmc+iQE4BBMBAgAiBQJVGRS8AhsDBgsJCAcDAgYVCAIJ +CgsEFgIDAQIeAQIXgAAKCRB1BMl0D5G0AlFsCAC33LhxBRwO64T6DgTb4/39aLpi +9T3yAmXBAHC7Q+4f37IBX5fJBRKu4Lvfp6KherOl/I/Jj34yv8pm0j+kXeWktfxZ +cW+mv2vjBHQVopiUSyMVh7caFSq9sKm+oQdo6oIl9DHSARegbkCn2+0b4VxgJpyj +TZBMyUMD2AayivQU4QHOM3KCozhLNNDbpKy7LH0MSAUDmRaJsPk1zK15lQocK/7R +Z5yF4rdrdzDWrVucZJc09yntSqTGECue3W2GBCaBlb/O1c9xei4MTb4nSHS5Gp/7 +hcjrvIrgPpehndk8ZRREN/Y8uk1W5fbWzx+5z8g31RCGWBQw4NAnG10NZ3oEuQEN +BFUZFLwBCADocYZmLu1iXIE6gKqniR6Z8UDC5XnqgK+BEJwi1abe9zWhjgKeW9Vv +u1i194wuCUiNkP/bMvwMBZLTslDzqxl32ETk9FvB3kWy80S8MDjQJ15IN4I622fq +MEWwtQ0WrRay9VV6M8H2mIf71/1d5T9ysWK4XRyv+N7eRhfg7T2uhrpNyKdCZzjq +2wlgpVkMY7gtxTqJseM+qS5UNiReGxtoOXFLzzmagFgbqK88eMeZJZt8yKf81xhP +SWLTxaVaeBEAlajvEkxZJrrDQuc+maTwtMxmNUe815wJnpcRF8VD91GUpSLAN6EC +1QuJUl6Lc2o2tcHeo6CGsDZ96o0J8pFhABEBAAGJAR8EGAECAAkFAlUZFLwCGwwA +CgkQdQTJdA+RtAKcdwgApzHPhwwaZ9TBjgOytke/hPE0ht/EJ5nRiIda2PucoPh6 +DwnaI8nvmGXUfC4qFy6LM8/fJHof1BqLnMbx8MCLurnm5z30q8RhLE3YWM11zuMy +6wkHGmi/6S1G4okC+Uu8AA4K//HBo8bLcqGVWRnFAmCqy6VMAofsQvmM7vHbRj56 +U919Bki/7I6kcxPEzO73Umh3o82VP/Hz3JMigRNBRfG3jPrX04RLJj3Ib5lhQIDw +XrO8VHz9foOpY+rJnWj+6QAozxorzZYShu6H0GR1nIuqWMwli1nrx6BeIJAVz5cg +QzEd9yAN+81fkIBaa6Y8LCBxV03JCc2J4eCUKXd1gg== +=gDzy +-----END PGP PUBLIC KEY BLOCK----- +""" diff --git a/tests/keymanager/test_migrator.py b/tests/keymanager/test_migrator.py new file mode 100644 index 0000000..64cd8e1 --- /dev/null +++ b/tests/keymanager/test_migrator.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# test_migrator.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Tests for the migrator. +""" + + +from collections import namedtuple +from mock import Mock +from twisted.internet.defer import succeed, inlineCallbacks + +from leap.keymanager.migrator import KeyDocumentsMigrator, KEY_ID_KEY +from leap.keymanager.documents import ( + TAGS_PRIVATE_INDEX, + KEYMANAGER_ACTIVE_TAG, + KEYMANAGER_KEY_TAG, + KEYMANAGER_DOC_VERSION, + + KEY_ADDRESS_KEY, + KEY_UIDS_KEY, + KEY_VERSION_KEY, + KEY_FINGERPRINT_KEY, + KEY_VALIDATION_KEY, + KEY_LAST_AUDITED_AT_KEY, + KEY_ENCR_USED_KEY, + KEY_SIGN_USED_KEY, +) +from leap.keymanager.validation import ValidationLevels + +from common import ( + KeyManagerWithSoledadTestCase, + ADDRESS, + ADDRESS_2, + KEY_FINGERPRINT, +) + + +class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): + @inlineCallbacks + def test_simple_migration(self): + get_from_index = self._soledad.get_from_index + delete_doc = self._soledad.delete_doc + put_doc = self._soledad.put_doc + + def my_get_from_index(*args): + docs = [] + if (args[0] == TAGS_PRIVATE_INDEX and + args[2] == '0'): + SoledadDocument = namedtuple("SoledadDocument", ["content"]) + if args[1] == KEYMANAGER_KEY_TAG: + docs = [SoledadDocument({ + KEY_ADDRESS_KEY: [ADDRESS], + KEY_ID_KEY: KEY_FINGERPRINT[-16:], + KEY_FINGERPRINT_KEY: KEY_FINGERPRINT, + KEY_VALIDATION_KEY: str(ValidationLevels.Weak_Chain), + KEY_LAST_AUDITED_AT_KEY: 0, + KEY_ENCR_USED_KEY: True, + KEY_SIGN_USED_KEY: False, + })] + if args[1] == KEYMANAGER_ACTIVE_TAG: + docs = [SoledadDocument({ + KEY_ID_KEY: KEY_FINGERPRINT[-16:], + })] + return succeed(docs) + + self._soledad.get_from_index = my_get_from_index + self._soledad.delete_doc = Mock(return_value=succeed(None)) + self._soledad.put_doc = Mock(return_value=succeed(None)) + + try: + migrator = KeyDocumentsMigrator(self._soledad) + yield migrator.migrate() + call_list = self._soledad.put_doc.call_args_list + finally: + self._soledad.get_from_index = get_from_index + self._soledad.delete_doc = delete_doc + self._soledad.put_doc = put_doc + + self.assertEqual(len(call_list), 2) + active = call_list[0][0][0] + key = call_list[1][0][0] + + self.assertTrue(KEY_ID_KEY not in active.content) + self.assertEqual(active.content[KEY_VERSION_KEY], + KEYMANAGER_DOC_VERSION) + self.assertEqual(active.content[KEY_FINGERPRINT_KEY], KEY_FINGERPRINT) + self.assertEqual(active.content[KEY_VALIDATION_KEY], + str(ValidationLevels.Weak_Chain)) + self.assertEqual(active.content[KEY_LAST_AUDITED_AT_KEY], 0) + self.assertEqual(active.content[KEY_ENCR_USED_KEY], True) + self.assertEqual(active.content[KEY_SIGN_USED_KEY], False) + + self.assertTrue(KEY_ID_KEY not in key.content) + self.assertTrue(KEY_ADDRESS_KEY not in key.content) + self.assertTrue(KEY_VALIDATION_KEY not in key.content) + self.assertTrue(KEY_LAST_AUDITED_AT_KEY not in key.content) + self.assertTrue(KEY_ENCR_USED_KEY not in key.content) + self.assertTrue(KEY_SIGN_USED_KEY not in key.content) + self.assertEqual(key.content[KEY_UIDS_KEY], [ADDRESS]) + + @inlineCallbacks + def test_two_active_docs(self): + get_from_index = self._soledad.get_from_index + delete_doc = self._soledad.delete_doc + put_doc = self._soledad.put_doc + + def my_get_from_index(*args): + docs = [] + if (args[0] == TAGS_PRIVATE_INDEX and + args[2] == '0'): + SoledadDocument = namedtuple("SoledadDocument", ["content"]) + if args[1] == KEYMANAGER_KEY_TAG: + validation = str(ValidationLevels.Provider_Trust) + docs = [SoledadDocument({ + KEY_ADDRESS_KEY: [ADDRESS, ADDRESS_2], + KEY_ID_KEY: KEY_FINGERPRINT[-16:], + KEY_FINGERPRINT_KEY: KEY_FINGERPRINT, + KEY_VALIDATION_KEY: validation, + KEY_LAST_AUDITED_AT_KEY: 1984, + KEY_ENCR_USED_KEY: True, + KEY_SIGN_USED_KEY: False, + })] + if args[1] == KEYMANAGER_ACTIVE_TAG: + docs = [ + SoledadDocument({ + KEY_ADDRESS_KEY: ADDRESS, + KEY_ID_KEY: KEY_FINGERPRINT[-16:], + }), + SoledadDocument({ + KEY_ADDRESS_KEY: ADDRESS_2, + KEY_ID_KEY: KEY_FINGERPRINT[-16:], + }), + ] + return succeed(docs) + + self._soledad.get_from_index = my_get_from_index + self._soledad.delete_doc = Mock(return_value=succeed(None)) + self._soledad.put_doc = Mock(return_value=succeed(None)) + + try: + migrator = KeyDocumentsMigrator(self._soledad) + yield migrator.migrate() + call_list = self._soledad.put_doc.call_args_list + finally: + self._soledad.get_from_index = get_from_index + self._soledad.delete_doc = delete_doc + self._soledad.put_doc = put_doc + + self.assertEqual(len(call_list), 3) + for active in [call[0][0] for call in call_list][:2]: + self.assertTrue(KEY_ID_KEY not in active.content) + self.assertEqual(active.content[KEY_VERSION_KEY], + KEYMANAGER_DOC_VERSION) + self.assertEqual(active.content[KEY_FINGERPRINT_KEY], + KEY_FINGERPRINT) + self.assertEqual(active.content[KEY_VALIDATION_KEY], + str(ValidationLevels.Weak_Chain)) + self.assertEqual(active.content[KEY_LAST_AUDITED_AT_KEY], 0) + self.assertEqual(active.content[KEY_ENCR_USED_KEY], False) + self.assertEqual(active.content[KEY_SIGN_USED_KEY], False) diff --git a/tests/keymanager/test_openpgp.py b/tests/keymanager/test_openpgp.py new file mode 100644 index 0000000..1f78ad4 --- /dev/null +++ b/tests/keymanager/test_openpgp.py @@ -0,0 +1,365 @@ +# -*- coding: utf-8 -*- +# test_keymanager.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Tests for the OpenPGP support on Key Manager. +""" + + +from datetime import datetime +from mock import Mock +from twisted.internet.defer import inlineCallbacks, gatherResults, succeed + +from leap.keymanager import ( + KeyNotFound, + openpgp, +) +from leap.keymanager.documents import ( + TYPE_FINGERPRINT_PRIVATE_INDEX, + TYPE_ADDRESS_PRIVATE_INDEX, +) +from leap.keymanager.keys import OpenPGPKey + +from common import ( + KeyManagerWithSoledadTestCase, + ADDRESS, + ADDRESS_2, + KEY_FINGERPRINT, + PUBLIC_KEY, + PUBLIC_KEY_2, + PRIVATE_KEY, + PRIVATE_KEY_2, +) + + +class OpenPGPCryptoTestCase(KeyManagerWithSoledadTestCase): + + # set the trial timeout to 20min, needed by the key generation test + timeout = 1200 + + @inlineCallbacks + def _test_openpgp_gen_key(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield self._assert_key_not_found(pgp, 'user@leap.se') + key = yield pgp.gen_key('user@leap.se') + self.assertIsInstance(key, OpenPGPKey) + self.assertEqual( + 'user@leap.se', key.address, 'Wrong address bound to key.') + self.assertEqual( + 4096, key.length, 'Wrong key length.') + + @inlineCallbacks + def test_openpgp_put_delete_key(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield self._assert_key_not_found(pgp, ADDRESS) + yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS) + key = yield pgp.get_key(ADDRESS, private=False) + yield pgp.delete_key(key) + yield self._assert_key_not_found(pgp, ADDRESS) + + @inlineCallbacks + def test_openpgp_put_ascii_key(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield self._assert_key_not_found(pgp, ADDRESS) + yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS) + key = yield pgp.get_key(ADDRESS, private=False) + self.assertIsInstance(key, OpenPGPKey) + self.assertTrue( + ADDRESS in key.address, 'Wrong address bound to key.') + self.assertEqual( + 4096, key.length, 'Wrong key length.') + yield pgp.delete_key(key) + yield self._assert_key_not_found(pgp, ADDRESS) + + @inlineCallbacks + def test_get_public_key(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield self._assert_key_not_found(pgp, ADDRESS) + yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS) + yield self._assert_key_not_found(pgp, ADDRESS, private=True) + key = yield pgp.get_key(ADDRESS, private=False) + self.assertTrue(ADDRESS in key.address) + self.assertFalse(key.private) + self.assertEqual(KEY_FINGERPRINT, key.fingerprint) + yield pgp.delete_key(key) + yield self._assert_key_not_found(pgp, ADDRESS) + + @inlineCallbacks + def test_openpgp_encrypt_decrypt(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + + # encrypt + yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS) + pubkey = yield pgp.get_key(ADDRESS, private=False) + cyphertext = yield pgp.encrypt(data, pubkey) + + self.assertTrue(cyphertext is not None) + self.assertTrue(cyphertext != '') + self.assertTrue(cyphertext != data) + self.assertTrue(pgp.is_encrypted(cyphertext)) + self.assertTrue(pgp.is_encrypted(cyphertext)) + + # decrypt + yield self._assert_key_not_found(pgp, ADDRESS, private=True) + yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + decrypted, _ = yield pgp.decrypt(cyphertext, privkey) + self.assertEqual(decrypted, data) + + yield pgp.delete_key(pubkey) + yield pgp.delete_key(privkey) + yield self._assert_key_not_found(pgp, ADDRESS, private=False) + yield self._assert_key_not_found(pgp, ADDRESS, private=True) + + @inlineCallbacks + def test_verify_with_private_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + signed = pgp.sign(data, privkey) + self.assertRaises( + AssertionError, + pgp.verify, signed, privkey) + + @inlineCallbacks + def test_sign_with_public_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS) + self.assertRaises( + AssertionError, + pgp.sign, data, ADDRESS, OpenPGPKey) + + @inlineCallbacks + def test_verify_with_wrong_key_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + signed = pgp.sign(data, privkey) + yield pgp.put_raw_key(PUBLIC_KEY_2, ADDRESS_2) + wrongkey = yield pgp.get_key(ADDRESS_2) + self.assertFalse(pgp.verify(signed, wrongkey)) + + @inlineCallbacks + def test_encrypt_sign_with_public_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + pubkey = yield pgp.get_key(ADDRESS, private=False) + self.failureResultOf( + pgp.encrypt(data, privkey, sign=pubkey), + AssertionError) + + @inlineCallbacks + def test_decrypt_verify_with_private_raises(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + pubkey = yield pgp.get_key(ADDRESS, private=False) + encrypted_and_signed = yield pgp.encrypt( + data, pubkey, sign=privkey) + self.failureResultOf( + pgp.decrypt(encrypted_and_signed, privkey, verify=privkey), + AssertionError) + + @inlineCallbacks + def test_decrypt_verify_with_wrong_key(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + pubkey = yield pgp.get_key(ADDRESS, private=False) + encrypted_and_signed = yield pgp.encrypt(data, pubkey, sign=privkey) + yield pgp.put_raw_key(PUBLIC_KEY_2, ADDRESS_2) + wrongkey = yield pgp.get_key(ADDRESS_2) + decrypted, validsign = yield pgp.decrypt(encrypted_and_signed, + privkey, + verify=wrongkey) + self.assertEqual(decrypted, data) + self.assertFalse(validsign) + + @inlineCallbacks + def test_sign_verify(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + signed = pgp.sign(data, privkey, detach=False) + pubkey = yield pgp.get_key(ADDRESS, private=False) + validsign = pgp.verify(signed, pubkey) + self.assertTrue(validsign) + + @inlineCallbacks + def test_encrypt_sign_decrypt_verify(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + + yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS) + pubkey = yield pgp.get_key(ADDRESS, private=False) + privkey = yield pgp.get_key(ADDRESS, private=True) + + yield pgp.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) + pubkey2 = yield pgp.get_key(ADDRESS_2, private=False) + privkey2 = yield pgp.get_key(ADDRESS_2, private=True) + + data = 'data' + encrypted_and_signed = yield pgp.encrypt( + data, pubkey2, sign=privkey) + res, validsign = yield pgp.decrypt( + encrypted_and_signed, privkey2, verify=pubkey) + self.assertEqual(data, res) + self.assertTrue(validsign) + + @inlineCallbacks + def test_sign_verify_detached_sig(self): + data = 'data' + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PRIVATE_KEY, ADDRESS) + privkey = yield pgp.get_key(ADDRESS, private=True) + signature = yield pgp.sign(data, privkey, detach=True) + pubkey = yield pgp.get_key(ADDRESS, private=False) + validsign = pgp.verify(data, pubkey, detached_sig=signature) + self.assertTrue(validsign) + + @inlineCallbacks + def test_self_repair_three_keys(self): + refreshed_keep = datetime(2007, 1, 1) + self._insert_key_docs([datetime(2005, 1, 1), + refreshed_keep, + datetime(2001, 1, 1)]) + delete_doc = self._mock_delete_doc() + + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + key = yield pgp.get_key(ADDRESS, private=False) + self.assertEqual(key.refreshed_at, refreshed_keep) + self.assertEqual(self.count, 2) + self._soledad.delete_doc = delete_doc + + @inlineCallbacks + def test_self_repair_no_keys(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS) + + get_from_index = self._soledad.get_from_index + delete_doc = self._soledad.delete_doc + + def my_get_from_index(*args): + if (args[0] == TYPE_FINGERPRINT_PRIVATE_INDEX and + args[2] == KEY_FINGERPRINT): + return succeed([]) + return get_from_index(*args) + + self._soledad.get_from_index = my_get_from_index + self._soledad.delete_doc = Mock(return_value=succeed(None)) + + try: + yield self.assertFailure(pgp.get_key(ADDRESS, private=False), + KeyNotFound) + # it should have deleted the index + self.assertEqual(self._soledad.delete_doc.call_count, 1) + finally: + self._soledad.get_from_index = get_from_index + self._soledad.delete_doc = delete_doc + + @inlineCallbacks + def test_self_repair_put_keys(self): + self._insert_key_docs([datetime(2005, 1, 1), + datetime(2007, 1, 1), + datetime(2001, 1, 1)]) + delete_doc = self._mock_delete_doc() + + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + yield pgp.put_raw_key(PUBLIC_KEY, ADDRESS) + self._soledad.delete_doc = delete_doc + self.assertEqual(self.count, 2) + + @inlineCallbacks + def test_self_repair_six_active_docs(self): + pgp = openpgp.OpenPGPScheme( + self._soledad, gpgbinary=self.gpg_binary_path) + + k1 = OpenPGPKey(ADDRESS, fingerprint="1", + refreshed_at=datetime(2005, 1, 1)) + k2 = OpenPGPKey(ADDRESS, fingerprint="2", + refreshed_at=datetime(2007, 1, 1)) + k3 = OpenPGPKey(ADDRESS, fingerprint="3", + refreshed_at=datetime(2007, 1, 1), + encr_used=True, sign_used=True) + k4 = OpenPGPKey(ADDRESS, fingerprint="4", + refreshed_at=datetime(2007, 1, 1), + sign_used=True) + k5 = OpenPGPKey(ADDRESS, fingerprint="5", + refreshed_at=datetime(2007, 1, 1), + encr_used=True) + k6 = OpenPGPKey(ADDRESS, fingerprint="6", + refreshed_at=datetime(2006, 1, 1), + encr_used=True, sign_used=True) + keys = (k1, k2, k3, k4, k5, k6) + for key in keys: + yield self._soledad.create_doc_from_json(key.get_json()) + yield self._soledad.create_doc_from_json(key.get_active_json()) + + delete_doc = self._mock_delete_doc() + + key = yield pgp.get_key(ADDRESS, private=False) + self._soledad.delete_doc = delete_doc + self.assertEqual(self.count, 5) + self.assertEqual(key.fingerprint, "3") + + def _assert_key_not_found(self, pgp, address, private=False): + d = pgp.get_key(address, private=private) + return self.assertFailure(d, KeyNotFound) + + @inlineCallbacks + def _insert_key_docs(self, refreshed_at): + for date in refreshed_at: + key = OpenPGPKey(ADDRESS, fingerprint=KEY_FINGERPRINT, + refreshed_at=date) + yield self._soledad.create_doc_from_json(key.get_json()) + yield self._soledad.create_doc_from_json(key.get_active_json()) + + def _mock_delete_doc(self): + delete_doc = self._soledad.delete_doc + self.count = 0 + + def my_delete_doc(*args): + self.count += 1 + return delete_doc(*args) + self._soledad.delete_doc = my_delete_doc + return delete_doc diff --git a/tests/keymanager/test_validation.py b/tests/keymanager/test_validation.py new file mode 100644 index 0000000..4aa0795 --- /dev/null +++ b/tests/keymanager/test_validation.py @@ -0,0 +1,502 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for the Validation Levels +""" + +import unittest +from datetime import datetime +from twisted.internet.defer import inlineCallbacks + +from leap.keymanager.errors import KeyNotValidUpgrade +from leap.keymanager.validation import ValidationLevels + +from common import ( + KeyManagerWithSoledadTestCase, + ADDRESS, + PUBLIC_KEY, + ADDRESS_2, + PUBLIC_KEY_2, + PRIVATE_KEY_2, + KEY_FINGERPRINT +) + + +class ValidationLevelsTestCase(KeyManagerWithSoledadTestCase): + + @inlineCallbacks + def test_none_old_key(self): + km = self._key_manager() + yield km.put_raw_key(PUBLIC_KEY, ADDRESS) + key = yield km.get_key(ADDRESS, fetch_remote=False) + self.assertEqual(key.fingerprint, KEY_FINGERPRINT) + + @inlineCallbacks + def test_cant_upgrade(self): + km = self._key_manager() + yield km.put_raw_key(PUBLIC_KEY, ADDRESS, + validation=ValidationLevels.Provider_Trust) + d = km.put_raw_key(UNRELATED_KEY, ADDRESS) + yield self.assertFailure(d, KeyNotValidUpgrade) + + @inlineCallbacks + def test_fingerprint_level(self): + km = self._key_manager() + yield km.put_raw_key(PUBLIC_KEY, ADDRESS) + yield km.put_raw_key(UNRELATED_KEY, ADDRESS, + validation=ValidationLevels.Fingerprint) + key = yield km.get_key(ADDRESS, fetch_remote=False) + self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) + + @inlineCallbacks + def test_expired_key(self): + km = self._key_manager() + yield km.put_raw_key(EXPIRED_KEY, ADDRESS) + yield km.put_raw_key(UNRELATED_KEY, ADDRESS) + key = yield km.get_key(ADDRESS, fetch_remote=False) + self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) + + @inlineCallbacks + def test_expired_fail_lower_level(self): + km = self._key_manager() + yield km.put_raw_key( + EXPIRED_KEY, ADDRESS, + validation=ValidationLevels.Third_Party_Endorsement) + d = km.put_raw_key( + UNRELATED_KEY, ADDRESS, + validation=ValidationLevels.Provider_Trust) + yield self.assertFailure(d, KeyNotValidUpgrade) + + @inlineCallbacks + def test_roll_back(self): + km = self._key_manager() + yield km.put_raw_key(EXPIRED_KEY_UPDATED, ADDRESS) + yield km.put_raw_key(EXPIRED_KEY, ADDRESS) + key = yield km.get_key(ADDRESS, fetch_remote=False) + self.assertEqual(key.expiry_date, EXPIRED_KEY_NEW_EXPIRY_DATE) + + @inlineCallbacks + def test_not_used(self): + km = self._key_manager() + yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS, + validation=ValidationLevels.Provider_Trust) + yield km.put_raw_key(UNRELATED_KEY, ADDRESS, + validation=ValidationLevels.Provider_Endorsement) + key = yield km.get_key(ADDRESS, fetch_remote=False) + self.assertEqual(key.fingerprint, UNRELATED_FINGERPRINT) + + @inlineCallbacks + def test_used_with_verify(self): + TEXT = "some text" + + km = self._key_manager() + yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS) + signature = yield km.sign(TEXT, ADDRESS) + yield self.delete_all_keys(km) + + yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS) + yield km.encrypt(TEXT, ADDRESS) + yield km.verify(TEXT, ADDRESS, detached_sig=signature) + + d = km.put_raw_key( + UNRELATED_KEY, ADDRESS, + validation=ValidationLevels.Provider_Endorsement) + yield self.assertFailure(d, KeyNotValidUpgrade) + + @inlineCallbacks + def test_used_with_decrypt(self): + TEXT = "some text" + + km = self._key_manager() + yield km.put_raw_key(UNEXPIRED_PRIVATE, ADDRESS) + yield km.put_raw_key(PUBLIC_KEY_2, ADDRESS_2) + encrypted = yield km.encrypt(TEXT, ADDRESS_2, sign=ADDRESS) + yield self.delete_all_keys(km) + + yield km.put_raw_key(UNEXPIRED_KEY, ADDRESS) + yield km.put_raw_key(PRIVATE_KEY_2, ADDRESS_2) + yield km.encrypt(TEXT, ADDRESS) + yield km.decrypt(encrypted, ADDRESS_2, verify=ADDRESS) + + d = km.put_raw_key( + UNRELATED_KEY, ADDRESS, + validation=ValidationLevels.Provider_Endorsement) + yield self.assertFailure(d, KeyNotValidUpgrade) + + @inlineCallbacks + def test_signed_key(self): + km = self._key_manager() + yield km.put_raw_key(PUBLIC_KEY, ADDRESS) + yield km.put_raw_key(SIGNED_KEY, ADDRESS) + key = yield km.get_key(ADDRESS, fetch_remote=False) + self.assertEqual(key.fingerprint, SIGNED_FINGERPRINT) + + @inlineCallbacks + def test_two_uuids(self): + TEXT = "some text" + + km = self._key_manager() + yield km.put_raw_key(UUIDS_PRIVATE, ADDRESS_2) + signature = yield km.sign(TEXT, ADDRESS_2) + yield self.delete_all_keys(km) + + yield km.put_raw_key(UUIDS_KEY, ADDRESS_2) + yield km.put_raw_key(UUIDS_KEY, ADDRESS) + yield km.encrypt(TEXT, ADDRESS_2) + yield km.verify(TEXT, ADDRESS_2, detached_sig=signature) + + d = km.put_raw_key( + PUBLIC_KEY_2, ADDRESS_2, + validation=ValidationLevels.Provider_Endorsement) + yield self.assertFailure(d, KeyNotValidUpgrade) + key = yield km.get_key(ADDRESS_2, fetch_remote=False) + self.assertEqual(key.fingerprint, UUIDS_FINGERPRINT) + + yield km.put_raw_key( + PUBLIC_KEY, ADDRESS, + validation=ValidationLevels.Provider_Endorsement) + key = yield km.get_key(ADDRESS, fetch_remote=False) + self.assertEqual(key.fingerprint, KEY_FINGERPRINT) + + +# Key material for testing + +# key 901FBCA5: public key "Leap Test Key <leap@leap.se>" +UNRELATED_FINGERPRINT = "ABCCD9C8270B6A8D5633FAC9D04DB2E4901FBCA5" +UNRELATED_KEY = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1 + +mQENBFQ9VDoBCACbKflcEhUXZULOT4Fwc2ifRUllJpusd2uX5oeDlZdZ15uLY2eF +LcxnAdIWkI/PsXimh0ev/Pf4oCynfmt02I3c2d9F0N6JXWnRiP+p098oPOcqeEqL +N3CrkH1RVnEXNeJ/Fu7tkD61SBXl1MytMfcHyhN5arg8OcVAjcmghX53+92jFhC9 +8ss87H/qEe5vEX/ahP3tiL5ULvaS4GIX+XB0O3yCVdRoRG9lqMIBP/ZqCkKrNll8 +dT12a6ByG/rWharZUeUETiM4Y+JjDUUaEC2YhNF9k52JNGanLH9LTTtlKy5WTT+E +C6T6VMAtkwcBDpkXr5sBB/N+Y1z0Fp359lIXABEBAAG0HExlYXAgVGVzdCBLZXkg +PGxlYXBAbGVhcC5zZT6JATgEEwECACIFAlQ9VDoCGwMGCwkIBwMCBhUIAgkKCwQW +AgMBAh4BAheAAAoJENBNsuSQH7ylsSUIAIxUFbkeTdHbCF/LVA2U+ktnR1iVikAY +vFK+U+Bto11/AO4Kew2eWniDch/sqLQOoSydtP42z2z3/Al3u7LhQ8bElQHPDY78 +t49qweyJi00V3vCKCdWwPJnPM5eJOIrZHCbwIgeXCsXxVNJVyziVqMuum+px1h2d +1YJZXYejT8rzwa3yBPAsGWRAWETeTvUuyjPMFa59scbnaDuY+bwQ2r/qG9m7UyHU +h2kAHC5sf1rixVOY6rLhw75gQHE/L2BZJRfVsDQqIpEMh2OgMfNbL928jncjwQvc +/IXXwSUx7y50ll+uNh+TVLf0MlUjKdHmHqnGBMlIIWojWJuKxYmOOoO5AQ0EVD1U +OgEIAM/TlhWVSI+tl5XBUAcf60RxjpHQkmdfq1i1jgwUgu/638EKzBfLcnRYX8Rn +DO9CWnHcql/4hp226fIWZN/SyReE81n7UkLDMAglhHgiezHMSH1GYVu4IlfpLVXn +brLVo83KioH5MPFWmZv5tigpU/G8dTx9yVGv1//YW2qqRYYqeIKJfapBaY/bNqyD +vYRfZo1K2brtHx4bToY6mALRF4ruV5SVZGS69e4Sh692C2pXSVbCpRhQ/2WnvkZH +leFIdmNmQN61MC1k26A620Rm+pAsXX71dln0u96xbrCgEVbi6ccfXzbFKtVmThVB +w11CLvVTviOm99TmcgpmDS4cf08AEQEAAYkBHwQYAQIACQUCVD1UOgIbDAAKCRDQ +TbLkkB+8pR+fB/0SeTcRr1duN7VYWdtng1+jO0ornIBtUraglN01dEEmiwN83DTi +J37i+nll+4is7BtiXqhumRptKh1v8UUMyFX/rjjoojCJBg5NExsiOYl3O4le68oF +3+XC+n7yrlyNmI15+3dcQmC9F6HN8EBZgrn5YPKGIOMHTGatB5PryMKg2IKiN5GZ +E0hmrOQgmcGrkeqysKACQYUHTasSk2IY1l1G5YQglqCaBh4+UC82Dmg5fTBbHjxP +YhhojkP4aD/0YW7dgql3nzYqvPCAjBH1Cf6rA9HvAJwUP9Ig/okcrrPEKm638+mG ++vNIuLqIkA4oFLBAAIrgMiQZ+NZz9uD6DJE7 +=FO7G +-----END PGP PUBLIC KEY BLOCK----- +""" + +# key A1885A7C: public key "Leap Test Key <leap@leap.se>" +EXPIRED_FINGERPRINT = "7C1F68B0E14157B09B5F4ADE6F15F004A1885A7C" +EXPIRED_KEY = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1.4.12 (GNU/Linux) + +mQENBBvrfd0BCADGNpspaNhsbhSjKioCWrE2MTTYC+Sdpes22RabdhQyOCWvlSbj +b8p0y3kmnMOtVBT+c22/w7eu2YBfIpS4RswgE5ypr/1kZLFQueVe/cp29GjPvLwJ +82A3EOHcmXs8rSJ76h2bnkySvbJawz9rwCcaXhpdAwC+sjWvbqiwZYEL+90I4Xp3 +acDh9vNtPxDCg5RdI0bfdIEBGgHTfsda3kWGvo1wH5SgrTRq0+EcTI7aJgkMmM/A +IhnpACE52NvGdG9eB3x7xyQFsQqK8F0XvEev2UJH4SR7vb+Z7FNTJKCy6likYbSV +wGGFuowFSESnzXuUI6PcjyuO6FUbMgeM5euFABEBAAG0HExlYXAgVGVzdCBLZXkg +PGxlYXBAbGVhcC5zZT6JAT4EEwECACgFAhvrfd0CGwMFCQABUYAGCwkIBwMCBhUI +AgkKCwQWAgMBAh4BAheAAAoJEG8V8AShiFp8VNkH/iCQcXkTfMOVlL2rQRyZtJEO +Lr5uTyyY8O6ubeNCHqZzlIopiPAsv4hIYjjMDvOfZ9R53YgmbacUm0rvh1B4MSUf +k+sa9/tequ3y44LUKp7AB6NyyLgVOU5ngl2w+bi7CgXAep3oP4joYKcU0mmSAc2S +2Gj85DVqP0kdzNs47esvyj7g1TOfdBwmLsTx/219H+w3dNBeyCQWkYCYNh7MX/Ba +SZ+P0xr4FetcOVPM3wAzUtDG7hKsgccoIXt0FWhG/nn8cETfGH+o3W/ky7Jktatx +DGDHoZJvAaG2B2ey1pAQlezr8p/O+ZVABiigHk1S+myBHyhlXzUcjhQnEG7aHZ65 +AQ0EG+t93QEIAKqRq/2sBDW4g3FU+11LhixT+GosrfVvnitz3S9k2tBXok/wYpI1 +XeA+kTHiF0LaqoaciDRvkA9DvhDbSrNM1yeuYRyZiHlTmoPZ/Fkl60oA2cyLd1L5 +sXbuipY3TEiakugdSU4rzgi0hFycm6Go6yq2G6eC6UALvD9CTMdZHw40TadG9xpm +4thYPuJ1kPH8/bkbTi9sLHoApYgL+7ssje8w4epr0qD4IGxeKwJPf/tbTRpnd8w3 +leldixHHKAutNt49p0pkXlORAHRpUmp+KMZhFvCvIPwe9o5mYtMR7sDRxjY61ZEQ +KLyKoh5wsJsaPXBjdG7cf6G/cBcwvnQVUHcAEQEAAYkBJQQYAQIADwUCG+t93QIb +DAUJAAFRgAAKCRBvFfAEoYhafOPgB/9z4YCyT/N0262HtegHykhsyykuqEeNb1LV +D9INcP+RbCX/0IjFgP4DTMPP7qqF1OBwR276maALT321Gqxc5HN5YrwxGdmoyBLm +unaQJJlD+7B1C+jnO6r4m44obvJ/NMERxVyzkXap3J2VgRIO1wNLI9I0sH6Kj5/j +Mgy06OwXDcqIc+jB4sIJ3Tnm8LZ3phJzNEm9mI8Ak0oJ7IEcMndR6DzmRt1rJQcq +K/D7hOG02zvyRhxF27U1qR1MxeU/gNnOx8q4dnVyWB+EiV1sFl4iTOyYHEsoyd7W +Osuse7+NkyUHgMXMVW7cz+nU7iO+ht2rkBtv+Z5LGlzgHTeFjKci +=WhX+ +-----END PGP PUBLIC KEY BLOCK----- +""" +# updated expiration date +EXPIRED_KEY_NEW_EXPIRY_DATE = datetime.fromtimestamp(2049717872) +EXPIRED_KEY_UPDATED = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1.4.12 (GNU/Linux) + +mQENBBvrfd0BCADGNpspaNhsbhSjKioCWrE2MTTYC+Sdpes22RabdhQyOCWvlSbj +b8p0y3kmnMOtVBT+c22/w7eu2YBfIpS4RswgE5ypr/1kZLFQueVe/cp29GjPvLwJ +82A3EOHcmXs8rSJ76h2bnkySvbJawz9rwCcaXhpdAwC+sjWvbqiwZYEL+90I4Xp3 +acDh9vNtPxDCg5RdI0bfdIEBGgHTfsda3kWGvo1wH5SgrTRq0+EcTI7aJgkMmM/A +IhnpACE52NvGdG9eB3x7xyQFsQqK8F0XvEev2UJH4SR7vb+Z7FNTJKCy6likYbSV +wGGFuowFSESnzXuUI6PcjyuO6FUbMgeM5euFABEBAAG0HExlYXAgVGVzdCBLZXkg +PGxlYXBAbGVhcC5zZT6JAT4EEwECACgCGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4B +AheABQJUlDCSBQleQLiTAAoJEG8V8AShiFp8t3QH/1eqkVIScXmqaCVeno3VSKiH +HqnxiHcEgtpNRfUlP6tLD4H6QPEpvoUI9S/8HSYi3nbDGXEX8ycKlnwxjdIqWSOW +xj91/7uQAo+dP9QaVJ6xgaAiqzN1x3JzX3Js1wTodmNV0TfmGjxwnC5up/xK7/pd +KuDP3woDsRlwy8Lgj67mkn49xfAFHo6hI6SD36UBDAC/ELq6kZaba4Kk0fEVHCEz +HX0B09ZIY9fmf305cEB3dNh6SMQgKtH0wKozaqI2UM2B+cs3z08bC+YuUUh7UJTH +yr+hI7vF4/WEeJB3fuhP3xsumLhV8P47DaJ7oivmtsDEbAJFKqvigEqNES73Xpy5 +AQ0EG+t93QEIAKqRq/2sBDW4g3FU+11LhixT+GosrfVvnitz3S9k2tBXok/wYpI1 +XeA+kTHiF0LaqoaciDRvkA9DvhDbSrNM1yeuYRyZiHlTmoPZ/Fkl60oA2cyLd1L5 +sXbuipY3TEiakugdSU4rzgi0hFycm6Go6yq2G6eC6UALvD9CTMdZHw40TadG9xpm +4thYPuJ1kPH8/bkbTi9sLHoApYgL+7ssje8w4epr0qD4IGxeKwJPf/tbTRpnd8w3 +leldixHHKAutNt49p0pkXlORAHRpUmp+KMZhFvCvIPwe9o5mYtMR7sDRxjY61ZEQ +KLyKoh5wsJsaPXBjdG7cf6G/cBcwvnQVUHcAEQEAAYkBJQQYAQIADwIbDAUCVJQw +3QUJXkC4/QAKCRBvFfAEoYhafEtiB/9hMfSFNMxtlIJDJArG4JwR7sBOatYUT858 +qZnTgGETZN8wXpeEpXWKdDdmCX9aeE9jsDNgSQ5WWpqU21bGMXh1IGjAzmqTqq3/ +ik1vALuaVfr6OqjTzrJVQujT61CGed26xpP3Zh8hLKyKa+dXnX/VpgZS42wZLPx2 +wcODfANmTfE2AhMap/RyDy21q4nau+z2hMEOKdtF8dpP+pEvzoN5ZexYP1hfT+Av +oFPyVB5YtEMfxTEshDKRPjbdgNmw4faKXd5Cbelo4YxxpO16FHb6gzIdjOX15vQ+ +KwcVXzg9xk4D3cr1mnTCops/iv6TXvcw4Wbo70rrKXwkjl8LKjOP +=sHoe +-----END PGP PUBLIC KEY BLOCK----- +""" +UNEXPIRED_KEY = EXPIRED_KEY_UPDATED +UNEXPIRED_PRIVATE = """ +-----BEGIN PGP PRIVATE KEY BLOCK----- +Version: GnuPG v1.4.12 (GNU/Linux) + +lQOYBBvrfd0BCADGNpspaNhsbhSjKioCWrE2MTTYC+Sdpes22RabdhQyOCWvlSbj +b8p0y3kmnMOtVBT+c22/w7eu2YBfIpS4RswgE5ypr/1kZLFQueVe/cp29GjPvLwJ +82A3EOHcmXs8rSJ76h2bnkySvbJawz9rwCcaXhpdAwC+sjWvbqiwZYEL+90I4Xp3 +acDh9vNtPxDCg5RdI0bfdIEBGgHTfsda3kWGvo1wH5SgrTRq0+EcTI7aJgkMmM/A +IhnpACE52NvGdG9eB3x7xyQFsQqK8F0XvEev2UJH4SR7vb+Z7FNTJKCy6likYbSV +wGGFuowFSESnzXuUI6PcjyuO6FUbMgeM5euFABEBAAEAB/0cwelrGEdmG+Z/RxZx +4anvpzNNMRSJ0Xu508SVk4vElCQrlaPfFZC1t0ZW1XcHsQ5Gsy/gxaA4YbK1RXV2 +8uvvWh5oTsdLByzj/cSLLp5u+cYxyuaBOb/jiAiCPVEFnEec23pQ4fumwpebgX5f +FLGCVYAqWc2EMqOFVgnAEJ9TbIWRnCkN04r1WSc7eLcUlH+vTp4HUPd6PQj56zSr +J5beeviHgYB76M6mcM/BRzLmcl4M7bgx5olp8A0Wz7ub+hXICmNQyqpE8qZeyGjq +v4T/6BSpsp5yEGDMkahFyO7OwB7UI6SZGkdnWKGeXOWG48so6cFdZ8dxRGx49gFL +1rP1BADfYjQDfmBpB6tC1MyATb1MUK/1CN7wC5w7fXCtPbYNiqc9s27W9NXQReHD +GOU04weU+ZJsV6Fwlt3oRD2j05vNdhbqKseLdsm27/kg2GWZvjamriHqZ94sw6yk +fg3MqPb4JdFzBZVHqD50AHASx2rMshBeMVo27LhcADCWM9P8bwQA4yeRonbIAUls +yAwWIRCMel2JY1u/zmJrg8FFAG2LYx+pYaxkRxjSJNlQQV7o6aYiU3Yw+nXvj5Pz +IdOdimWfFb8eZ3U6tbognJxjwU8vV3ili40O7SENgloeM/nzg+nQjIaS9utfE8Et +juV7f9OWi8Fo+xzSOvUGwoL/zW5t+UsD/0bm+5ch53Sm1ITCn7yfMrp0YaT+YC3Y +mNNfrfbFpEd20ky4K9COIFDFCJmMyKLx/jSajcf4JqrxB/mOmHHAF9CeL7LUy/XV +O8Ec5lkovicDIDT1b+pQYEYvh5UBJmoq1R5nbNLo70gFtGP6b4+t27Gxks5VLhF/ +BVvxK7xjmkBETnq0HExlYXAgVGVzdCBLZXkgPGxlYXBAbGVhcC5zZT6JAT4EEwEC +ACgCGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4BAheABQJUURIXBQld/ZovAAoJEG8V +8AShiFp8xUcIALcAHZbaxvyhHRGOrwDddbH0fFDK0AqKTsIT7y4D/HLFCP5zG3Ck +7qGPZdkHXZfzq8rIb+zUjW3oJIVI1IucHxG2T5kppa8RFCBAFlRWYf6R3isX3YL0 +d3QSragjoxRNPcHNU8ALHcvfSonFHBoi4fH44rvgksAiT68SsdPaoXDlabx5T15e +vu/7T5e/DGMQVPMxiaSuSQhbOKuMk2wcFdmLtBYHLZPa54hHPNhEDyxLgtKKph0g +Obk9ojKfH9kPvLveIcpS5CqTJfN/kqBz7CJWwEeAi2iG3H1OEB25aCUdTxXSRNlG +qEgcWPaWxtc1RzlARu7LB64OUZuRy4puiAGdA5gEG+t93QEIAKqRq/2sBDW4g3FU ++11LhixT+GosrfVvnitz3S9k2tBXok/wYpI1XeA+kTHiF0LaqoaciDRvkA9DvhDb +SrNM1yeuYRyZiHlTmoPZ/Fkl60oA2cyLd1L5sXbuipY3TEiakugdSU4rzgi0hFyc +m6Go6yq2G6eC6UALvD9CTMdZHw40TadG9xpm4thYPuJ1kPH8/bkbTi9sLHoApYgL ++7ssje8w4epr0qD4IGxeKwJPf/tbTRpnd8w3leldixHHKAutNt49p0pkXlORAHRp +Ump+KMZhFvCvIPwe9o5mYtMR7sDRxjY61ZEQKLyKoh5wsJsaPXBjdG7cf6G/cBcw +vnQVUHcAEQEAAQAH/A0TCHNz3Yi+oXis8m2WzeyU7Sw6S4VOLnoXMgOhf/JLXVoy +S2P4qj73nMqNkYni2AJkej5GtOyunSGOpZ2zzKQyhigajq76HRRxP5oXwX7VLNy0 +bguSrys2IrJb/8Fq88rN/+H5kpvxNlog+P79wzTta5Y9/yIVJDNXIip/ptVARhA7 +CrdDyE4EaPjcWCS3/9a4R8JDZl19PlTE23DD5ffZv5wNEX38oZkDCK4Si+kqhvm7 +g0Upr49hnvqRPXoi46OBAoUh9yVTKaNDMsRWblvno7k3+MF0CCnix5p5JR74bGnZ +8kS14qXXkAa58uMaAIcv86+mHNovXhaxcog+8k0EAM8wWyWPjdO2xbwwB0hS2E9i +IO/X26uhLY3/tozbOekvqXKvwIy/xdWNVHr7eukAS+oIY10iczgKkMgquoqvzR4q +UY5WI0iC0iMLUGV7xdxusPl+aCbGKomtN/H3JR2Wecgje7K/3o5BtUDM6Fr2KPFb ++uf/gqVkoMmp3O/DjhDlBADSwMHuhfStF+eDXwSQ4WJ3uXP8n4M4t9J2zXO366BB +CAJg8enzwQi62YB+AOhP9NiY5ZrEySk0xGsnVgex2e7V5ilm1wd1z2el3g9ecfVj +yu9mwqHKT811xsLjqQC84JN+qHM/7t7TSgczY2vD8ho2O8bBZzuoiX+QIPYUXkDy +KwP8DTeHjnI6vAP2uVRnaY+bO53llyO5DDp4pnpr45yL47geciElq3m3jXFjHwos +mmkOlYAL07JXeZK+LwbhxmbrwLxXNJB//P7l8ByRsmIrWvPuPzzcKig1KnFqvFO1 +5wGU0Pso2qWZU+idrhCdG+D8LRSQ0uibOFCcjFdM0JOJ7e1RdIkBJQQYAQIADwUC +G+t93QIbDAUJAAFRgAAKCRBvFfAEoYhafOPgB/9z4YCyT/N0262HtegHykhsyyku +qEeNb1LVD9INcP+RbCX/0IjFgP4DTMPP7qqF1OBwR276maALT321Gqxc5HN5Yrwx +GdmoyBLmunaQJJlD+7B1C+jnO6r4m44obvJ/NMERxVyzkXap3J2VgRIO1wNLI9I0 +sH6Kj5/jMgy06OwXDcqIc+jB4sIJ3Tnm8LZ3phJzNEm9mI8Ak0oJ7IEcMndR6Dzm +Rt1rJQcqK/D7hOG02zvyRhxF27U1qR1MxeU/gNnOx8q4dnVyWB+EiV1sFl4iTOyY +HEsoyd7WOsuse7+NkyUHgMXMVW7cz+nU7iO+ht2rkBtv+Z5LGlzgHTeFjKci +=dZE8 +-----END PGP PRIVATE KEY BLOCK----- +""" + +# key CA1AD31E: public key "Leap Test Key <leap@leap.se>" +# signed by E36E738D69173C13D709E44F2F455E2824D18DDF +SIGNED_FINGERPRINT = "6704CF3362087DA23E3D2DF8ED81DFD1CA1AD31E" +SIGNED_KEY = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1.4.12 (GNU/Linux) + +mQENBFQ9DHMBCADJXyNVzTQ+NnmSDbR6q8jjDsnqk/IgKrMBkpjNxUa/0HQ4o0Yh +pklzR1hIc/jsdgq42A0++pqdfQFeRc2NVw/NnE/9uzW73YuaWg5XnWGjuAP3UeRI +3xjL/cscEFmGfGkuGvFpIVa7GBPqz1SMBXWULJbkCE1pnHfgqh0R7oc5u0omnsln +0zIrmLX1ufpDRSUedjSgIfd6VqbkPm3NJuZE4NVn6spHG3zTxqcaPCG0xLfHw7eS +qgUdz0BFaxqtQiXffBpA3KvGJW0792VjDh4M6kDvpeYpKRmB9oEYlT3n3KvQrdPE +B3N5KrzJj1QIL990q4NQdqjg+jUE5zCJsTdzABEBAAG0HExlYXAgVGVzdCBLZXkg +PGxlYXBAbGVhcC5zZT6JATgEEwECACIFAlQ9DHMCGwMGCwkIBwMCBhUIAgkKCwQW +AgMBAh4BAheAAAoJEO2B39HKGtMeI/4H/0/OG1OqtQEoYscvJ+BZ3ZrM2pEk7KDd +7AEEf6QIGSd38GFyn/pve24cpRLv7phKNy9dX9VJhTDobpKvK0ZT/yQO3FVlySAN +NVpu93/jrLnrW51J3p/GP952NtUAEP5l1uyYIKZ1W3RLWws72Lh34HTaHAWC94oF +vnS42IYdTn4y6lfizL+wYD6CnfrIpHm8v3NABEQZ8e/jllrRK0pnOxAdFv/TpWEl +8AnTZXcejSBgCG6UmDtrRKfgoQlGJEIH61QSqHpRIwkepQVYexUwgcLFAZPI9Hvw +N5pZQ5Z+XcsYTGtLNEpF7YW6ykLDRTAv6LiBQPkBX8TDKhkh95Cs3sKJAhwEEAEC +AAYFAlQ9DgIACgkQL0VeKCTRjd/pABAAsNPbiGwuZ469NxwTgf3+EMoWZNHf5ZRa +ZbzKKesLFEElMdX3Q/MkVc1T8Rsy9Fdn1tf/H6glwuKyqWeXNkxa86VT6gck9WV6 +bslFIl/vJpb3dcmiCCM1tSCYpX0yE0fqebMihcfvNqDw3GdZBUo7R0pWN6BEh4iM +YYWTAtuPCrbsv+2bSid1ZLIO6FIF5kskg60h/IbSr+A+DSBgyyjf9fbUv6MoyMw8 +08GtCAx6VGJhTTC/RkWIA+N3n83W5XQFszOOg/PAAg0JMUXUBGvjfYJ5fcB8cfuw +1XZe9uWsDmYpwfVEtDajrLbatkXAu22pjIJnB4cVqiD+4hHbBCFkeZIfdRsPEINO +UacsjVZV5/EPDN9OpkvZbkrLJ6eaQnmQZnFclquNHUCqFI0QYUml0BXXaZq+aEJ9 +N9x00kdYV1xW6zkL+MGgxdViC5n6dwJcU3MANrykV8Cc5/x+wmwY8AXbHzU7MxvY +nGlAYsAZHhf4ZlEdAO6C329VotMxBLFd5DJZZoN+ysaOpsUNRl0JO41+6bbI141l +DCmzWUG4iTI70zxsgzZGgEt0HlMDoIxElPcy/jDKi1IfEDmveK+QR9WphM40Ayvx +VTeA6g9WagmoHopQs/D/Kbi3Q8izFDfXTwA52DUxTjyUEFn0jEOiG9BFmnIkQ6LE +3WkIJFd3D0+5AQ0EVD0McwEIALRXukBsOrcA/rNJ4SV4I64cGdN8q9Gl5RpLl8cS +L5+SGHp6KoCL4daBqpbxdhE/Ylb3QmPt2SBZbTkwJ2yuczELOyhH6R13OWRWCgkd +dYLZsm/sEzu7dVoFQ4peKTGDzI8mQ/s157wRkz/8iSUYjJjeM3g0NI55FVcefibN +pOOFRaYGUh8itofRFOu7ipZ9F8zRJdBwqISe1gemNBR+O3G3Srm34PYu6sZRsdLU +Nf+81+/ynQWQseVpbz8X93sx/onIYIY0w+kxIE0oR/gBBjdsMOp7EfcvtbGTgplQ ++Zxln/pV2bVFkGkjKniFEEfi4eCPknCj0+67qKRt/Fc9n98AEQEAAYkBHwQYAQIA +CQUCVD0McwIbDAAKCRDtgd/RyhrTHmmcCACpTbjOYjvvr8/rD60bDyhnfcKvpavO +1/gZtMeEmw5gysp10mOXwhc2XuC3R1A6wVbVgGuOp5RxlxI4w8xwwxMFSp6u2rS5 +jm5slXBKB2i3KE6Sao9uZKP2K4nS8Qc+DhathfgafI39vPtBmsb1SJd5W1njNnYY +hARRpViUcVdfvW3VRpDACZ79PBs4ZQQ022NsNAPwm/AJvAw2S42Y9tjTnaLVRLfH +lzdErcGTBnfEIi0mQF/11k/THBJxx7vaFt8YXiDlWLUkg5XW3xK9mkETbaTv+/tB +X2+l7IOSt+31KQCBFN/VmhTySJOVQC1d2A56lSH2c/DWVClji+x3suzn +=xprj +-----END PGP PUBLIC KEY BLOCK----- +""" + +# key 0x1DDBAEB928D982F7: public key two uuids +# uid anotheruser <anotheruser@leap.se> +# uid Leap Test Key <leap@leap.se> +UUIDS_FINGERPRINT = "21690ED054C1B2F3ACE963D38FCC7DEFB4EE5A9B" +UUIDS_KEY = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1 + +mQENBFZwjz0BCADHpVg7js8PK9gQXx3Z+Jtj6gswYZpeXRdIUfZBSebWNGKwXxC9 +ZZDjnQc3l6Kezh7ra/hB6xulDbj3vXi66V279QSOuFAKMIudlJehb86bUiVk9Ppy +kdrn44P40ZdVmw2m61WrKnvBelKW7pIF/EB/dY1uUonSfR56f/BxL5a5jGi2uI2y +2hTPnZEksoKQsjsp1FckPFwVGzRKVzYl3lsorL5oiHd450G2B3VRw8AZ8Eqq6bzf +jNrrA3TOMmEIYdADPqqnBj+xbnOCGBsvx+iAhGRxUckVJqW92SXlNNds8kEyoE0t +9B6eqe0MrrlcK3SLe03j85T9f1o3An53rV/3ABEBAAG0HExlYXAgVGVzdCBLZXkg +PGxlYXBAbGVhcC5zZT6JAT0EEwEIACcFAlZwjz0CGwMFCRLMAwAFCwkIBwMFFQoJ +CAsFFgMCAQACHgECF4AACgkQj8x977TuWpu4ZggAgk6rVtOCqYwM720Bs3k+wsWu +IVPUqnlPbSWph/PKBKWYE/5HoIGdvfN9jJxwpCM5x/ivPe1zeJ0qa9SnO66kolHU +7qC8HRWC67R4znO4Zrs2I+SgwRHAPPbqMVPsNs5rS0D6DCcr+LXtJF+LLAsIwDfw +mXEsKbX5H5aBmmDnfq0pGx05E3tKs5l09VVESvVZYOCM9b4FtdLVvgbKAD+KYDW1 +5A/PzOvyYjZu2FGtPKmNmqHD3KW8cmrcI/7mRR08FnNGbbpgXPZ2GPKgkUllY9N7 +E9lg4eaYH2fIWun293kbqp8ueELZvoU1jUQrP5B+eqBWTvIucqdQqJaiWn9pELQh +YW5vdGhlcnVzZXIgPGFub3RoZXJ1c2VyQGxlYXAuc2U+iQE9BBMBCAAnBQJWcI9a +AhsDBQkSzAMABQsJCAcDBRUKCQgLBRYDAgEAAh4BAheAAAoJEI/Mfe+07lqblRMH +/17vK2WOd0F7EegA5ELOrM+QJPKpLK4e6WdROUpS1hvRQcAOqadCCpYPSTTG/HnO +d9/Q9Q/G3xHHlk6nl1qHRkVlp0iVWyBZFn1s8lgGz/FFUEXXRj7I5RGmKSNgDlqA +un2OrwB2m+DH6pMjizu/RUfIJM2bSgViuvjCQoaLYLcFiULJlWHb/2WgpvesFyAc +0oc9AkXuaCEo50XQlrt8Bh++6rfbAMAS7ZrHluFPIY+y4eVl+MU/QkoGYAVgiLLV +5tnvbDZWNs8ixw4ubqKXYj5mK55sapokhOqObEfY6D3p7YpdQO/IhBneCw9gKOxa +wYAPhCOrJC8JmE69I1Nk8Bu5AQ0EVnCPPQEIANUivsrR2jwb8C9wPONn0HS3YYCI +/IVlLdw/Ia23ogBF1Uh8ORNg1G/t0/6S7IKDZ2gGgWw25u9TjWRRWsxO9tjOPi2d +YuhwlQRnq5Or+LzIEKRs9GnJMLFT0kR9Nrhw4UyaN6tWkR9p1Py7ux8RLmDEMAs3 +fBfVMLMmQRerJ5SyCUiq/lm9aYTLCC+vkjE01C+2oI0gcWGfLDjiJbaD4AazzibP +fBe41BIk7WaCJyEcBqfrgW/Seof43FhSKRGgc5nx3HH1DMz9AtYfKnVS5DgoBGpO +hxgzIJN3/hFHPicRlYoHxLUE48GcFfQFEJ78RXeBuieXAkDxNczHnLkEPx8AEQEA +AYkBJQQYAQgADwUCVnCPPQIbDAUJEswDAAAKCRCPzH3vtO5amyRIB/9IsWUWrvbH +njw2ZCiIV++lHgIntAcuQIbZIjyMXoM+imHsPrsDOUT65yi9Xp1pUyZEKtGkZvP4 +p7HRzJL+RWiWEN7sgQfNqqev8BF2/xmxkmiSuXHJ0PSaC5DmLfFDyvvSU6H5VPud +NszKIHtyoS6ph6TH8GXzFmNiHaTOZKdmVxlyLj1/DN+d63M+ARZIe7gB6jmP/gg4 +o52icfTcqLkkppYn8g1A9bdJ3k8qqExNPTf2llDscuD9VzpebFbPqfcYqR71GfG7 +Kmg7qGnZtNac1ERvknI/fmmCQydGk5pOh0KUTgeLG2qB07cqCUBbOXaweNWbiM9E +vtQLNMD9Gn7D +=MCXv +-----END PGP PUBLIC KEY BLOCK----- +""" +UUIDS_PRIVATE = """ +-----BEGIN PGP PRIVATE KEY BLOCK----- +Version: GnuPG v1 + +lQOYBFZwjz0BCADHpVg7js8PK9gQXx3Z+Jtj6gswYZpeXRdIUfZBSebWNGKwXxC9 +ZZDjnQc3l6Kezh7ra/hB6xulDbj3vXi66V279QSOuFAKMIudlJehb86bUiVk9Ppy +kdrn44P40ZdVmw2m61WrKnvBelKW7pIF/EB/dY1uUonSfR56f/BxL5a5jGi2uI2y +2hTPnZEksoKQsjsp1FckPFwVGzRKVzYl3lsorL5oiHd450G2B3VRw8AZ8Eqq6bzf +jNrrA3TOMmEIYdADPqqnBj+xbnOCGBsvx+iAhGRxUckVJqW92SXlNNds8kEyoE0t +9B6eqe0MrrlcK3SLe03j85T9f1o3An53rV/3ABEBAAEAB/9Lzeg2lP7hz8/2R2da +QB8gTNl6wVSPx+DzQMuz9o+DfdiLB02f3FSrWBBJd3XzvmfXE+Prg423mgJFbtfM +gJdqqpnUZv9dHxmj96urTHyyVPqF3s7JecAYlDaj31EK3BjO7ERW/YaH7B432NXx +F9qVitjsrsJN/dv4v2NYVq1wPcdDB05ge9WP+KRec7xvdTudH4Kov0iMZ+1Nksfn +lrowGuMYBGWDlTNoBoEYxDD2lqVaiOjyjx5Ss8btS59IlXxApOFZTezekl7hUI2B +1fDQ1GELL6BKVKxApGSD5XAgVlqkek4RhoHmg4gKSymfbFV5oRp1v1kC0JIGvnB1 +W5/BBADKzagL4JRnhWGubLec917B3se/3y1aHrEmO3T0IzxnUMD5yvg81uJWi5Co +M05Nu/Ny/Fw1VgoF8MjiGnumB2KKytylu8LKLarDxPpLxabOBCQLHrLQOMsmesjR +Cg3iYl/EeM/ooAufaN4IObcu6Pa8//rwNE7Fz1ZsIyJefN4fnwQA/AOpqA2BvHoH +VRYh4NVuMLhF1YdKqcd/T/dtSqszcadkmG4+vAL76r3jYxScRoNGQaIkpBnzP0ry +Adb0NDuBgSe/Cn44kqT7JJxTMfJNrw2rBMMUYZHdQrck2pf5R4fZ74yJyvCKg5pQ +QAl1gTSi6PJvPwpc7m7Kr4kHBVDlgKkEAJKkVrtq/v2+jSA+/osa4YC5brsDQ08X +pvZf0MBkc5/GDfusHyE8HGFnVY5ycrS85TBCrhc7suFu59pF4wEeXsqxqNf02gRe +B+btPwR7yki73iyXs4cbuszHMD03UnbvItFAybD5CC+oR9kG5noI0TzJNUNX9Vkq +xATf819dhwtgTha0HExlYXAgVGVzdCBLZXkgPGxlYXBAbGVhcC5zZT6JAT0EEwEI +ACcFAlZwjz0CGwMFCRLMAwAFCwkIBwMFFQoJCAsFFgMCAQACHgECF4AACgkQj8x9 +77TuWpu4ZggAgk6rVtOCqYwM720Bs3k+wsWuIVPUqnlPbSWph/PKBKWYE/5HoIGd +vfN9jJxwpCM5x/ivPe1zeJ0qa9SnO66kolHU7qC8HRWC67R4znO4Zrs2I+SgwRHA +PPbqMVPsNs5rS0D6DCcr+LXtJF+LLAsIwDfwmXEsKbX5H5aBmmDnfq0pGx05E3tK +s5l09VVESvVZYOCM9b4FtdLVvgbKAD+KYDW15A/PzOvyYjZu2FGtPKmNmqHD3KW8 +cmrcI/7mRR08FnNGbbpgXPZ2GPKgkUllY9N7E9lg4eaYH2fIWun293kbqp8ueELZ +voU1jUQrP5B+eqBWTvIucqdQqJaiWn9pELQhYW5vdGhlcnVzZXIgPGFub3RoZXJ1 +c2VyQGxlYXAuc2U+iQE9BBMBCAAnBQJWcI9aAhsDBQkSzAMABQsJCAcDBRUKCQgL +BRYDAgEAAh4BAheAAAoJEI/Mfe+07lqblRMH/17vK2WOd0F7EegA5ELOrM+QJPKp +LK4e6WdROUpS1hvRQcAOqadCCpYPSTTG/HnOd9/Q9Q/G3xHHlk6nl1qHRkVlp0iV +WyBZFn1s8lgGz/FFUEXXRj7I5RGmKSNgDlqAun2OrwB2m+DH6pMjizu/RUfIJM2b +SgViuvjCQoaLYLcFiULJlWHb/2WgpvesFyAc0oc9AkXuaCEo50XQlrt8Bh++6rfb +AMAS7ZrHluFPIY+y4eVl+MU/QkoGYAVgiLLV5tnvbDZWNs8ixw4ubqKXYj5mK55s +apokhOqObEfY6D3p7YpdQO/IhBneCw9gKOxawYAPhCOrJC8JmE69I1Nk8BudA5gE +VnCPPQEIANUivsrR2jwb8C9wPONn0HS3YYCI/IVlLdw/Ia23ogBF1Uh8ORNg1G/t +0/6S7IKDZ2gGgWw25u9TjWRRWsxO9tjOPi2dYuhwlQRnq5Or+LzIEKRs9GnJMLFT +0kR9Nrhw4UyaN6tWkR9p1Py7ux8RLmDEMAs3fBfVMLMmQRerJ5SyCUiq/lm9aYTL +CC+vkjE01C+2oI0gcWGfLDjiJbaD4AazzibPfBe41BIk7WaCJyEcBqfrgW/Seof4 +3FhSKRGgc5nx3HH1DMz9AtYfKnVS5DgoBGpOhxgzIJN3/hFHPicRlYoHxLUE48Gc +FfQFEJ78RXeBuieXAkDxNczHnLkEPx8AEQEAAQAH+wRSCn0RCPP7+v/zLgDMG3Eq +QHs7C6dmmCnlS7j6Rnnr8HliL0QBy/yi3Q/Fia7RnBiDPT9k04SZdH3KmmUW2rEl +aSRCkv00PwkSUuuQ6l9lTNUQclnsnqSRlusVgLT3cNG9NJCwFgwFeLBQ2+ey0PZc +M78edlEDXNPc3CfvK8O7WK74YiNJqIQCs7aDJSv0s7O/asRQyMCsl/UYtMV6W03d +eauS3bM41ll7GVfHMgkChFUQHb+19JHzSq4yeqQ/vS30ASugFxB3Omomp95sRL/w +60y51faLyTKD4AN3FhDfeIEfh1ggN2UT70qzC3+F8TvxQQHEBhNQKlhWVbWTp+0E +ANkcyokvn+09OIO/YDxF3aB37gA3J37d6NXos6pdvqUPOVSHvmF/hiM0FO2To6vu +ex/WcDQafPm4eiW6oNllwtZhWU2tr34bZD4PIuktSX2Ax2v5QtZ4d1CVdDEwbYn/ +fmR+nif1fTKTljZragaI9Rt6NWhfh7UGt62iIKh0lbhLBAD7T5wHY8V1yqlnyByG +K7nt+IHnND2I7Hk58yxKjv2KUNYxWZeOAQTQmfQXjJ+BOmw6oHMmDmGvdjSxIE+9 +j9nezEONxzVVjEDTKBeEnUeDY1QGDyDyW1/AhLJ52yWGTNrmKcGV4KmaYnhDzq7Z +aqJVRcFMF9TAfhrEGGhRdD83/QQA6xAqjWiu6tbaDurVuce64mA1R3T7HJ81gEaX +I+eJNDJb7PK3dhFETgyc3mcepWFNJkoXqx2ADhG8jLqK4o/N/QlV5PQgeHmhz09V +Z7MNhivGxDKZoxX6Bouh+qs5OkatcGFhTz//+FHSfusV2emxNiwd4QIsizxaltqh +W1ke0bA7eYkBJQQYAQgADwUCVnCPPQIbDAUJEswDAAAKCRCPzH3vtO5amyRIB/9I +sWUWrvbHnjw2ZCiIV++lHgIntAcuQIbZIjyMXoM+imHsPrsDOUT65yi9Xp1pUyZE +KtGkZvP4p7HRzJL+RWiWEN7sgQfNqqev8BF2/xmxkmiSuXHJ0PSaC5DmLfFDyvvS +U6H5VPudNszKIHtyoS6ph6TH8GXzFmNiHaTOZKdmVxlyLj1/DN+d63M+ARZIe7gB +6jmP/gg4o52icfTcqLkkppYn8g1A9bdJ3k8qqExNPTf2llDscuD9VzpebFbPqfcY +qR71GfG7Kmg7qGnZtNac1ERvknI/fmmCQydGk5pOh0KUTgeLG2qB07cqCUBbOXaw +eNWbiM9EvtQLNMD9Gn7D +=/3u/ +-----END PGP PRIVATE KEY BLOCK----- +""" + +if __name__ == "__main__": + unittest.main() |