From b940cfc29b88374ce57b101a39bc012bb903f6e8 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 22 Feb 2016 19:26:45 -0400 Subject: [bug] avoid the events server to block twistd daemon 1. refactor the zmq_connect/bind methods to use the txzmq addEndpoints mechanism, which cleans up the code a bit. it uses the underlying bindOrConnect method. 2. wrap the addEndpoints call in a helper function that ensures that doRead is called afterward. I'm not fully comfortable with us still using the AuthenticatorThread, I believe we could go witha txzmq-based authenticator for curve. --- src/leap/common/events/server.py | 10 ++--- src/leap/common/events/txclient.py | 5 ++- src/leap/common/events/zmq_components.py | 66 ++++++++++++-------------------- 3 files changed, 30 insertions(+), 51 deletions(-) (limited to 'src/leap/common/events') diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index 7126723..30a0c44 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -14,31 +14,27 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - - """ The server for the events mechanism. """ - - import logging import platform -import txzmq from leap.common.zmq_utils import zmq_has_curve from leap.common.events.zmq_components import TxZmqServerComponent +import txzmq + if zmq_has_curve() or platform.system() == "Windows": - # Windows doesn't have icp sockets, we need to use always tcp + # Windows doesn't have ipc sockets, we need to use always tcp EMIT_ADDR = "tcp://127.0.0.1:9000" REG_ADDR = "tcp://127.0.0.1:9001" else: EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0" REG_ADDR = "ipc:///tmp/leap.common.events.socket.1" - logger = logging.getLogger(__name__) diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py index dfd0533..ca247ca 100644 --- a/src/leap/common/events/txclient.py +++ b/src/leap/common/events/txclient.py @@ -28,9 +28,10 @@ some other client. import logging import pickle +from leap.common.events.zmq_components import TxZmqClientComponent + import txzmq -from leap.common.events.zmq_components import TxZmqClientComponent from leap.common.events.client import EventsClient from leap.common.events.client import configure_client from leap.common.events.server import EMIT_ADDR @@ -68,6 +69,7 @@ class EventsTxClient(TxZmqClientComponent, EventsClient): # same client self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr) self._sub.gotMessage = self._gotMessage + self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr) def _gotMessage(self, msg, tag): @@ -122,7 +124,6 @@ class EventsTxClient(TxZmqClientComponent, EventsClient): callback(event, *content) def shutdown(self): - TxZmqClientComponent.shutdown(self) EventsClient.shutdown(self) diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 2c40f62..1e0d52a 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # zmq.py -# Copyright (C) 2015 LEAP +# Copyright (C) 2015, 2016 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,19 +14,16 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - - """ The server for the events mechanism. """ - - import os import logging import txzmq import re import time + from abc import ABCMeta # XXX some distros don't package libsodium, so we have to be prepared for @@ -37,8 +34,11 @@ try: except ImportError: pass +from txzmq.connection import ZmqEndpoint, ZmqEndpointType + from leap.common.config import flags, get_path_prefix from leap.common.zmq_utils import zmq_has_curve + from leap.common.zmq_utils import maybe_create_and_get_certificates from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX @@ -53,6 +53,8 @@ class TxZmqComponent(object): """ A twisted-powered zmq events component. """ + _factory = txzmq.ZmqFactory() + _factory.registerForShutdown() __metaclass__ = ABCMeta @@ -62,8 +64,6 @@ class TxZmqComponent(object): """ Initialize the txzmq component. """ - self._factory = txzmq.ZmqFactory() - self._factory.registerForShutdown() if path_prefix is None: path_prefix = get_path_prefix(flags.STANDALONE) self._config_prefix = os.path.join(path_prefix, "leap", "events") @@ -93,21 +93,22 @@ class TxZmqComponent(object): :return: The binded connection. :rtype: txzmq.ZmqConnection """ + endpoint = ZmqEndpoint(ZmqEndpointType.connect, address) connection = connClass(self._factory) - # create and configure socket - socket = connection.socket - if zmq_has_curve(): + + if self.use_curve: + socket = connection.socket public, secret = maybe_create_and_get_certificates( self._config_prefix, self.component_type) server_public_file = os.path.join( self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) socket.curve_publickey = public socket.curve_secretkey = secret socket.curve_serverkey = server_public - socket.connect(address) - logger.debug("Connected %s to %s." % (connClass, address)) - self._connections.append(connection) + + connection.addEndpoints([endpoint]) return connection def _zmq_bind(self, connClass, address): @@ -122,33 +123,21 @@ class TxZmqComponent(object): :return: The binded connection and port. :rtype: (txzmq.ZmqConnection, int) """ + proto, addr, port = ADDRESS_RE.search(address).groups() + + endpoint = ZmqEndpoint(ZmqEndpointType.bind, address) connection = connClass(self._factory) - socket = connection.socket - if zmq_has_curve(): + + if self.use_curve: + socket = connection.socket + public, secret = maybe_create_and_get_certificates( self._config_prefix, self.component_type) socket.curve_publickey = public socket.curve_secretkey = secret self._start_thread_auth(connection.socket) - proto, addr, port = ADDRESS_RE.search(address).groups() - - if proto == "tcp": - if port is None or port is '0': - params = proto, addr - port = socket.bind_to_random_port("%s://%s" % params) - logger.debug("Binded %s to %s://%s." % ((connClass,) + params)) - else: - params = proto, addr, int(port) - socket.bind("%s://%s:%d" % params) - logger.debug( - "Binded %s to %s://%s:%d." % ((connClass,) + params)) - else: - params = proto, addr - socket.bind("%s://%s" % params) - logger.debug( - "Binded %s to %s://%s" % ((connClass,) + params)) - self._connections.append(connection) + connection.addEndpoints([endpoint]) return connection, port def _start_thread_auth(self, socket): @@ -158,6 +147,8 @@ class TxZmqComponent(object): :param socket: The socket in which to configure the authenticator. :type socket: zmq.Socket """ + # TODO re-implement without threads. + logger.debug("Starting thread authenticator...") authenticator = ThreadAuthenticator(self._factory.context) # Temporary fix until we understand what the problem is @@ -172,15 +163,6 @@ class TxZmqComponent(object): authenticator.configure_curve(domain="*", location=public_keys_dir) socket.curve_server = True # must come before bind - def shutdown(self): - """ - Shutdown the component. - """ - logger.debug("Shutting down component %s." % str(self)) - for conn in self._connections: - conn.shutdown() - self._factory.shutdown() - class TxZmqServerComponent(TxZmqComponent): """ -- cgit v1.2.3