Merge remote-tracking branch 'drebs/feature/events-signals' into develop
[leap_pycommon.git] / src / leap / common / events / server.py
1 # -*- coding: utf-8 -*-
2 # server.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 A server for the events mechanism.
20
21 A server can receive different kinds of requests from components:
22
23   1. Registration request: store component port number to be notified when
24      a specific signal arrives.
25
26   2. Signal request: redistribute the signal to registered components.
27 """
28
29
30 import logging
31 import sets
32
33
34 from protobuf.socketrpc import RpcService
35 from leap.common.events import (
36     events_pb2 as proto,
37     daemon,
38 )
39
40
41 logger = logging.getLogger(__name__)
42
43
44 SERVER_PORT = 8090
45
46 # the `registered_components` dictionary below should have the following
47 # format:
48 #
49 #     { event_signal: [ port, ... ], ... }
50 #
51 registered_components = {}
52
53
54 def ensure_server(port=SERVER_PORT):
55     """
56     Make sure the server is running on the given port.
57
58     Attempt to connect to given local port. Upon success, assume that the
59     events server has already been started. Upon failure, start events server.
60
61     @param port: the port in which server should be listening
62     @type port: int
63
64     @return: the daemon instance or nothing
65     @rtype: EventsServerDaemon or None
66     """
67     try:
68         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
69         s.connect(('localhost', port))
70         s.close()
71         logger.info('Server is already running on port %d.', port)
72         return None
73     except socket.error:
74         logger.info('Launching server on port %d.', port)
75         return EventsServerDaemon.ensure(port)
76
77
78 class EventsServerService(proto.EventsServerService):
79     """
80     Service for receiving events in components.
81     """
82
83     def register(self, controller, request, done):
84         """
85         Register a component port to be signaled when specific events come in.
86
87         @param controller: used to mediate a single method call
88         @type controller: protobuf.socketrpc.controller.SocketRpcController
89         @param request: the request received from the component
90         @type request: leap.common.events.events_pb2.RegisterRequest
91         @param done: callback to be called when done
92         @type done: protobuf.socketrpc.server.Callback
93         """
94         logger.info("Received registration request: %s" % str(request))
95         # add component port to signal list
96         if request.event not in registered_components:
97             registered_components[request.event] = sets.Set()
98         registered_components[request.event].add(request.port)
99         # send response back to component
100         response = proto.EventResponse()
101         response.status = proto.EventResponse.OK
102         done.run(response)
103
104     def signal(self, controller, request, done):
105         """
106         Perform an RPC call to signal all components registered to receive a
107         specific signal.
108
109         @param controller: used to mediate a single method call
110         @type controller: protobuf.socketrpc.controller.SocketRpcController
111         @param request: the request received from the component
112         @type request: leap.common.events.events_pb2.SignalRequest
113         @param done: callback to be called when done
114         @type done: protobuf.socketrpc.server.Callback
115         """
116         logger.info('Received signal from component: %s', str(request))
117         # send signal to all registered components
118         # TODO: verify signal auth
119         if request.event in registered_components:
120             for port in registered_components[request.event]:
121
122                 def callback(req, resp):
123                     logger.info("Signal received by " + str(port))
124
125                 service = RpcService(proto.EventsComponentService_Stub,
126                                      port, 'localhost')
127                 service.signal(request, callback=callback)
128         # send response back to component
129         response = proto.EventResponse()
130         response.status = proto.EventResponse.OK
131         done.run(response)
132
133
134 class EventsServerDaemon(daemon.EventsSingletonDaemon):
135     """
136     Singleton class for starting an events server daemon.
137     """
138
139     @classmethod
140     def ensure(cls, port):
141         """
142         Make sure the daemon is running on the given port.
143
144         @param port: the port in which the daemon should listen
145         @type port: int
146
147         @return: a daemon instance
148         @rtype: EventsServerDaemon
149         """
150         return cls.ensure_service(port, EventsServerService())