Add missing import for socket and forward signal()s paramteres
[leap_pycommon.git] / src / leap / common / events / server.py
1 # -*- coding: utf-8 -*-
2 # server.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 A server for the events mechanism.
20
21 A server can receive different kinds of requests from components:
22
23   1. Registration request: store component port number to be notified when
24      a specific signal arrives.
25
26   2. Signal request: redistribute the signal to registered components.
27 """
28
29
30 import logging
31 import sets
32 import socket
33
34
35 from protobuf.socketrpc import RpcService
36 from leap.common.events import (
37     events_pb2 as proto,
38     daemon,
39 )
40
41
42 logger = logging.getLogger(__name__)
43
44
45 SERVER_PORT = 8090
46
47 # the `registered_components` dictionary below should have the following
48 # format:
49 #
50 #     { event_signal: [ port, ... ], ... }
51 #
52 registered_components = {}
53
54
55 def ensure_server(port=SERVER_PORT):
56     """
57     Make sure the server is running on the given port.
58
59     Attempt to connect to given local port. Upon success, assume that the
60     events server has already been started. Upon failure, start events server.
61
62     @param port: the port in which server should be listening
63     @type port: int
64
65     @return: the daemon instance or nothing
66     @rtype: EventsServerDaemon or None
67     """
68     try:
69         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
70         s.connect(('localhost', port))
71         s.close()
72         logger.info('Server is already running on port %d.', port)
73         return None
74     except socket.error:
75         logger.info('Launching server on port %d.', port)
76         return EventsServerDaemon.ensure(port)
77
78
79 class EventsServerService(proto.EventsServerService):
80     """
81     Service for receiving events in components.
82     """
83
84     def register(self, controller, request, done):
85         """
86         Register a component port to be signaled when specific events come in.
87
88         @param controller: used to mediate a single method call
89         @type controller: protobuf.socketrpc.controller.SocketRpcController
90         @param request: the request received from the component
91         @type request: leap.common.events.events_pb2.RegisterRequest
92         @param done: callback to be called when done
93         @type done: protobuf.socketrpc.server.Callback
94         """
95         logger.info("Received registration request: %s" % str(request))
96         # add component port to signal list
97         if request.event not in registered_components:
98             registered_components[request.event] = sets.Set()
99         registered_components[request.event].add(request.port)
100         # send response back to component
101         response = proto.EventResponse()
102         response.status = proto.EventResponse.OK
103         done.run(response)
104
105     def signal(self, controller, request, done):
106         """
107         Perform an RPC call to signal all components registered to receive a
108         specific signal.
109
110         @param controller: used to mediate a single method call
111         @type controller: protobuf.socketrpc.controller.SocketRpcController
112         @param request: the request received from the component
113         @type request: leap.common.events.events_pb2.SignalRequest
114         @param done: callback to be called when done
115         @type done: protobuf.socketrpc.server.Callback
116         """
117         logger.info('Received signal from component: %s', str(request))
118         # send signal to all registered components
119         # TODO: verify signal auth
120         if request.event in registered_components:
121             for port in registered_components[request.event]:
122
123                 def callback(req, resp):
124                     logger.info("Signal received by " + str(port))
125
126                 service = RpcService(proto.EventsComponentService_Stub,
127                                      port, 'localhost')
128                 service.signal(request, callback=callback)
129         # send response back to component
130         response = proto.EventResponse()
131         response.status = proto.EventResponse.OK
132         done.run(response)
133
134
135 class EventsServerDaemon(daemon.EventsSingletonDaemon):
136     """
137     Singleton class for starting an events server daemon.
138     """
139
140     @classmethod
141     def ensure(cls, port):
142         """
143         Make sure the daemon is running on the given port.
144
145         @param port: the port in which the daemon should listen
146         @type port: int
147
148         @return: a daemon instance
149         @rtype: EventsServerDaemon
150         """
151         return cls.ensure_service(port, EventsServerService())