1 # -*- coding: utf-8 -*-
3 # Copyright (C) 2013 LEAP
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 The server for the events mechanism.
25 from leap.common.zmq_utils import zmq_has_curve
26 from leap.common.events.zmq_components import TxZmqServerComponent
# Default endpoints for the events server:
#   EMIT_ADDR - where the server receives events emitted by clients.
#   REG_ADDR  - where the server publishes events to registered clients.
if zmq_has_curve() or platform.system() == "Windows":
    # Windows doesn't have ipc sockets, so tcp must always be used there.
    # The same loopback tcp endpoints are also selected whenever
    # zmq_has_curve() reports true.
    EMIT_ADDR = "tcp://127.0.0.1:9000"
    REG_ADDR = "tcp://127.0.0.1:9001"
else:
    # Everywhere else, use ipc sockets; these must not override the tcp
    # addresses chosen above, hence the explicit else branch.
    EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0"
    REG_ADDR = "ipc:///tmp/leap.common.events.socket.1"

logger = logging.getLogger(__name__)
def ensure_server(emit_addr=EMIT_ADDR, reg_addr=REG_ADDR):
    """
    Make sure the server is running in the given addresses.

    Every call constructs a new ``EventsServer`` bound to the given
    addresses; no previously created instance is looked up or reused here.

    :param emit_addr: The address in which to receive events from clients.
                      Defaults to the module-level ``EMIT_ADDR``.
    :param reg_addr: The address on which to publish events to clients.
                     Defaults to the module-level ``REG_ADDR``.

    :return: an events server instance
    :rtype: EventsServer
    """
    # Hand the instance back to the caller, as the docstring promises.
    return EventsServer(emit_addr, reg_addr)
class EventsServer(TxZmqServerComponent):
    """
    An events server that listens for events in one address and publishes
    those events in another address.
    """

    def __init__(self, emit_addr, reg_addr):
        """
        Initialize the events server.

        :param emit_addr: The address in which to receive events from
                          clients (bound with a PULL connection).
        :param reg_addr: The address on which to publish events to clients
                         (bound with a PUB connection).
        """
        TxZmqServerComponent.__init__(self)
        # bind PULL and PUB sockets; _zmq_bind hands back a
        # (connection, port) pair for each address
        self._pull, self.pull_port = self._zmq_bind(
            txzmq.ZmqPullConnection, emit_addr)
        self._pub, self.pub_port = self._zmq_bind(
            txzmq.ZmqPubConnection, reg_addr)
        # set a handler for arriving messages
        self._pull.onPull = self._onPull

    def _onPull(self, message):
        """
        Callback executed when a message is pulled from a client.

        The first element of the message is split at its first NUL byte
        into the event tag and the content; the content is then published
        on the PUB connection tagged with that event.

        :param message: The message sent by the client; only its first
                        element is inspected.

        :raise ValueError: if the first element contains no NUL separator,
                           since it cannot be unpacked into (event, content).
        """
        event, content = message[0].split(b"\0", 1)
        # Pass the event as a logging argument so that the message is only
        # formatted when debug logging is actually enabled.
        logger.debug("Publishing event: %s", event)
        self._pub.publish(content, tag=event)