From 24977b744b42df912a23a2861453e7d4d5202310 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 23 Feb 2016 19:28:05 -0400 Subject: [feature] reactor-based authenticator We don't really need a thread to make use of the ZAP authenticator. Document bug fix after authenticator thread is gone --- changes/next-changelog.rst | 1 + src/leap/common/events/auth.py | 96 ++++++++++++++++++++++++++++++ src/leap/common/events/examples/README.txt | 49 +++++++++++++++ src/leap/common/events/examples/client.py | 2 + src/leap/common/events/examples/server.py | 4 ++ src/leap/common/events/server.py | 5 +- src/leap/common/events/txclient.py | 3 +- src/leap/common/events/zmq_components.py | 44 ++++++-------- 8 files changed, 173 insertions(+), 31 deletions(-) create mode 100644 src/leap/common/events/auth.py create mode 100644 src/leap/common/events/examples/README.txt create mode 100644 src/leap/common/events/examples/client.py create mode 100644 src/leap/common/events/examples/server.py diff --git a/changes/next-changelog.rst b/changes/next-changelog.rst index 9f0b455..a1af28c 100644 --- a/changes/next-changelog.rst +++ b/changes/next-changelog.rst @@ -15,6 +15,7 @@ Features Bugfixes ~~~~~~~~ +- `#7536 `_: zmq authenticator often hangs. - `#1235 `_: Description for the fixed stuff corresponding with issue #1235. - Bugfix without related issue number. diff --git a/src/leap/common/events/auth.py b/src/leap/common/events/auth.py new file mode 100644 index 0000000..1a1bcab --- /dev/null +++ b/src/leap/common/events/auth.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# auth.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +ZAP authentication, twisted style. +""" +from zmq import PAIR +from zmq.auth.base import Authenticator, VERSION +from txzmq.connection import ZmqConnection +from zmq.utils.strtypes import b, u + +from twisted.python import log + +from txzmq.connection import ZmqEndpoint, ZmqEndpointType + + +class TxAuthenticator(ZmqConnection): + + """ + This does not implement the whole ZAP protocol, but the bare minimum that + we need. + """ + + socketType = PAIR + address = 'inproc://zeromq.zap.01' + encoding = 'utf-8' + + def __init__(self, factory): + super(TxAuthenticator, self).__init__(factory) + self.authenticator = Authenticator(factory.context) + self.authenticator._send_zap_reply = self._send_zap_reply + + def start(self): + endpoint = ZmqEndpoint(ZmqEndpointType.bind, self.address) + self.addEndpoints([endpoint]) + + def messageReceived(self, msg): + + command = msg[0] + + if command == b'ALLOW': + addresses = [u(m, self.encoding) for m in msg[1:]] + try: + self.authenticator.allow(*addresses) + except Exception as e: + log.err("Failed to allow %s", addresses) + + elif command == b'CURVE': + domain = u(msg[1], self.encoding) + location = u(msg[2], self.encoding) + self.authenticator.configure_curve(domain, location) + + def _send_zap_reply(self, request_id, status_code, status_text, + user_id='user'): + """ + Send a ZAP reply to finish the authentication. + """ + user_id = user_id if status_code == b'200' else b'' + if isinstance(user_id, unicode): + user_id = user_id.encode(self.encoding, 'replace') + metadata = b'' # not currently used + reply = [VERSION, request_id, status_code, status_text, + user_id, metadata] + self.send(reply) + + +class TxAuthenticationRequest(ZmqConnection): + + socketType = PAIR + address = 'inproc://zeromq.zap.01' + encoding = 'utf-8' + + def start(self): + endpoint = ZmqEndpoint(ZmqEndpointType.connect, self.address) + self.addEndpoints([endpoint]) + + def allow(self, *addresses): + self.send([b'ALLOW'] + [b(a, self.encoding) for a in addresses]) + + def configure_curve(self, domain='*', location=''): + domain = b(domain, self.encoding) + location = b(location, self.encoding) + self.send([b'CURVE', domain, location]) diff --git a/src/leap/common/events/examples/README.txt b/src/leap/common/events/examples/README.txt new file mode 100644 index 0000000..0bb0df6 --- /dev/null +++ b/src/leap/common/events/examples/README.txt @@ -0,0 +1,49 @@ +How to debug +----------------------------------------- +monitor the events socket: + sudo ngrep -W byline -d any port 9000 + +launch the server: + python server.py + +launch the client: + python client.py + +if zmq is available and enabled, you should see encrypted messages passing by +the socket. + +You should see something like the following: + +#### +T 127.0.0.1:9000 -> 127.0.0.1:33122 [AP] +.......... +## +T 127.0.0.1:33122 -> 127.0.0.1:9000 [AP] +........... +## +T 127.0.0.1:9000 -> 127.0.0.1:33122 [AP] +..CURVE............................................... +# +T 127.0.0.1:33122 -> 127.0.0.1:9000 [AP] +.CURVE............................................... +# +T 127.0.0.1:33122 -> 127.0.0.1:9000 [AP] +...HELLO.............................................................................:....^...".....'.S...n......Y...................O.7.+.D.q".*..R...j.....8..qu..~......Ck.G\....:...m....Tg.s..M..x<.. +## +T 127.0.0.1:9000 -> 127.0.0.1:33122 [AP] +...WELCOME..%.'.,Td... I..}...........`..Nm......./_.Je...4.....-.....f 127.0.0.1:9000 [AP] +..........INITIATE......!.*.=0.-......D..]{...A\.tz...!2.....A./ +6.......Y.h.N....cb.U.|..f..)....W..3..X.2U.3PGl.........m..95.(......NJ....5.'..W.GQ..B/.....\%.,Q..r.'L5.......{.W<=._.$.(6j.G... +...37.H..Th...'.........0 ........,..q....U..G..M.`!_..w....f.".......... +.d.K.Y.>f.n.kV. +# +T 127.0.0.1:9000 -> 127.0.0.1:33122 [AP] +.2.READY............A...e.)......*.8y....k.<.N1Z.4.. +# +T 127.0.0.1:33122 -> 127.0.0.1:9000 [AP] +.+.MESSAGE........o...*M..,.... +.r..w..[.GwcU +### + diff --git a/src/leap/common/events/examples/client.py b/src/leap/common/events/examples/client.py new file mode 100644 index 0000000..d6d8985 --- /dev/null +++ b/src/leap/common/events/examples/client.py @@ -0,0 +1,2 @@ +from leap.common.events.txclient import emit +emit('stuff!') diff --git a/src/leap/common/events/examples/server.py b/src/leap/common/events/examples/server.py new file mode 100644 index 0000000..f40f8dc --- /dev/null +++ b/src/leap/common/events/examples/server.py @@ -0,0 +1,4 @@ +from twisted.internet import reactor +from leap.common.events.server import ensure_server +reactor.callWhenRunning(ensure_server) +reactor.run() diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index 30a0c44..6252853 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -20,12 +20,11 @@ The server for the events mechanism. import logging import platform -from leap.common.zmq_utils import zmq_has_curve +import txzmq +from leap.common.zmq_utils import zmq_has_curve from leap.common.events.zmq_components import TxZmqServerComponent -import txzmq - if zmq_has_curve() or platform.system() == "Windows": # Windows doesn't have ipc sockets, we need to use always tcp diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py index ca247ca..a2b704d 100644 --- a/src/leap/common/events/txclient.py +++ b/src/leap/common/events/txclient.py @@ -28,10 +28,9 @@ some other client. import logging import pickle -from leap.common.events.zmq_components import TxZmqClientComponent - import txzmq +from leap.common.events.zmq_components import TxZmqClientComponent from leap.common.events.client import EventsClient from leap.common.events.client import configure_client from leap.common.events.server import EMIT_ADDR diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 1e0d52a..74abb76 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -21,16 +21,13 @@ import os import logging import txzmq import re -import time - from abc import ABCMeta -# XXX some distros don't package libsodium, so we have to be prepared for -# absence of zmq.auth try: import zmq.auth - from zmq.auth.thread import ThreadAuthenticator + from leap.common.events.auth import TxAuthenticator + from leap.common.events.auth import TxAuthenticationRequest except ImportError: pass @@ -38,16 +35,15 @@ from txzmq.connection import ZmqEndpoint, ZmqEndpointType from leap.common.config import flags, get_path_prefix from leap.common.zmq_utils import zmq_has_curve - from leap.common.zmq_utils import maybe_create_and_get_certificates from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX - logger = logging.getLogger(__name__) - ADDRESS_RE = re.compile("^([a-z]+)://([^:]+):?(\d+)?$") +LOCALHOST_ALLOWED = '127.0.0.1' + class TxZmqComponent(object): """ @@ -55,6 +51,7 @@ class TxZmqComponent(object): """ _factory = txzmq.ZmqFactory() _factory.registerForShutdown() + _auth = None __metaclass__ = ABCMeta @@ -135,33 +132,28 @@ class TxZmqComponent(object): self._config_prefix, self.component_type) socket.curve_publickey = public socket.curve_secretkey = secret - self._start_thread_auth(connection.socket) + self._start_authentication(connection.socket) connection.addEndpoints([endpoint]) return connection, port - def _start_thread_auth(self, socket): - """ - Start the zmq curve thread authenticator. + def _start_authentication(self, socket): - :param socket: The socket in which to configure the authenticator. - :type socket: zmq.Socket - """ - # TODO re-implement without threads. - logger.debug("Starting thread authenticator...") - authenticator = ThreadAuthenticator(self._factory.context) + if not TxZmqComponent._auth: + TxZmqComponent._auth = TxAuthenticator(self._factory) + TxZmqComponent._auth.start() - # Temporary fix until we understand what the problem is - # See https://leap.se/code/issues/7536 - time.sleep(0.5) + auth_req = TxAuthenticationRequest(self._factory) + auth_req.start() + auth_req.allow(LOCALHOST_ALLOWED) - authenticator.start() - # XXX do not hardcode this here. - authenticator.allow('127.0.0.1') # tell authenticator to use the certificate in a directory public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) - authenticator.configure_curve(domain="*", location=public_keys_dir) - socket.curve_server = True # must come before bind + auth_req.configure_curve(domain="*", location=public_keys_dir) + + # This has to be set before binding the socket, that's why this method + # has to be called before addEndpoints() + socket.curve_server = True class TxZmqServerComponent(TxZmqComponent): -- cgit v1.2.3