summaryrefslogtreecommitdiff
path: root/src/leap/common/events/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/common/events/server.py')
-rw-r--r--src/leap/common/events/server.py242
1 files changed, 49 insertions, 193 deletions
diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py
index 41aede3..a69202e 100644
--- a/src/leap/common/events/server.py
+++ b/src/leap/common/events/server.py
@@ -14,223 +14,79 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-A server for the events mechanism.
-
-A server can receive different kinds of requests from clients:
- 1. Registration request: store client port number to be notified when
- a specific signal arrives.
- 2. Signal request: redistribute the signal to registered clients.
"""
-import logging
-import socket
+The server for the events mechanism.
+"""
-from protobuf.socketrpc import RpcService
-from leap.common.events import (
- events_pb2 as proto,
- daemon,
-)
+import logging
+import txzmq
+from leap.common.zmq_utils import zmq_has_curve
-logger = logging.getLogger(__name__)
+from leap.common.events.zmq_components import TxZmqServerComponent
-SERVER_PORT = 8090
+if zmq_has_curve():
+ EMIT_ADDR = "tcp://127.0.0.1:9000"
+ REG_ADDR = "tcp://127.0.0.1:9001"
+else:
+ EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0"
+ REG_ADDR = "ipc:///tmp/leap.common.events.socket.1"
-# the `registered_clients` dictionary below should have the following
-# format:
-#
-# { event_signal: [ port, ... ], ... }
-#
-registered_clients = {}
-
-class PortAlreadyTaken(Exception):
- """
- Raised when trying to open a server in a port that is already taken.
- """
- pass
+logger = logging.getLogger(__name__)
-def ensure_server(port=SERVER_PORT):
+def ensure_server(emit_addr=EMIT_ADDR, reg_addr=REG_ADDR):
"""
- Make sure the server is running on the given port.
+ Make sure the server is running in the given addresses.
- Attempt to connect to given local port. Upon success, assume that the
- events server has already been started. Upon failure, start events server.
+ :param emit_addr: The address in which to receive events from clients.
+ :type emit_addr: str
+ :param reg_addr: The address to which publish events to clients.
+ :type reg_addr: str
- :param port: the port in which server should be listening
- :type port: int
-
- :return: the daemon instance or nothing
- :rtype: EventsServerDaemon or None
-
- :raise PortAlreadyTaken: Raised if C{port} is already taken by something
- that is not an events server.
- """
- try:
- # check if port is available
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('localhost', port))
- s.close()
- # port is taken, check if there's a server running there
- response = ping(port=port, timeout=1000)
- if response is not None and response.status == proto.EventResponse.OK:
- logger.info('A server is already running on port %d.', port)
- return
- # port is taken, and not by an events server
- logger.warning(
- 'Port %d is taken by something not an events server.', port)
- raise PortAlreadyTaken(port)
- except socket.error:
- # port is available, run a server
- logger.info('Launching server on port %d.', port)
- return EventsServerDaemon.ensure(port)
-
-
-def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
+ :return: an events server instance
+ :rtype: EventsServer
"""
- Ping the server.
-
- :param port: the port in which server should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- request = proto.PingRequest()
- service = RpcService(
- proto.EventsServerService_Stub,
- port,
- 'localhost')
- logger.debug("Pinging server in port %d..." % port)
- return service.ping(request, callback=reqcbk, timeout=timeout)
+ _server = EventsServer(emit_addr, reg_addr)
+ return _server
-class EventsServerService(proto.EventsServerService):
+class EventsServer(TxZmqServerComponent):
"""
- Service for receiving events in clients.
+ An events server that listens for events in one address and publishes those
+ events in another address.
"""
- def register(self, controller, request, done):
- """
- Register a client port to be signaled when specific events come in.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.info("Received registration request: %s..." % str(request)[:40])
- # add client port to signal list
- if request.event not in registered_clients:
- registered_clients[request.event] = set([])
- registered_clients[request.event].add(request.port)
- # send response back to client
-
- logger.debug('sending response back')
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
-
- def unregister(self, controller, request, done):
- """
- Unregister a client port so it will not be signaled when specific
- events come in.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.info(
- "Received unregistration request: %s..." % str(request)[:40])
- # remove client port from signal list
- response = proto.EventResponse()
- if request.event in registered_clients:
- try:
- registered_clients[request.event].remove(request.port)
- response.status = proto.EventResponse.OK
- except KeyError:
- response.status = proto.EventsResponse.ERROR
- response.result = 'Port %d not registered.' % request.port
- # send response back to client
- logger.debug('sending response back')
- done.run(response)
-
- def signal(self, controller, request, done):
- """
- Perform an RPC call to signal all clients registered to receive a
- specific signal.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.SignalRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.debug('Received signal from client: %s...', str(request)[:40])
- # send signal to all registered clients
- # TODO: verify signal auth
- if request.event in registered_clients:
- for port in registered_clients[request.event]:
-
- def callback(req, resp):
- logger.debug("Signal received by " + str(port))
-
- service = RpcService(proto.EventsClientService_Stub,
- port, 'localhost')
- service.signal(request, callback=callback)
- # send response back to client
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
-
- def ping(self, controller, request, done):
+ def __init__(self, emit_addr, reg_addr):
"""
- Reply to a ping request.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.debug("Received ping request, sending response.")
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
-
+ Initialize the events server.
-class EventsServerDaemon(daemon.EventsSingletonDaemon):
- """
- Singleton class for starting an events server daemon.
- """
-
- @classmethod
- def ensure(cls, port):
+ :param emit_addr: The address in which to receive events from clients.
+ :type emit_addr: str
+ :param reg_addr: The address to which publish events to clients.
+ :type reg_addr: str
"""
- Make sure the daemon is running on the given port.
-
- :param port: the port in which the daemon should listen
- :type port: int
+ TxZmqServerComponent.__init__(self)
+ # bind PULL and PUB sockets
+ self._pull, self.pull_port = self._zmq_bind(
+ txzmq.ZmqPullConnection, emit_addr)
+ self._pub, self.pub_port = self._zmq_bind(
+ txzmq.ZmqPubConnection, reg_addr)
+ # set a handler for arriving messages
+ self._pull.onPull = self._onPull
+
+ def _onPull(self, message):
+ """
+ Callback executed when a message is pulled from a client.
- :return: a daemon instance
- :rtype: EventsServerDaemon
+ :param message: The message sent by the client.
+ :type message: str
"""
- return cls.ensure_service(port, EventsServerService())
+ event, content = message[0].split(b"\0", 1)
+ logger.debug("Publishing event: %s" % event)
+ self._pub.publish(content, tag=event)