From 88941164243ce1ac6f30c790120165c04ea4a041 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 22 Feb 2016 19:25:21 -0400 Subject: [feature] optional flag to disable curve authentication --- src/leap/common/events/zmq_components.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/leap/common/events/zmq_components.py') diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 51de02c..2c40f62 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -58,7 +58,7 @@ class TxZmqComponent(object): _component_type = None - def __init__(self, path_prefix=None): + def __init__(self, path_prefix=None, enable_curve=True): """ Initialize the txzmq component. """ @@ -68,6 +68,10 @@ class TxZmqComponent(object): path_prefix = get_path_prefix(flags.STANDALONE) self._config_prefix = os.path.join(path_prefix, "leap", "events") self._connections = [] + if enable_curve: + self.use_curve = zmq_has_curve() + else: + self.use_curve = False @property def component_type(self): -- cgit v1.2.3 From b940cfc29b88374ce57b101a39bc012bb903f6e8 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 22 Feb 2016 19:26:45 -0400 Subject: [bug] avoid the events server to block twistd daemon 1. refactor the zmq_connect/bind methods to use the txzmq addEndpoints mechanism, which cleans up the code a bit. it uses the underlying bindOrConnect method. 2. wrap the addEndpoints call in a helper function that ensures that doRead is called afterward. I'm not fully comfortable with us still using the AuthenticatorThread, I believe we could go witha txzmq-based authenticator for curve. --- src/leap/common/events/zmq_components.py | 66 ++++++++++++-------------------- 1 file changed, 24 insertions(+), 42 deletions(-) (limited to 'src/leap/common/events/zmq_components.py') diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 2c40f62..1e0d52a 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # zmq.py -# Copyright (C) 2015 LEAP +# Copyright (C) 2015, 2016 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,19 +14,16 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - - """ The server for the events mechanism. """ - - import os import logging import txzmq import re import time + from abc import ABCMeta # XXX some distros don't package libsodium, so we have to be prepared for @@ -37,8 +34,11 @@ try: except ImportError: pass +from txzmq.connection import ZmqEndpoint, ZmqEndpointType + from leap.common.config import flags, get_path_prefix from leap.common.zmq_utils import zmq_has_curve + from leap.common.zmq_utils import maybe_create_and_get_certificates from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX @@ -53,6 +53,8 @@ class TxZmqComponent(object): """ A twisted-powered zmq events component. """ + _factory = txzmq.ZmqFactory() + _factory.registerForShutdown() __metaclass__ = ABCMeta @@ -62,8 +64,6 @@ class TxZmqComponent(object): """ Initialize the txzmq component. """ - self._factory = txzmq.ZmqFactory() - self._factory.registerForShutdown() if path_prefix is None: path_prefix = get_path_prefix(flags.STANDALONE) self._config_prefix = os.path.join(path_prefix, "leap", "events") @@ -93,21 +93,22 @@ class TxZmqComponent(object): :return: The binded connection. :rtype: txzmq.ZmqConnection """ + endpoint = ZmqEndpoint(ZmqEndpointType.connect, address) connection = connClass(self._factory) - # create and configure socket - socket = connection.socket - if zmq_has_curve(): + + if self.use_curve: + socket = connection.socket public, secret = maybe_create_and_get_certificates( self._config_prefix, self.component_type) server_public_file = os.path.join( self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) socket.curve_publickey = public socket.curve_secretkey = secret socket.curve_serverkey = server_public - socket.connect(address) - logger.debug("Connected %s to %s." % (connClass, address)) - self._connections.append(connection) + + connection.addEndpoints([endpoint]) return connection def _zmq_bind(self, connClass, address): @@ -122,33 +123,21 @@ class TxZmqComponent(object): :return: The binded connection and port. :rtype: (txzmq.ZmqConnection, int) """ + proto, addr, port = ADDRESS_RE.search(address).groups() + + endpoint = ZmqEndpoint(ZmqEndpointType.bind, address) connection = connClass(self._factory) - socket = connection.socket - if zmq_has_curve(): + + if self.use_curve: + socket = connection.socket + public, secret = maybe_create_and_get_certificates( self._config_prefix, self.component_type) socket.curve_publickey = public socket.curve_secretkey = secret self._start_thread_auth(connection.socket) - proto, addr, port = ADDRESS_RE.search(address).groups() - - if proto == "tcp": - if port is None or port is '0': - params = proto, addr - port = socket.bind_to_random_port("%s://%s" % params) - logger.debug("Binded %s to %s://%s." % ((connClass,) + params)) - else: - params = proto, addr, int(port) - socket.bind("%s://%s:%d" % params) - logger.debug( - "Binded %s to %s://%s:%d." % ((connClass,) + params)) - else: - params = proto, addr - socket.bind("%s://%s" % params) - logger.debug( - "Binded %s to %s://%s" % ((connClass,) + params)) - self._connections.append(connection) + connection.addEndpoints([endpoint]) return connection, port def _start_thread_auth(self, socket): @@ -158,6 +147,8 @@ class TxZmqComponent(object): :param socket: The socket in which to configure the authenticator. :type socket: zmq.Socket """ + # TODO re-implement without threads. + logger.debug("Starting thread authenticator...") authenticator = ThreadAuthenticator(self._factory.context) # Temporary fix until we understand what the problem is @@ -172,15 +163,6 @@ class TxZmqComponent(object): authenticator.configure_curve(domain="*", location=public_keys_dir) socket.curve_server = True # must come before bind - def shutdown(self): - """ - Shutdown the component. - """ - logger.debug("Shutting down component %s." % str(self)) - for conn in self._connections: - conn.shutdown() - self._factory.shutdown() - class TxZmqServerComponent(TxZmqComponent): """ -- cgit v1.2.3 From 24977b744b42df912a23a2861453e7d4d5202310 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Tue, 23 Feb 2016 19:28:05 -0400 Subject: [feature] reactor-based authenticator We don't really need a thread to make use of the ZAP authenticator. Document bug fix after authenticator thread is gone --- src/leap/common/events/zmq_components.py | 44 +++++++++++++------------------- 1 file changed, 18 insertions(+), 26 deletions(-) (limited to 'src/leap/common/events/zmq_components.py') diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 1e0d52a..74abb76 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -21,16 +21,13 @@ import os import logging import txzmq import re -import time - from abc import ABCMeta -# XXX some distros don't package libsodium, so we have to be prepared for -# absence of zmq.auth try: import zmq.auth - from zmq.auth.thread import ThreadAuthenticator + from leap.common.events.auth import TxAuthenticator + from leap.common.events.auth import TxAuthenticationRequest except ImportError: pass @@ -38,16 +35,15 @@ from txzmq.connection import ZmqEndpoint, ZmqEndpointType from leap.common.config import flags, get_path_prefix from leap.common.zmq_utils import zmq_has_curve - from leap.common.zmq_utils import maybe_create_and_get_certificates from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX - logger = logging.getLogger(__name__) - ADDRESS_RE = re.compile("^([a-z]+)://([^:]+):?(\d+)?$") +LOCALHOST_ALLOWED = '127.0.0.1' + class TxZmqComponent(object): """ @@ -55,6 +51,7 @@ class TxZmqComponent(object): """ _factory = txzmq.ZmqFactory() _factory.registerForShutdown() + _auth = None __metaclass__ = ABCMeta @@ -135,33 +132,28 @@ class TxZmqComponent(object): self._config_prefix, self.component_type) socket.curve_publickey = public socket.curve_secretkey = secret - self._start_thread_auth(connection.socket) + self._start_authentication(connection.socket) connection.addEndpoints([endpoint]) return connection, port - def _start_thread_auth(self, socket): - """ - Start the zmq curve thread authenticator. + def _start_authentication(self, socket): - :param socket: The socket in which to configure the authenticator. - :type socket: zmq.Socket - """ - # TODO re-implement without threads. - logger.debug("Starting thread authenticator...") - authenticator = ThreadAuthenticator(self._factory.context) + if not TxZmqComponent._auth: + TxZmqComponent._auth = TxAuthenticator(self._factory) + TxZmqComponent._auth.start() - # Temporary fix until we understand what the problem is - # See https://leap.se/code/issues/7536 - time.sleep(0.5) + auth_req = TxAuthenticationRequest(self._factory) + auth_req.start() + auth_req.allow(LOCALHOST_ALLOWED) - authenticator.start() - # XXX do not hardcode this here. - authenticator.allow('127.0.0.1') # tell authenticator to use the certificate in a directory public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) - authenticator.configure_curve(domain="*", location=public_keys_dir) - socket.curve_server = True # must come before bind + auth_req.configure_curve(domain="*", location=public_keys_dir) + + # This has to be set before binding the socket, that's why this method + # has to be called before addEndpoints() + socket.curve_server = True class TxZmqServerComponent(TxZmqComponent): -- cgit v1.2.3 From 027ad7eed50947608738ce0009fccf776936e55c Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 29 Feb 2016 19:33:28 -0400 Subject: [tests] adapt events tests to recent changes --- src/leap/common/events/zmq_components.py | 58 ++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 25 deletions(-) (limited to 'src/leap/common/events/zmq_components.py') diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 74abb76..8919cd9 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -57,12 +57,14 @@ class TxZmqComponent(object): _component_type = None - def __init__(self, path_prefix=None, enable_curve=True): + def __init__(self, path_prefix=None, enable_curve=True, factory=None): """ Initialize the txzmq component. """ if path_prefix is None: path_prefix = get_path_prefix(flags.STANDALONE) + if factory is not None: + self._factory = factory self._config_prefix = os.path.join(path_prefix, "leap", "events") self._connections = [] if enable_curve: @@ -78,64 +80,69 @@ class TxZmqComponent(object): "define a self._component_type!") return self._component_type - def _zmq_connect(self, connClass, address): + def _zmq_bind(self, connClass, address): """ - Connect to an address. + Bind to an address. :param connClass: The connection class to be used. :type connClass: txzmq.ZmqConnection - :param address: The address to connect to. + :param address: The address to bind to. :type address: str - :return: The binded connection. - :rtype: txzmq.ZmqConnection + :return: The binded connection and port. + :rtype: (txzmq.ZmqConnection, int) """ - endpoint = ZmqEndpoint(ZmqEndpointType.connect, address) + proto, addr, port = ADDRESS_RE.search(address).groups() + + endpoint = ZmqEndpoint(ZmqEndpointType.bind, address) connection = connClass(self._factory) if self.use_curve: socket = connection.socket + public, secret = maybe_create_and_get_certificates( self._config_prefix, self.component_type) - server_public_file = os.path.join( - self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") - - server_public, _ = zmq.auth.load_certificate(server_public_file) socket.curve_publickey = public socket.curve_secretkey = secret - socket.curve_serverkey = server_public + self._start_authentication(connection.socket) - connection.addEndpoints([endpoint]) - return connection + if proto == 'tcp' and int(port) == 0: + connection.endpoints.extend([endpoint]) + port = connection.socket.bind_to_random_port('tcp://%s' % addr) + else: + connection.addEndpoints([endpoint]) - def _zmq_bind(self, connClass, address): + return connection, int(port) + + def _zmq_connect(self, connClass, address): """ - Bind to an address. + Connect to an address. :param connClass: The connection class to be used. :type connClass: txzmq.ZmqConnection - :param address: The address to bind to. + :param address: The address to connect to. :type address: str - :return: The binded connection and port. - :rtype: (txzmq.ZmqConnection, int) + :return: The binded connection. + :rtype: txzmq.ZmqConnection """ - proto, addr, port = ADDRESS_RE.search(address).groups() - - endpoint = ZmqEndpoint(ZmqEndpointType.bind, address) + endpoint = ZmqEndpoint(ZmqEndpointType.connect, address) connection = connClass(self._factory) if self.use_curve: socket = connection.socket - public, secret = maybe_create_and_get_certificates( self._config_prefix, self.component_type) + server_public_file = os.path.join( + self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + + server_public, _ = zmq.auth.load_certificate(server_public_file) socket.curve_publickey = public socket.curve_secretkey = secret - self._start_authentication(connection.socket) + socket.curve_serverkey = server_public connection.addEndpoints([endpoint]) - return connection, port + return connection def _start_authentication(self, socket): @@ -150,6 +157,7 @@ class TxZmqComponent(object): # tell authenticator to use the certificate in a directory public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) auth_req.configure_curve(domain="*", location=public_keys_dir) + auth_req.shutdown() # This has to be set before binding the socket, that's why this method # has to be called before addEndpoints() -- cgit v1.2.3 From 3a317f04bfa55698a7064ea3d5c5a1b4cc5ead36 Mon Sep 17 00:00:00 2001 From: Christoph Kluenter Date: Wed, 16 Mar 2016 17:03:52 +0100 Subject: [bug] close TxAuthenticator properly otherwise the context.term() does not return --- src/leap/common/events/zmq_components.py | 1 + 1 file changed, 1 insertion(+) (limited to 'src/leap/common/events/zmq_components.py') diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 8919cd9..c533a74 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -158,6 +158,7 @@ class TxZmqComponent(object): public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) auth_req.configure_curve(domain="*", location=public_keys_dir) auth_req.shutdown() + TxZmqComponent._auth.shutdown() # This has to be set before binding the socket, that's why this method # has to be called before addEndpoints() -- cgit v1.2.3