diff options
Diffstat (limited to 'src/leap/common/events')
| -rw-r--r-- | src/leap/common/events/server.py | 10 | ||||
| -rw-r--r-- | src/leap/common/events/txclient.py | 5 | ||||
| -rw-r--r-- | src/leap/common/events/zmq_components.py | 66 | 
3 files changed, 30 insertions, 51 deletions
| diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index 7126723..30a0c44 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -14,31 +14,27 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  The server for the events mechanism.  """ - -  import logging  import platform -import txzmq  from leap.common.zmq_utils import zmq_has_curve  from leap.common.events.zmq_components import TxZmqServerComponent +import txzmq +  if zmq_has_curve() or platform.system() == "Windows": -    # Windows doesn't have icp sockets, we need to use always tcp +    # Windows doesn't have ipc sockets, we need to use always tcp      EMIT_ADDR = "tcp://127.0.0.1:9000"      REG_ADDR = "tcp://127.0.0.1:9001"  else:      EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0"      REG_ADDR = "ipc:///tmp/leap.common.events.socket.1" -  logger = logging.getLogger(__name__) diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py index dfd0533..ca247ca 100644 --- a/src/leap/common/events/txclient.py +++ b/src/leap/common/events/txclient.py @@ -28,9 +28,10 @@ some other client.  import logging  import pickle +from leap.common.events.zmq_components import TxZmqClientComponent +  import txzmq -from leap.common.events.zmq_components import TxZmqClientComponent  from leap.common.events.client import EventsClient  from leap.common.events.client import configure_client  from leap.common.events.server import EMIT_ADDR @@ -68,6 +69,7 @@ class EventsTxClient(TxZmqClientComponent, EventsClient):          # same client          self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)          self._sub.gotMessage = self._gotMessage +          self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)      def _gotMessage(self, msg, tag): @@ -122,7 +124,6 @@ class EventsTxClient(TxZmqClientComponent, EventsClient):          callback(event, *content)      def shutdown(self): -        TxZmqClientComponent.shutdown(self)          EventsClient.shutdown(self) diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 2c40f62..1e0d52a 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -1,6 +1,6 @@  # -*- coding: utf-8 -*-  # zmq.py -# Copyright (C) 2015 LEAP +# Copyright (C) 2015, 2016 LEAP  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -14,19 +14,16 @@  #  # You should have received a copy of the GNU General Public License  # along with this program. If not, see <http://www.gnu.org/licenses/>. - -  """  The server for the events mechanism.  """ - -  import os  import logging  import txzmq  import re  import time +  from abc import ABCMeta  # XXX some distros don't package libsodium, so we have to be prepared for @@ -37,8 +34,11 @@ try:  except ImportError:      pass +from txzmq.connection import ZmqEndpoint, ZmqEndpointType +  from leap.common.config import flags, get_path_prefix  from leap.common.zmq_utils import zmq_has_curve +  from leap.common.zmq_utils import maybe_create_and_get_certificates  from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX @@ -53,6 +53,8 @@ class TxZmqComponent(object):      """      A twisted-powered zmq events component.      """ +    _factory = txzmq.ZmqFactory() +    _factory.registerForShutdown()      __metaclass__ = ABCMeta @@ -62,8 +64,6 @@ class TxZmqComponent(object):          """          Initialize the txzmq component.          """ -        self._factory = txzmq.ZmqFactory() -        self._factory.registerForShutdown()          if path_prefix is None:              path_prefix = get_path_prefix(flags.STANDALONE)          self._config_prefix = os.path.join(path_prefix, "leap", "events") @@ -93,21 +93,22 @@ class TxZmqComponent(object):          :return: The binded connection.          :rtype: txzmq.ZmqConnection          """ +        endpoint = ZmqEndpoint(ZmqEndpointType.connect, address)          connection = connClass(self._factory) -        # create and configure socket -        socket = connection.socket -        if zmq_has_curve(): + +        if self.use_curve: +            socket = connection.socket              public, secret = maybe_create_and_get_certificates(                  self._config_prefix, self.component_type)              server_public_file = os.path.join(                  self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") +              server_public, _ = zmq.auth.load_certificate(server_public_file)              socket.curve_publickey = public              socket.curve_secretkey = secret              socket.curve_serverkey = server_public -        socket.connect(address) -        logger.debug("Connected %s to %s." % (connClass, address)) -        self._connections.append(connection) + +        connection.addEndpoints([endpoint])          return connection      def _zmq_bind(self, connClass, address): @@ -122,33 +123,21 @@ class TxZmqComponent(object):          :return: The binded connection and port.          :rtype: (txzmq.ZmqConnection, int)          """ +        proto, addr, port = ADDRESS_RE.search(address).groups() + +        endpoint = ZmqEndpoint(ZmqEndpointType.bind, address)          connection = connClass(self._factory) -        socket = connection.socket -        if zmq_has_curve(): + +        if self.use_curve: +            socket = connection.socket +              public, secret = maybe_create_and_get_certificates(                  self._config_prefix, self.component_type)              socket.curve_publickey = public              socket.curve_secretkey = secret              self._start_thread_auth(connection.socket) -        proto, addr, port = ADDRESS_RE.search(address).groups() - -        if proto == "tcp": -            if port is None or port is '0': -                params = proto, addr -                port = socket.bind_to_random_port("%s://%s" % params) -                logger.debug("Binded %s to %s://%s." % ((connClass,) + params)) -            else: -                params = proto, addr, int(port) -                socket.bind("%s://%s:%d" % params) -                logger.debug( -                    "Binded %s to %s://%s:%d." % ((connClass,) + params)) -        else: -            params = proto, addr -            socket.bind("%s://%s" % params) -            logger.debug( -                "Binded %s to %s://%s" % ((connClass,) + params)) -        self._connections.append(connection) +        connection.addEndpoints([endpoint])          return connection, port      def _start_thread_auth(self, socket): @@ -158,6 +147,8 @@ class TxZmqComponent(object):          :param socket: The socket in which to configure the authenticator.          :type socket: zmq.Socket          """ +        # TODO re-implement without threads. +        logger.debug("Starting thread authenticator...")          authenticator = ThreadAuthenticator(self._factory.context)          # Temporary fix until we understand what the problem is @@ -172,15 +163,6 @@ class TxZmqComponent(object):          authenticator.configure_curve(domain="*", location=public_keys_dir)          socket.curve_server = True  # must come before bind -    def shutdown(self): -        """ -        Shutdown the component. -        """ -        logger.debug("Shutting down component %s." % str(self)) -        for conn in self._connections: -            conn.shutdown() -        self._factory.shutdown() -  class TxZmqServerComponent(TxZmqComponent):      """ | 
