Add a server for events mechanism.
[leap_pycommon.git] / src / leap / common / events / daemon.py
1 # -*- coding: utf-8 -*-
2 # daemon.py
3 # Copyright (C) 2013 LEAP
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 """
19 A singleton daemon for running RPC services using protobuf.socketrpc.
20 """
21
22
23 import logging
24 import threading
25
26
27 from protobuf.socketrpc.server import (
28     SocketRpcServer,
29     ThreadedTCPServer,
30     SocketHandler,
31 )
32
33
34 logger = logging.getLogger(__name__)
35
36
37 class ServiceAlreadyRunningException(Exception):
38     """
39     Raised whenever a service is already running in this process but someone
40     attemped to start it in a different port.
41     """
42
43
44 class EventsRpcServer(SocketRpcServer):
45     """
46     RPC server used in server and component interfaces to receive messages.
47     """
48
49     def __init__(self, port, host='localhost'):
50         """
51         Initialize a RPC server.
52
53         @param port: the port in which to listen for incoming messages
54         @type port: int
55         @param host: the address to bind to
56         @type host: str
57         """
58         SocketRpcServer.__init__(self, port, host)
59         self._server = None
60
61     def run(self):
62         """
63         Run the server.
64         """
65         logger.info('Running server on port %d.' % self.port)
66         # parent implementation does not hold the server instance, so we do it
67         # here.
68         self._server = ThreadedTCPServer((self.host, self.port),
69                                         SocketHandler, self)
70         # if we chose to use a random port, fetch the port number info.
71         if self.port is 0:
72             self.port = self._server.socket.getsockname()[1]
73         self._server.serve_forever()
74
75     def stop(self):
76         """
77         Stop the server.
78         """
79         self._server.shutdown()
80
81
82 class EventsSingletonDaemon(threading.Thread):
83     """
84     Singleton class for for launching and terminating a daemon.
85
86     This class is used so every part of the mechanism that needs to listen for
87     messages can launch its own daemon (thread) to do the job.
88     """
89
90     # Singleton instance
91     __instance = None
92
93     def __new__(cls, *args, **kwargs):
94         """
95         Return a singleton instance if it exists or create and initialize one.
96         """
97         if len(args) is not 2:
98             raise TypeError("__init__() takes exactly 2 arguments (%d given)"
99                             % len(args))
100         if cls.__instance is None:
101             cls.__instance = object.__new__(
102                 EventsSingletonDaemon, *args, **kwargs)
103             cls.__initialize(cls.__instance, args[0], args[1])
104         return cls.__instance
105
106     @staticmethod
107     def __initialize(self, port, service):
108         """
109         Initialize a singleton daemon.
110
111         This is a static method disguised as instance method that actually
112         does the initialization of the daemon instance.
113
114         @param port: the port in which to listen for incoming messages
115         @type port: int
116         @param service: the service to provide in this daemon
117         @type service: google.protobuf.service.Service
118         """
119         threading.Thread.__init__(self)
120         self._port = port
121         self._service = service
122         self._server = EventsRpcServer(self._port)
123         self._server.registerService(self._service)
124         self.daemon = True
125
126     def __init__(self):
127         """
128         Singleton placeholder initialization method.
129
130         Initialization is made in __new__ so we can always return the same
131         instance upon object creation.
132         """
133         pass
134
135     @classmethod
136     def ensure(cls, port):
137         """
138         Make sure the daemon instance is running.
139
140         Each implementation of this method should call `self.ensure_service`
141         with the appropriate service from the `events.proto` definitions, and
142         return the daemon instance.
143
144         @param port: the port in which the daemon should be listening
145         @type port: int
146
147         @return: a daemon instance
148         @rtype: EventsSingletonDaemon
149         """
150         raise NotImplementedError(self.ensure)
151
152     @classmethod
153     def ensure_service(cls, port, service):
154         """
155         Start the singleton instance if not already running.
156
157         Might return ServiceAlreadyRunningException
158
159         @param port: the port in which the daemon should be listening
160         @type port: int
161
162         @return: a daemon instance
163         @rtype: EventsSingletonDaemon
164         """
165         daemon = cls(port, service)
166         if not daemon.is_alive():
167             daemon.start()
168         elif port and port != cls.__instance._port:
169             # service is running in this process but someone is trying to
170             # start it in another port
171             raise ServiceAlreadyRunningException(
172                 "Service is already running in this process on port %d."
173                 % self.__instance._port)
174         return daemon
175
176     @classmethod
177     def get_instance(cls):
178         """
179         Retrieve singleton instance of this daemon.
180
181         @return: a daemon instance
182         @rtype: EventsSingletonDaemon
183         """
184         return cls.__instance
185
186     def run(self):
187         """
188         Run the server.
189         """
190         self._server.run()
191
192     def stop(self):
193         """
194         Stop the daemon.
195         """
196         self._server.stop()
197
198     def get_port(self):
199         """
200         Retrieve the value of the port to which the service running in this
201         daemon is binded to.
202
203         @return: the port to which the daemon is binded to
204         @rtype: int
205         """
206         if self._port is 0:
207             self._port = self._server.port
208         return self._port