summaryrefslogtreecommitdiff
path: root/src/leap/common/events/zmq_components.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/common/events/zmq_components.py')
-rw-r--r--src/leap/common/events/zmq_components.py66
1 files changed, 24 insertions, 42 deletions
diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py
index 2c40f62..1e0d52a 100644
--- a/src/leap/common/events/zmq_components.py
+++ b/src/leap/common/events/zmq_components.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# zmq.py
-# Copyright (C) 2015 LEAP
+# Copyright (C) 2015, 2016 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,19 +14,16 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
The server for the events mechanism.
"""
-
-
import os
import logging
import txzmq
import re
import time
+
from abc import ABCMeta
# XXX some distros don't package libsodium, so we have to be prepared for
@@ -37,8 +34,11 @@ try:
except ImportError:
pass
+from txzmq.connection import ZmqEndpoint, ZmqEndpointType
+
from leap.common.config import flags, get_path_prefix
from leap.common.zmq_utils import zmq_has_curve
+
from leap.common.zmq_utils import maybe_create_and_get_certificates
from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
@@ -53,6 +53,8 @@ class TxZmqComponent(object):
"""
A twisted-powered zmq events component.
"""
+ _factory = txzmq.ZmqFactory()
+ _factory.registerForShutdown()
__metaclass__ = ABCMeta
@@ -62,8 +64,6 @@ class TxZmqComponent(object):
"""
Initialize the txzmq component.
"""
- self._factory = txzmq.ZmqFactory()
- self._factory.registerForShutdown()
if path_prefix is None:
path_prefix = get_path_prefix(flags.STANDALONE)
self._config_prefix = os.path.join(path_prefix, "leap", "events")
@@ -93,21 +93,22 @@ class TxZmqComponent(object):
:return: The binded connection.
:rtype: txzmq.ZmqConnection
"""
+ endpoint = ZmqEndpoint(ZmqEndpointType.connect, address)
connection = connClass(self._factory)
- # create and configure socket
- socket = connection.socket
- if zmq_has_curve():
+
+ if self.use_curve:
+ socket = connection.socket
public, secret = maybe_create_and_get_certificates(
self._config_prefix, self.component_type)
server_public_file = os.path.join(
self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+
server_public, _ = zmq.auth.load_certificate(server_public_file)
socket.curve_publickey = public
socket.curve_secretkey = secret
socket.curve_serverkey = server_public
- socket.connect(address)
- logger.debug("Connected %s to %s." % (connClass, address))
- self._connections.append(connection)
+
+ connection.addEndpoints([endpoint])
return connection
def _zmq_bind(self, connClass, address):
@@ -122,33 +123,21 @@ class TxZmqComponent(object):
:return: The binded connection and port.
:rtype: (txzmq.ZmqConnection, int)
"""
+ proto, addr, port = ADDRESS_RE.search(address).groups()
+
+ endpoint = ZmqEndpoint(ZmqEndpointType.bind, address)
connection = connClass(self._factory)
- socket = connection.socket
- if zmq_has_curve():
+
+ if self.use_curve:
+ socket = connection.socket
+
public, secret = maybe_create_and_get_certificates(
self._config_prefix, self.component_type)
socket.curve_publickey = public
socket.curve_secretkey = secret
self._start_thread_auth(connection.socket)
- proto, addr, port = ADDRESS_RE.search(address).groups()
-
- if proto == "tcp":
- if port is None or port is '0':
- params = proto, addr
- port = socket.bind_to_random_port("%s://%s" % params)
- logger.debug("Binded %s to %s://%s." % ((connClass,) + params))
- else:
- params = proto, addr, int(port)
- socket.bind("%s://%s:%d" % params)
- logger.debug(
- "Binded %s to %s://%s:%d." % ((connClass,) + params))
- else:
- params = proto, addr
- socket.bind("%s://%s" % params)
- logger.debug(
- "Binded %s to %s://%s" % ((connClass,) + params))
- self._connections.append(connection)
+ connection.addEndpoints([endpoint])
return connection, port
def _start_thread_auth(self, socket):
@@ -158,6 +147,8 @@ class TxZmqComponent(object):
:param socket: The socket in which to configure the authenticator.
:type socket: zmq.Socket
"""
+ # TODO re-implement without threads.
+ logger.debug("Starting thread authenticator...")
authenticator = ThreadAuthenticator(self._factory.context)
# Temporary fix until we understand what the problem is
@@ -172,15 +163,6 @@ class TxZmqComponent(object):
authenticator.configure_curve(domain="*", location=public_keys_dir)
socket.curve_server = True # must come before bind
- def shutdown(self):
- """
- Shutdown the component.
- """
- logger.debug("Shutting down component %s." % str(self))
- for conn in self._connections:
- conn.shutdown()
- self._factory.shutdown()
-
class TxZmqServerComponent(TxZmqComponent):
"""