[bug] avoid the events server to block twistd daemon
authorKali Kaneko <kali@leap.se>
Mon, 22 Feb 2016 23:26:45 +0000 (19:26 -0400)
committerKali Kaneko <kali@leap.se>
Tue, 23 Feb 2016 23:32:20 +0000 (19:32 -0400)
1. refactor the zmq_connect/bind methods to use the txzmq addEndpoints
mechanism, which cleans up the code a bit. it uses the underlying
bindOrConnect method.

2. wrap the addEndpoints call in a helper function that ensures that
doRead is called afterward.

I'm not fully comfortable with us still using the AuthenticatorThread, I
believe we could go witha txzmq-based authenticator for curve.

src/leap/common/events/server.py
src/leap/common/events/txclient.py
src/leap/common/events/zmq_components.py

index 7126723..30a0c44 100644 (file)
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
 """
 The server for the events mechanism.
 """
-
-
 import logging
 import platform
-import txzmq
 
 from leap.common.zmq_utils import zmq_has_curve
 
 from leap.common.events.zmq_components import TxZmqServerComponent
 
+import txzmq
+
 
 if zmq_has_curve() or platform.system() == "Windows":
-    # Windows doesn't have icp sockets, we need to use always tcp
+    # Windows doesn't have ipc sockets, we need to use always tcp
     EMIT_ADDR = "tcp://127.0.0.1:9000"
     REG_ADDR = "tcp://127.0.0.1:9001"
 else:
     EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0"
     REG_ADDR = "ipc:///tmp/leap.common.events.socket.1"
 
-
 logger = logging.getLogger(__name__)
 
 
index dfd0533..ca247ca 100644 (file)
@@ -28,9 +28,10 @@ some other client.
 import logging
 import pickle
 
+from leap.common.events.zmq_components import TxZmqClientComponent
+
 import txzmq
 
-from leap.common.events.zmq_components import TxZmqClientComponent
 from leap.common.events.client import EventsClient
 from leap.common.events.client import configure_client
 from leap.common.events.server import EMIT_ADDR
@@ -68,6 +69,7 @@ class EventsTxClient(TxZmqClientComponent, EventsClient):
         # same client
         self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
         self._sub.gotMessage = self._gotMessage
+
         self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
 
     def _gotMessage(self, msg, tag):
@@ -122,7 +124,6 @@ class EventsTxClient(TxZmqClientComponent, EventsClient):
         callback(event, *content)
 
     def shutdown(self):
-        TxZmqClientComponent.shutdown(self)
         EventsClient.shutdown(self)
 
 
index 2c40f62..1e0d52a 100644 (file)
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # zmq.py
-# Copyright (C) 2015 LEAP
+# Copyright (C) 2015, 2016 LEAP
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
 """
 The server for the events mechanism.
 """
-
-
 import os
 import logging
 import txzmq
 import re
 import time
 
+
 from abc import ABCMeta
 
 # XXX some distros don't package libsodium, so we have to be prepared for
@@ -37,8 +34,11 @@ try:
 except ImportError:
     pass
 
+from txzmq.connection import ZmqEndpoint, ZmqEndpointType
+
 from leap.common.config import flags, get_path_prefix
 from leap.common.zmq_utils import zmq_has_curve
+
 from leap.common.zmq_utils import maybe_create_and_get_certificates
 from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
 
@@ -53,6 +53,8 @@ class TxZmqComponent(object):
     """
     A twisted-powered zmq events component.
     """
+    _factory = txzmq.ZmqFactory()
+    _factory.registerForShutdown()
 
     __metaclass__ = ABCMeta
 
@@ -62,8 +64,6 @@ class TxZmqComponent(object):
         """
         Initialize the txzmq component.
         """
-        self._factory = txzmq.ZmqFactory()
-        self._factory.registerForShutdown()
         if path_prefix is None:
             path_prefix = get_path_prefix(flags.STANDALONE)
         self._config_prefix = os.path.join(path_prefix, "leap", "events")
@@ -93,21 +93,22 @@ class TxZmqComponent(object):
         :return: The binded connection.
         :rtype: txzmq.ZmqConnection
         """
+        endpoint = ZmqEndpoint(ZmqEndpointType.connect, address)
         connection = connClass(self._factory)
-        # create and configure socket
-        socket = connection.socket
-        if zmq_has_curve():
+
+        if self.use_curve:
+            socket = connection.socket
             public, secret = maybe_create_and_get_certificates(
                 self._config_prefix, self.component_type)
             server_public_file = os.path.join(
                 self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+
             server_public, _ = zmq.auth.load_certificate(server_public_file)
             socket.curve_publickey = public
             socket.curve_secretkey = secret
             socket.curve_serverkey = server_public
-        socket.connect(address)
-        logger.debug("Connected %s to %s." % (connClass, address))
-        self._connections.append(connection)
+
+        connection.addEndpoints([endpoint])
         return connection
 
     def _zmq_bind(self, connClass, address):
@@ -122,33 +123,21 @@ class TxZmqComponent(object):
         :return: The binded connection and port.
         :rtype: (txzmq.ZmqConnection, int)
         """
+        proto, addr, port = ADDRESS_RE.search(address).groups()
+
+        endpoint = ZmqEndpoint(ZmqEndpointType.bind, address)
         connection = connClass(self._factory)
-        socket = connection.socket
-        if zmq_has_curve():
+
+        if self.use_curve:
+            socket = connection.socket
+
             public, secret = maybe_create_and_get_certificates(
                 self._config_prefix, self.component_type)
             socket.curve_publickey = public
             socket.curve_secretkey = secret
             self._start_thread_auth(connection.socket)
 
-        proto, addr, port = ADDRESS_RE.search(address).groups()
-
-        if proto == "tcp":
-            if port is None or port is '0':
-                params = proto, addr
-                port = socket.bind_to_random_port("%s://%s" % params)
-                logger.debug("Binded %s to %s://%s." % ((connClass,) + params))
-            else:
-                params = proto, addr, int(port)
-                socket.bind("%s://%s:%d" % params)
-                logger.debug(
-                    "Binded %s to %s://%s:%d." % ((connClass,) + params))
-        else:
-            params = proto, addr
-            socket.bind("%s://%s" % params)
-            logger.debug(
-                "Binded %s to %s://%s" % ((connClass,) + params))
-        self._connections.append(connection)
+        connection.addEndpoints([endpoint])
         return connection, port
 
     def _start_thread_auth(self, socket):
@@ -158,6 +147,8 @@ class TxZmqComponent(object):
         :param socket: The socket in which to configure the authenticator.
         :type socket: zmq.Socket
         """
+        # TODO re-implement without threads.
+        logger.debug("Starting thread authenticator...")
         authenticator = ThreadAuthenticator(self._factory.context)
 
         # Temporary fix until we understand what the problem is
@@ -172,15 +163,6 @@ class TxZmqComponent(object):
         authenticator.configure_curve(domain="*", location=public_keys_dir)
         socket.curve_server = True  # must come before bind
 
-    def shutdown(self):
-        """
-        Shutdown the component.
-        """
-        logger.debug("Shutting down component %s." % str(self))
-        for conn in self._connections:
-            conn.shutdown()
-        self._factory.shutdown()
-
 
 class TxZmqServerComponent(TxZmqComponent):
     """