summaryrefslogtreecommitdiff
path: root/src/leap/common/events
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/common/events')
-rw-r--r--src/leap/common/events/server.py10
-rw-r--r--src/leap/common/events/txclient.py5
-rw-r--r--src/leap/common/events/zmq_components.py66
3 files changed, 30 insertions, 51 deletions
diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py
index 7126723..30a0c44 100644
--- a/src/leap/common/events/server.py
+++ b/src/leap/common/events/server.py
@@ -14,31 +14,27 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
The server for the events mechanism.
"""
-
-
import logging
import platform
-import txzmq
from leap.common.zmq_utils import zmq_has_curve
from leap.common.events.zmq_components import TxZmqServerComponent
+import txzmq
+
if zmq_has_curve() or platform.system() == "Windows":
- # Windows doesn't have icp sockets, we need to use always tcp
+ # Windows doesn't have ipc sockets, we need to use always tcp
EMIT_ADDR = "tcp://127.0.0.1:9000"
REG_ADDR = "tcp://127.0.0.1:9001"
else:
EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0"
REG_ADDR = "ipc:///tmp/leap.common.events.socket.1"
-
logger = logging.getLogger(__name__)
diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py
index dfd0533..ca247ca 100644
--- a/src/leap/common/events/txclient.py
+++ b/src/leap/common/events/txclient.py
@@ -28,9 +28,10 @@ some other client.
import logging
import pickle
+from leap.common.events.zmq_components import TxZmqClientComponent
+
import txzmq
-from leap.common.events.zmq_components import TxZmqClientComponent
from leap.common.events.client import EventsClient
from leap.common.events.client import configure_client
from leap.common.events.server import EMIT_ADDR
@@ -68,6 +69,7 @@ class EventsTxClient(TxZmqClientComponent, EventsClient):
# same client
self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
self._sub.gotMessage = self._gotMessage
+
self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
def _gotMessage(self, msg, tag):
@@ -122,7 +124,6 @@ class EventsTxClient(TxZmqClientComponent, EventsClient):
callback(event, *content)
def shutdown(self):
- TxZmqClientComponent.shutdown(self)
EventsClient.shutdown(self)
diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py
index 2c40f62..1e0d52a 100644
--- a/src/leap/common/events/zmq_components.py
+++ b/src/leap/common/events/zmq_components.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# zmq.py
-# Copyright (C) 2015 LEAP
+# Copyright (C) 2015, 2016 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,19 +14,16 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
The server for the events mechanism.
"""
-
-
import os
import logging
import txzmq
import re
import time
+
from abc import ABCMeta
# XXX some distros don't package libsodium, so we have to be prepared for
@@ -37,8 +34,11 @@ try:
except ImportError:
pass
+from txzmq.connection import ZmqEndpoint, ZmqEndpointType
+
from leap.common.config import flags, get_path_prefix
from leap.common.zmq_utils import zmq_has_curve
+
from leap.common.zmq_utils import maybe_create_and_get_certificates
from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
@@ -53,6 +53,8 @@ class TxZmqComponent(object):
"""
A twisted-powered zmq events component.
"""
+ _factory = txzmq.ZmqFactory()
+ _factory.registerForShutdown()
__metaclass__ = ABCMeta
@@ -62,8 +64,6 @@ class TxZmqComponent(object):
"""
Initialize the txzmq component.
"""
- self._factory = txzmq.ZmqFactory()
- self._factory.registerForShutdown()
if path_prefix is None:
path_prefix = get_path_prefix(flags.STANDALONE)
self._config_prefix = os.path.join(path_prefix, "leap", "events")
@@ -93,21 +93,22 @@ class TxZmqComponent(object):
:return: The binded connection.
:rtype: txzmq.ZmqConnection
"""
+ endpoint = ZmqEndpoint(ZmqEndpointType.connect, address)
connection = connClass(self._factory)
- # create and configure socket
- socket = connection.socket
- if zmq_has_curve():
+
+ if self.use_curve:
+ socket = connection.socket
public, secret = maybe_create_and_get_certificates(
self._config_prefix, self.component_type)
server_public_file = os.path.join(
self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+
server_public, _ = zmq.auth.load_certificate(server_public_file)
socket.curve_publickey = public
socket.curve_secretkey = secret
socket.curve_serverkey = server_public
- socket.connect(address)
- logger.debug("Connected %s to %s." % (connClass, address))
- self._connections.append(connection)
+
+ connection.addEndpoints([endpoint])
return connection
def _zmq_bind(self, connClass, address):
@@ -122,33 +123,21 @@ class TxZmqComponent(object):
:return: The binded connection and port.
:rtype: (txzmq.ZmqConnection, int)
"""
+ proto, addr, port = ADDRESS_RE.search(address).groups()
+
+ endpoint = ZmqEndpoint(ZmqEndpointType.bind, address)
connection = connClass(self._factory)
- socket = connection.socket
- if zmq_has_curve():
+
+ if self.use_curve:
+ socket = connection.socket
+
public, secret = maybe_create_and_get_certificates(
self._config_prefix, self.component_type)
socket.curve_publickey = public
socket.curve_secretkey = secret
self._start_thread_auth(connection.socket)
- proto, addr, port = ADDRESS_RE.search(address).groups()
-
- if proto == "tcp":
- if port is None or port is '0':
- params = proto, addr
- port = socket.bind_to_random_port("%s://%s" % params)
- logger.debug("Binded %s to %s://%s." % ((connClass,) + params))
- else:
- params = proto, addr, int(port)
- socket.bind("%s://%s:%d" % params)
- logger.debug(
- "Binded %s to %s://%s:%d." % ((connClass,) + params))
- else:
- params = proto, addr
- socket.bind("%s://%s" % params)
- logger.debug(
- "Binded %s to %s://%s" % ((connClass,) + params))
- self._connections.append(connection)
+ connection.addEndpoints([endpoint])
return connection, port
def _start_thread_auth(self, socket):
@@ -158,6 +147,8 @@ class TxZmqComponent(object):
:param socket: The socket in which to configure the authenticator.
:type socket: zmq.Socket
"""
+ # TODO re-implement without threads.
+ logger.debug("Starting thread authenticator...")
authenticator = ThreadAuthenticator(self._factory.context)
# Temporary fix until we understand what the problem is
@@ -172,15 +163,6 @@ class TxZmqComponent(object):
authenticator.configure_curve(domain="*", location=public_keys_dir)
socket.curve_server = True # must come before bind
- def shutdown(self):
- """
- Shutdown the component.
- """
- logger.debug("Shutting down component %s." % str(self))
- for conn in self._connections:
- conn.shutdown()
- self._factory.shutdown()
-
class TxZmqServerComponent(TxZmqComponent):
"""