diff options
author | Tomás Touceda <chiiph@leap.se> | 2013-06-28 14:55:33 -0300 |
---|---|---|
committer | Tomás Touceda <chiiph@leap.se> | 2013-06-28 14:55:33 -0300 |
commit | d071efe20a285e579111f568e2ff994a6171249f (patch) | |
tree | 824ea959c02c8ed3af36c0cec8421e0748d311cc /src/leap/common/events | |
parent | b966eec96fd3621908baab8697de8e2ed61ba136 (diff) | |
parent | a0f78f9d708cc6fe686ee5860cdc34909391b14b (diff) |
Merge branch 'release-0.2.5'
Diffstat (limited to 'src/leap/common/events')
-rw-r--r-- | src/leap/common/events/Makefile | 3 | ||||
-rw-r--r-- | src/leap/common/events/README.rst | 53 | ||||
-rw-r--r-- | src/leap/common/events/__init__.py | 126 | ||||
-rw-r--r-- | src/leap/common/events/component.py | 290 | ||||
-rw-r--r-- | src/leap/common/events/daemon.py | 208 | ||||
-rw-r--r-- | src/leap/common/events/events.proto | 79 | ||||
-rw-r--r-- | src/leap/common/events/events_pb2.py | 444 | ||||
-rw-r--r-- | src/leap/common/events/mac_auth.py | 31 | ||||
-rw-r--r-- | src/leap/common/events/server.py | 176 | ||||
-rw-r--r-- | src/leap/common/events/service.py | 114 | ||||
-rw-r--r-- | src/leap/common/events/signal.proto | 57 | ||||
-rw-r--r-- | src/leap/common/events/signal_pb2.py | 250 | ||||
-rw-r--r-- | src/leap/common/events/test_events.py | 88 |
13 files changed, 1383 insertions, 536 deletions
diff --git a/src/leap/common/events/Makefile b/src/leap/common/events/Makefile index 4f99f35..4f73dea 100644 --- a/src/leap/common/events/Makefile +++ b/src/leap/common/events/Makefile @@ -21,10 +21,11 @@ PROTOC = protoc -all: signal_pb2.py +all: events_pb2.py %_pb2.py: %.proto $(PROTOC) --python_out=./ $< + autopep8 --in-place --aggressive $@ clean: rm -f *_pb2.py diff --git a/src/leap/common/events/README.rst b/src/leap/common/events/README.rst new file mode 100644 index 0000000..813be8b --- /dev/null +++ b/src/leap/common/events/README.rst @@ -0,0 +1,53 @@ +Events mechanism +================ + +The events mechanism allows for "components" to send signal events to each +other by means of a centralized server. Components can register with the +server to receive signals of certain types, and they can also send signals to +the server that will then redistribute these signals to registered components. + + +Listening daemons +----------------- + +Both components and the server listen for incoming messages by using a +listening daemon that runs in its own thread. The server daemon has to be +started explicitly, while components daemon will be started whenever a +component registers with the server to receive messages. + + +How to use it +------------- + +To start the events server: + +>>> from leap.common.events import server +>>> server.ensure_server(port=8090) + +To register a callback to be called when a given signal is raised: + +>>> from leap.common.events import ( +>>> register, +>>> events_pb2 as proto, +>>> ) +>>> +>>> def mycallback(sigreq): +>>> print str(sigreq) +>>> +>>> events.register(signal=proto.CLIENT_UID, callback=mycallback) + +To signal an event: + +>>> from leap.common.events import ( +>>> signal, +>>> events_pb2 as proto, +>>> ) +>>> signal(proto.CLIENT_UID) + +Adding events +------------- + +* Add the new event under enum ``Event`` in ``events.proto`` +* Compile the new protocolbuffers file:: + + make diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index 27542a9..12416e4 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -15,37 +15,111 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +An events mechanism that allows for signaling of events between components. +""" + +import logging +import socket + + from leap.common.events import ( - signal_pb2, + events_pb2 as proto, + server, + component, + daemon, ) -# the `registered_callbacks` dictionary below should have the following -# format: -# -# { component: [ (uid, callback), ... ], ... } -# -registered_callbacks = {} +logger = logging.getLogger(__name__) + + +def register(signal, callback, uid=None, replace=False, reqcbk=None, + timeout=1000): + """ + Register a callback to be called when the given signal is received. + + Will timeout after timeout ms if response has not been received. The + timeout arg is only used for asynch requests. If a reqcbk callback has + been supplied the timeout arg is not used. The response value will be + returned for a synch request but nothing will be returned for an asynch + request. + + :param signal: the signal that causes the callback to be launched + :type signal: int (see the `events.proto` file) + :param callback: the callback to be called when the signal is received + :type callback: function + :param uid: a unique id for the callback + :type uid: int + :param replace: should an existent callback with same uid be replaced? + :type replace: bool + :param reqcbk: a callback to be called when a response from server is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + + :return: the response from server for synch calls or nothing for asynch + calls. + :rtype: leap.common.events.events_pb2.EventsResponse or None + """ + return component.register(signal, callback, uid, replace, reqcbk, timeout) + + +def unregister(signal, uid=None, reqcbk=None, timeout=1000): + """ + Unregister a callback. + + If C{uid} is specified, unregisters only the callback identified by that + unique id. Otherwise, unregisters all callbacks registered for C{signal}. + :param signal: the signal that causes the callback to be launched + :type signal: int (see the `events.proto` file) + :param uid: a unique id for the callback + :type uid: int + :param reqcbk: a callback to be called when a response from server is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int -def register(signal, callback, uid=None, replace=False): + :return: the response from server for synch calls or nothing for asynch + calls. + :rtype: leap.common.events.events_pb2.EventsResponse or None """ - Registers `callback` to be called when `signal` is signaled. + return component.unregister(signal, uid, reqcbk, timeout) + + +def signal(signal, content="", mac_method="", mac="", reqcbk=None, + timeout=1000): + """ + Send `signal` event to events server. + + Will timeout after timeout ms if response has not been received. The + timeout arg is only used for asynch requests. If a reqcbk callback has + been supplied the timeout arg is not used. The response value will be + returned for a synch request but nothing will be returned for an asynch + request. + + :param signal: the signal that causes the callback to be launched + :type signal: int (see the `events.proto` file) + :param content: the contents of the event signal + :type content: str + :param mac_method: the method used to auth mac + :type mac_method: str + :param mac: the content of the auth mac + :type mac: str + :param reqcbk: a callback to be called when a response from server is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + + :return: the response from server for synch calls or nothing for asynch + calls. + :rtype: leap.common.events.events_pb2.EventsResponse or None """ - if not registered_callbacks.has_key(signal): - registered_callbacks[signal] = [] - cbklist = registered_callbacks[signal] - if uid and filter(lambda (x,y): x == uid, cbklist): - # TODO: create appropriate exception - if not replace: - raise Exception("Callback already registered.") - else: - registered_callbacks[signal] = filter(lambda(x,y): x != uid, - cbklist) - registered_callbacks[signal].append((uid, callback)) - return uid - -#def get_registered_callbacks(): -# return registered_callbacks - -#__all__ = ['signal_pb2', 'service', 'register', 'registered_callbacks'] + return component.signal(signal, content, mac_method, mac, reqcbk, timeout) diff --git a/src/leap/common/events/component.py b/src/leap/common/events/component.py new file mode 100644 index 0000000..029d1ac --- /dev/null +++ b/src/leap/common/events/component.py @@ -0,0 +1,290 @@ +# -*- coding: utf-8 -*- +# component.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +The component end point of the events mechanism. + +Components are the communicating parties of the events mechanism. They +communicate by sending messages to a server, which in turn redistributes +messages to other components. + +When a component registers a callback for a given signal, it also tells the +server that it wants to be notified whenever signals of that type are sent by +some other component. +""" + + +import logging +import threading + + +from protobuf.socketrpc import RpcService +from leap.common.events import ( + events_pb2 as proto, + server, + daemon, + mac_auth, +) + + +logger = logging.getLogger(__name__) + + +# the `registered_callbacks` dictionary below should have the following +# format: +# +# { event_signal: [ (uid, callback), ... ], ... } +# +registered_callbacks = {} + + +class CallbackAlreadyRegistered(Exception): + """ + Raised when trying to register an already registered callback. + """ + pass + + +def ensure_component_daemon(): + """ + Ensure the component daemon is running and listening for incoming + messages. + + :return: the daemon instance + :rtype: EventsComponentDaemon + """ + import time + daemon = EventsComponentDaemon.ensure(0) + logger.debug('ensure component daemon') + + # Because we use a random port we want to wait until a port is assigned to + # local component daemon. + + while not (EventsComponentDaemon.get_instance() and + EventsComponentDaemon.get_instance().get_port()): + time.sleep(0.1) + return daemon + + +def register(signal, callback, uid=None, replace=False, reqcbk=None, + timeout=1000): + """ + Registers a callback to be called when a specific signal event is + received. + + Will timeout after timeout ms if response has not been received. The + timeout arg is only used for asynch requests. If a reqcbk callback has + been supplied the timeout arg is not used. The response value will be + returned for a synch request but nothing will be returned for an asynch + request. + + :param signal: the signal that causes the callback to be launched + :type signal: int (see the `events.proto` file) + :param callback: the callback to be called when the signal is received + :type callback: function + callback(leap.common.events.events_pb2.SignalRequest) + :param uid: a unique id for the callback + :type uid: int + :param replace: should an existent callback with same uid be replaced? + :type replace: bool + :param reqcbk: a callback to be called when a response from server is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + + Might raise a CallbackAlreadyRegistered exception if there's already a + callback identified by the given uid and replace is False. + + :return: the response from server for synch calls or nothing for asynch + calls. + :rtype: leap.common.events.events_pb2.EventsResponse or None + """ + ensure_component_daemon() # so we can receive registered signals + # register callback locally + if signal not in registered_callbacks: + registered_callbacks[signal] = [] + cbklist = registered_callbacks[signal] + if uid and filter(lambda (x, y): x == uid, cbklist): + if not replace: + raise CallbackAlreadyRegisteredException() + else: + registered_callbacks[signal] = filter(lambda(x, y): x != uid, + cbklist) + registered_callbacks[signal].append((uid, callback)) + # register callback on server + request = proto.RegisterRequest() + request.event = signal + request.port = EventsComponentDaemon.get_instance().get_port() + request.mac_method = mac_auth.MacMethod.MAC_NONE + request.mac = "" + service = RpcService(proto.EventsServerService_Stub, + server.SERVER_PORT, 'localhost') + logger.info( + "Sending registration request to server on port %s: %s", + server.SERVER_PORT, + str(request)[:40]) + return service.register(request, callback=reqcbk, timeout=timeout) + +def unregister(signal, uid=None, reqcbk=None, timeout=1000): + """ + Unregister a callback. + + If C{uid} is specified, unregisters only the callback identified by that + unique id. Otherwise, unregisters all callbacks + + :param signal: the signal that causes the callback to be launched + :type signal: int (see the `events.proto` file) + :param uid: a unique id for the callback + :type uid: int + :param reqcbk: a callback to be called when a response from server is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + + :return: the response from server for synch calls or nothing for asynch + calls or None if no callback is registered for that signal or uid. + :rtype: leap.common.events.events_pb2.EventsResponse or None + """ + if signal not in registered_callbacks or not registered_callbacks[signal]: + logger.warning("No callback registered for signal %d." % signal) + return None + # unregister callback locally + cbklist = registered_callbacks[signal] + if uid is not None: + if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []: + logger.warning("No callback registered for uid %d." % st) + return None + registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist) + else: + # exclude all callbacks for given signal + registered_callbacks[signal] = [] + # unregister port in server if there are no more callbacks for this signal + if not registered_callbacks[signal]: + request = proto.UnregisterRequest() + request.event = signal + request.port = EventsComponentDaemon.get_instance().get_port() + request.mac_method = mac_auth.MacMethod.MAC_NONE + request.mac = "" + service = RpcService(proto.EventsServerService_Stub, + server.SERVER_PORT, 'localhost') + logger.info( + "Sending unregistration request to server on port %s: %s", + server.SERVER_PORT, + str(request)[:40]) + return service.unregister(request, callback=reqcbk, timeout=timeout) + + +def signal(signal, content="", mac_method="", mac="", reqcbk=None, + timeout=1000): + """ + Send `signal` event to events server. + + Will timeout after timeout ms if response has not been received. The + timeout arg is only used for asynch requests. If a reqcbk callback has + been supplied the timeout arg is not used. The response value will be + returned for a synch request but nothing will be returned for an asynch + request. + + :param signal: the signal that causes the callback to be launched + :type signal: int (see the `events.proto` file) + :param content: the contents of the event signal + :type content: str + :param mac_method: the method used for auth mac + :type mac_method: str + :param mac: the content of the auth mac + :type mac: str + :param reqcbk: a callback to be called when a response from server is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + + :return: the response from server for synch calls or nothing for asynch + calls. + :rtype: leap.common.events.events_pb2.EventsResponse or None + """ + request = proto.SignalRequest() + request.event = signal + request.content = content + request.mac_method = mac_method + request.mac = mac + service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT, + 'localhost') + logger.info("Sending signal to server: %s", str(request)[:40]) + return service.signal(request, callback=reqcbk, timeout=timeout) + + +class EventsComponentService(proto.EventsComponentService): + """ + Service for receiving signal events in components. + """ + + def __init__(self): + proto.EventsComponentService.__init__(self) + + def signal(self, controller, request, done): + """ + Receive a signal and run callbacks registered for that signal. + + This method is called whenever a signal request is received from + server. + + :param controller: used to mediate a single method call + :type controller: protobuf.socketrpc.controller.SocketRpcController + :param request: the request received from the component + :type request: leap.common.events.events_pb2.SignalRequest + :param done: callback to be called when done + :type done: protobuf.socketrpc.server.Callback + """ + logger.info('Received signal from server: %s...' % str(request)[:40]) + + # run registered callbacks + # TODO: verify authentication using mac in incoming message + if request.event in registered_callbacks: + for (_, cbk) in registered_callbacks[request.event]: + # callbacks should be prepared to receive a + # events_pb2.SignalRequest. + cbk(request) + + # send response back to server + response = proto.EventResponse() + response.status = proto.EventResponse.OK + done.run(response) + + +class EventsComponentDaemon(daemon.EventsSingletonDaemon): + """ + A daemon that listens for incoming events from server. + """ + + @classmethod + def ensure(cls, port): + """ + Make sure the daemon is running on the given port. + + :param port: the port in which the daemon should listen + :type port: int + + :return: a daemon instance + :rtype: EventsComponentDaemon + """ + return cls.ensure_service(port, EventsComponentService()) diff --git a/src/leap/common/events/daemon.py b/src/leap/common/events/daemon.py new file mode 100644 index 0000000..c253948 --- /dev/null +++ b/src/leap/common/events/daemon.py @@ -0,0 +1,208 @@ +# -*- coding: utf-8 -*- +# daemon.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +A singleton daemon for running RPC services using protobuf.socketrpc. +""" + + +import logging +import threading + + +from protobuf.socketrpc.server import ( + SocketRpcServer, + ThreadedTCPServer, + SocketHandler, +) + + +logger = logging.getLogger(__name__) + + +class ServiceAlreadyRunningException(Exception): + """ + Raised whenever a service is already running in this process but someone + attemped to start it in a different port. + """ + + +class EventsRpcServer(SocketRpcServer): + """ + RPC server used in server and component interfaces to receive messages. + """ + + def __init__(self, port, host='localhost'): + """ + Initialize a RPC server. + + :param port: the port in which to listen for incoming messages + :type port: int + :param host: the address to bind to + :type host: str + """ + SocketRpcServer.__init__(self, port, host) + self._server = None + + def run(self): + """ + Run the server. + """ + logger.info('Running server on port %d.' % self.port) + # parent implementation does not hold the server instance, so we do it + # here. + self._server = ThreadedTCPServer((self.host, self.port), + SocketHandler, self) + # if we chose to use a random port, fetch the port number info. + if self.port is 0: + self.port = self._server.socket.getsockname()[1] + self._server.serve_forever() + + def stop(self): + """ + Stop the server. + """ + self._server.shutdown() + + +class EventsSingletonDaemon(threading.Thread): + """ + Singleton class for for launching and terminating a daemon. + + This class is used so every part of the mechanism that needs to listen for + messages can launch its own daemon (thread) to do the job. + """ + + # Singleton instance + __instance = None + + def __new__(cls, *args, **kwargs): + """ + Return a singleton instance if it exists or create and initialize one. + """ + if len(args) is not 2: + raise TypeError("__init__() takes exactly 2 arguments (%d given)" + % len(args)) + if cls.__instance is None: + cls.__instance = object.__new__( + EventsSingletonDaemon) + cls.__initialize(cls.__instance, args[0], args[1]) + return cls.__instance + + @staticmethod + def __initialize(self, port, service): + """ + Initialize a singleton daemon. + + This is a static method disguised as instance method that actually + does the initialization of the daemon instance. + + :param port: the port in which to listen for incoming messages + :type port: int + :param service: the service to provide in this daemon + :type service: google.protobuf.service.Service + """ + threading.Thread.__init__(self) + self._port = port + self._service = service + self._server = EventsRpcServer(self._port) + self._server.registerService(self._service) + self.daemon = True + + def __init__(self): + """ + Singleton placeholder initialization method. + + Initialization is made in __new__ so we can always return the same + instance upon object creation. + """ + pass + + @classmethod + def ensure(cls, port): + """ + Make sure the daemon instance is running. + + Each implementation of this method should call `self.ensure_service` + with the appropriate service from the `events.proto` definitions, and + return the daemon instance. + + :param port: the port in which the daemon should be listening + :type port: int + + :return: a daemon instance + :rtype: EventsSingletonDaemon + """ + raise NotImplementedError(self.ensure) + + @classmethod + def ensure_service(cls, port, service): + """ + Start the singleton instance if not already running. + + Might return ServiceAlreadyRunningException + + :param port: the port in which the daemon should be listening + :type port: int + + :return: a daemon instance + :rtype: EventsSingletonDaemon + """ + daemon = cls(port, service) + if not daemon.is_alive(): + daemon.start() + elif port and port != cls.__instance._port: + # service is running in this process but someone is trying to + # start it in another port + raise ServiceAlreadyRunningException( + "Service is already running in this process on port %d." + % self.__instance._port) + return daemon + + @classmethod + def get_instance(cls): + """ + Retrieve singleton instance of this daemon. + + :return: a daemon instance + :rtype: EventsSingletonDaemon + """ + return cls.__instance + + def run(self): + """ + Run the server. + """ + self._server.run() + + def stop(self): + """ + Stop the daemon. + """ + self._server.stop() + + def get_port(self): + """ + Retrieve the value of the port to which the service running in this + daemon is binded to. + + :return: the port to which the daemon is binded to + :rtype: int + """ + if self._port is 0: + self._port = self._server.port + return self._port diff --git a/src/leap/common/events/events.proto b/src/leap/common/events/events.proto new file mode 100644 index 0000000..a813ed1 --- /dev/null +++ b/src/leap/common/events/events.proto @@ -0,0 +1,79 @@ +// signal.proto +// Copyright (C) 2013 LEAP +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package leap.common.events; +option py_generic_services = true; + +enum Event { + CLIENT_SESSION_ID = 1; + CLIENT_UID = 2; + SOLEDAD_CREATING_KEYS = 3; + SOLEDAD_DONE_CREATING_KEYS = 4; + SOLEDAD_UPLOADING_KEYS = 5; + SOLEDAD_DONE_UPLOADING_KEYS = 6; + SOLEDAD_DOWNLOADING_KEYS = 7; + SOLEDAD_DONE_DOWNLOADING_KEYS = 8; + SOLEDAD_NEW_DATA_TO_SYNC = 9; + SOLEDAD_DONE_DATA_SYNC = 10; + UPDATER_NEW_UPDATES = 11; + UPDATER_DONE_UPDATING = 12; + RAISE_WINDOW = 13; +} + +message SignalRequest { + required Event event = 1; + required string content = 2; + required string mac_method = 3; + required bytes mac = 4; + optional string enc_method = 5; + optional bool error_occurred = 6; +} + +message RegisterRequest { + required Event event = 1; + required int32 port = 2; + required string mac_method = 3; + required bytes mac = 4; +} + +message UnregisterRequest { + required Event event = 1; + required int32 port = 2; + required string mac_method = 3; + required bytes mac = 4; +} + +message EventResponse { + + enum Status { + OK = 1; + UNAUTH = 2; + ERROR = 3; + } + + required Status status = 1; + optional string result = 2; +} + +service EventsServerService { + rpc register(RegisterRequest) returns (EventResponse); + rpc unregister(UnregisterRequest) returns (EventResponse); + rpc signal(SignalRequest) returns (EventResponse); +} + +service EventsComponentService { + rpc signal(SignalRequest) returns (EventResponse); +} diff --git a/src/leap/common/events/events_pb2.py b/src/leap/common/events/events_pb2.py new file mode 100644 index 0000000..5b1c118 --- /dev/null +++ b/src/leap/common/events/events_pb2.py @@ -0,0 +1,444 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: events.proto + +from google.protobuf.internal import enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import service as _service +from google.protobuf import service_reflection +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='events.proto', + package='leap.common.events', + serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"l\n\x11UnregisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\xe7\x02\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x12\x10\n\x0cRAISE_WINDOW\x10\r2\x91\x02\n\x13\x45ventsServerService\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12V\n\nunregister\x12%.leap.common.events.UnregisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2h\n\x16\x45ventsComponentService\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponseB\x03\x90\x01\x01') + +_EVENT = _descriptor.EnumDescriptor( + name='Event', + full_name='leap.common.events.Event', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='CLIENT_SESSION_ID', index=0, number=1, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='CLIENT_UID', index=1, number=2, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SOLEDAD_CREATING_KEYS', index=2, number=3, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SOLEDAD_DONE_CREATING_KEYS', index=3, number=4, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SOLEDAD_UPLOADING_KEYS', index=4, number=5, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SOLEDAD_DONE_UPLOADING_KEYS', index=5, number=6, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SOLEDAD_DOWNLOADING_KEYS', index=6, number=7, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SOLEDAD_DONE_DOWNLOADING_KEYS', index=7, number=8, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SOLEDAD_NEW_DATA_TO_SYNC', index=8, number=9, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='SOLEDAD_DONE_DATA_SYNC', index=9, number=10, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='UPDATER_NEW_UPDATES', index=10, number=11, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='UPDATER_DONE_UPDATING', index=11, number=12, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='RAISE_WINDOW', index=12, number=13, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=542, + serialized_end=901, +) + +Event = enum_type_wrapper.EnumTypeWrapper(_EVENT) +CLIENT_SESSION_ID = 1 +CLIENT_UID = 2 +SOLEDAD_CREATING_KEYS = 3 +SOLEDAD_DONE_CREATING_KEYS = 4 +SOLEDAD_UPLOADING_KEYS = 5 +SOLEDAD_DONE_UPLOADING_KEYS = 6 +SOLEDAD_DOWNLOADING_KEYS = 7 +SOLEDAD_DONE_DOWNLOADING_KEYS = 8 +SOLEDAD_NEW_DATA_TO_SYNC = 9 +SOLEDAD_DONE_DATA_SYNC = 10 +UPDATER_NEW_UPDATES = 11 +UPDATER_DONE_UPDATING = 12 +RAISE_WINDOW = 13 + + +_EVENTRESPONSE_STATUS = _descriptor.EnumDescriptor( + name='Status', + full_name='leap.common.events.EventResponse.Status', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='OK', index=0, number=1, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='UNAUTH', index=1, number=2, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='ERROR', index=2, number=3, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=500, + serialized_end=539, +) + + +_SIGNALREQUEST = _descriptor.Descriptor( + name='SignalRequest', + full_name='leap.common.events.SignalRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='event', full_name='leap.common.events.SignalRequest.event', index=0, + number=1, type=14, cpp_type=8, label=2, + has_default_value=False, default_value=1, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='content', full_name='leap.common.events.SignalRequest.content', index=1, + number=2, type=9, cpp_type=9, label=2, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='mac_method', full_name='leap.common.events.SignalRequest.mac_method', index=2, + number=3, type=9, cpp_type=9, label=2, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='mac', full_name='leap.common.events.SignalRequest.mac', index=3, + number=4, type=12, cpp_type=9, label=2, + has_default_value=False, default_value="", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='enc_method', full_name='leap.common.events.SignalRequest.enc_method', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='error_occurred', full_name='leap.common.events.SignalRequest.error_occurred', index=5, + number=6, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + serialized_start=37, + serialized_end=188, +) + + +_REGISTERREQUEST = _descriptor.Descriptor( + name='RegisterRequest', + full_name='leap.common.events.RegisterRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='event', full_name='leap.common.events.RegisterRequest.event', index=0, + number=1, type=14, cpp_type=8, label=2, + has_default_value=False, default_value=1, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='port', full_name='leap.common.events.RegisterRequest.port', index=1, + number=2, type=5, cpp_type=1, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='mac_method', full_name='leap.common.events.RegisterRequest.mac_method', index=2, + number=3, type=9, cpp_type=9, label=2, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='mac', full_name='leap.common.events.RegisterRequest.mac', index=3, + number=4, type=12, cpp_type=9, label=2, + has_default_value=False, default_value="", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + serialized_start=190, + serialized_end=296, +) + + +_UNREGISTERREQUEST = _descriptor.Descriptor( + name='UnregisterRequest', + full_name='leap.common.events.UnregisterRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='event', full_name='leap.common.events.UnregisterRequest.event', index=0, + number=1, type=14, cpp_type=8, label=2, + has_default_value=False, default_value=1, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='port', full_name='leap.common.events.UnregisterRequest.port', index=1, + number=2, type=5, cpp_type=1, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='mac_method', full_name='leap.common.events.UnregisterRequest.mac_method', index=2, + number=3, type=9, cpp_type=9, label=2, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='mac', full_name='leap.common.events.UnregisterRequest.mac', index=3, + number=4, type=12, cpp_type=9, label=2, + has_default_value=False, default_value="", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + serialized_start=298, + serialized_end=406, +) + + +_EVENTRESPONSE = _descriptor.Descriptor( + name='EventResponse', + full_name='leap.common.events.EventResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='status', full_name='leap.common.events.EventResponse.status', index=0, + number=1, type=14, cpp_type=8, label=2, + has_default_value=False, default_value=1, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='result', full_name='leap.common.events.EventResponse.result', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _EVENTRESPONSE_STATUS, + ], + options=None, + is_extendable=False, + extension_ranges=[], + serialized_start=409, + serialized_end=539, +) + +_SIGNALREQUEST.fields_by_name['event'].enum_type = _EVENT +_REGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT +_UNREGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT +_EVENTRESPONSE.fields_by_name['status'].enum_type = _EVENTRESPONSE_STATUS +_EVENTRESPONSE_STATUS.containing_type = _EVENTRESPONSE +DESCRIPTOR.message_types_by_name['SignalRequest'] = _SIGNALREQUEST +DESCRIPTOR.message_types_by_name['RegisterRequest'] = _REGISTERREQUEST +DESCRIPTOR.message_types_by_name['UnregisterRequest'] = _UNREGISTERREQUEST +DESCRIPTOR.message_types_by_name['EventResponse'] = _EVENTRESPONSE + + +class SignalRequest(_message.Message): + __metaclass__ = _reflection.GeneratedProtocolMessageType + DESCRIPTOR = _SIGNALREQUEST + + # @@protoc_insertion_point(class_scope:leap.common.events.SignalRequest) + + +class RegisterRequest(_message.Message): + __metaclass__ = _reflection.GeneratedProtocolMessageType + DESCRIPTOR = _REGISTERREQUEST + + # @@protoc_insertion_point(class_scope:leap.common.events.RegisterRequest) + + +class UnregisterRequest(_message.Message): + __metaclass__ = _reflection.GeneratedProtocolMessageType + DESCRIPTOR = _UNREGISTERREQUEST + + # @@protoc_insertion_point(class_scope:leap.common.events.UnregisterRequest) + + +class EventResponse(_message.Message): + __metaclass__ = _reflection.GeneratedProtocolMessageType + DESCRIPTOR = _EVENTRESPONSE + + # @@protoc_insertion_point(class_scope:leap.common.events.EventResponse) + + +DESCRIPTOR.has_options = True +DESCRIPTOR._options = _descriptor._ParseOptions( + descriptor_pb2.FileOptions(), '\220\001\001') + +_EVENTSSERVERSERVICE = _descriptor.ServiceDescriptor( + name='EventsServerService', + full_name='leap.common.events.EventsServerService', + file=DESCRIPTOR, + index=0, + options=None, + serialized_start=904, + serialized_end=1177, + methods=[ + _descriptor.MethodDescriptor( + name='register', + full_name='leap.common.events.EventsServerService.register', + index=0, + containing_service=None, + input_type=_REGISTERREQUEST, + output_type=_EVENTRESPONSE, + options=None, + ), + _descriptor.MethodDescriptor( + name='unregister', + full_name='leap.common.events.EventsServerService.unregister', + index=1, + containing_service=None, + input_type=_UNREGISTERREQUEST, + output_type=_EVENTRESPONSE, + options=None, + ), + _descriptor.MethodDescriptor( + name='signal', + full_name='leap.common.events.EventsServerService.signal', + index=2, + containing_service=None, + input_type=_SIGNALREQUEST, + output_type=_EVENTRESPONSE, + options=None, + ), + ]) + + +class EventsServerService(_service.Service): + __metaclass__ = service_reflection.GeneratedServiceType + DESCRIPTOR = _EVENTSSERVERSERVICE + + +class EventsServerService_Stub(EventsServerService): + __metaclass__ = service_reflection.GeneratedServiceStubType + DESCRIPTOR = _EVENTSSERVERSERVICE + + +_EVENTSCOMPONENTSERVICE = _descriptor.ServiceDescriptor( + name='EventsComponentService', + full_name='leap.common.events.EventsComponentService', + file=DESCRIPTOR, + index=1, + options=None, + serialized_start=1179, + serialized_end=1283, + methods=[ + _descriptor.MethodDescriptor( + name='signal', + full_name='leap.common.events.EventsComponentService.signal', + index=0, + containing_service=None, + input_type=_SIGNALREQUEST, + output_type=_EVENTRESPONSE, + options=None, + ), + ]) + + +class EventsComponentService(_service.Service): + __metaclass__ = service_reflection.GeneratedServiceType + DESCRIPTOR = _EVENTSCOMPONENTSERVICE + + +class EventsComponentService_Stub(EventsComponentService): + __metaclass__ = service_reflection.GeneratedServiceStubType + DESCRIPTOR = _EVENTSCOMPONENTSERVICE + +# @@protoc_insertion_point(module_scope) diff --git a/src/leap/common/events/mac_auth.py b/src/leap/common/events/mac_auth.py new file mode 100644 index 0000000..49d48f7 --- /dev/null +++ b/src/leap/common/events/mac_auth.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# mac_auth.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Authentication system for events. + +This is not implemented yet. +""" + + +class MacMethod(object): + """ + Representation of possible MAC authentication methods. + """ + + MAC_NONE = 'none' + MAC_HMAC = 'hmac' diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py new file mode 100644 index 0000000..d53c218 --- /dev/null +++ b/src/leap/common/events/server.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# server.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A server for the events mechanism. + +A server can receive different kinds of requests from components: + + 1. Registration request: store component port number to be notified when + a specific signal arrives. + + 2. Signal request: redistribute the signal to registered components. +""" +import logging +import socket + + +from protobuf.socketrpc import RpcService +from leap.common.events import ( + events_pb2 as proto, + daemon, +) + + +logger = logging.getLogger(__name__) + + +SERVER_PORT = 8090 + +# the `registered_components` dictionary below should have the following +# format: +# +# { event_signal: [ port, ... ], ... } +# +registered_components = {} + + +def ensure_server(port=SERVER_PORT): + """ + Make sure the server is running on the given port. + + Attempt to connect to given local port. Upon success, assume that the + events server has already been started. Upon failure, start events server. + + :param port: the port in which server should be listening + :type port: int + + :return: the daemon instance or nothing + :rtype: EventsServerDaemon or None + """ + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('localhost', port)) + s.close() + logger.info('Server is already running on port %d.', port) + return None + except socket.error: + logger.info('Launching server on port %d.', port) + return EventsServerDaemon.ensure(port) + + +class EventsServerService(proto.EventsServerService): + """ + Service for receiving events in components. + """ + + def register(self, controller, request, done): + """ + Register a component port to be signaled when specific events come in. + + :param controller: used to mediate a single method call + :type controller: protobuf.socketrpc.controller.SocketRpcController + :param request: the request received from the component + :type request: leap.common.events.events_pb2.RegisterRequest + :param done: callback to be called when done + :type done: protobuf.socketrpc.server.Callback + """ + logger.info("Received registration request: %s..." % str(request)[:40]) + # add component port to signal list + if request.event not in registered_components: + registered_components[request.event] = set([]) + registered_components[request.event].add(request.port) + # send response back to component + + logger.debug('sending response back') + response = proto.EventResponse() + response.status = proto.EventResponse.OK + done.run(response) + + def unregister(self, controller, request, done): + """ + Unregister a component port so it will not be signaled when specific + events come in. + + :param controller: used to mediate a single method call + :type controller: protobuf.socketrpc.controller.SocketRpcController + :param request: the request received from the component + :type request: leap.common.events.events_pb2.RegisterRequest + :param done: callback to be called when done + :type done: protobuf.socketrpc.server.Callback + """ + logger.info( + "Received unregistration request: %s..." % str(request)[:40]) + # remove component port from signal list + response = proto.EventResponse() + if request.event in registered_components: + try: + registered_components[request.event].remove(request.port) + response.status = proto.EventResponse.OK + except KeyError: + response.status = proto.EventsResponse.ERROR + response.result = 'Port %d not registered.' % request.port + # send response back to component + logger.debug('sending response back') + done.run(response) + + def signal(self, controller, request, done): + """ + Perform an RPC call to signal all components registered to receive a + specific signal. + + :param controller: used to mediate a single method call + :type controller: protobuf.socketrpc.controller.SocketRpcController + :param request: the request received from the component + :type request: leap.common.events.events_pb2.SignalRequest + :param done: callback to be called when done + :type done: protobuf.socketrpc.server.Callback + """ + logger.info('Received signal from component: %s...', str(request)[:40]) + # send signal to all registered components + # TODO: verify signal auth + if request.event in registered_components: + for port in registered_components[request.event]: + + def callback(req, resp): + logger.info("Signal received by " + str(port)) + + service = RpcService(proto.EventsComponentService_Stub, + port, 'localhost') + service.signal(request, callback=callback) + # send response back to component + response = proto.EventResponse() + response.status = proto.EventResponse.OK + done.run(response) + + +class EventsServerDaemon(daemon.EventsSingletonDaemon): + """ + Singleton class for starting an events server daemon. + """ + + @classmethod + def ensure(cls, port): + """ + Make sure the daemon is running on the given port. + + :param port: the port in which the daemon should listen + :type port: int + + :return: a daemon instance + :rtype: EventsServerDaemon + """ + return cls.ensure_service(port, EventsServerService()) diff --git a/src/leap/common/events/service.py b/src/leap/common/events/service.py deleted file mode 100644 index fda45b2..0000000 --- a/src/leap/common/events/service.py +++ /dev/null @@ -1,114 +0,0 @@ -# -*- coding: utf-8 -*- -# service.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import logging -import threading -from protobuf.socketrpc.server import ( - SocketRpcServer, - ThreadedTCPServer, - SocketHandler, -) -from leap.common.events import ( - signal_pb2 as proto, - registered_callbacks, -) - - -logger = logging.getLogger(__name__) - - -class SignalRpcServer(SocketRpcServer): - - def __init__(self, port, host='localhost'): - '''port - Port this server is started on''' - self.port = port - self.host = host - self.serviceMap = {} - self.server = None - - def run(self): - '''Activate the server.''' - logger.info('Running server on port %d' % self.port) - self.server = ThreadedTCPServer((self.host, self.port), - SocketHandler, self) - self.server.serve_forever() - - def stop(self): - self.server.shutdown() - - -class SignalService(proto.SignalService): - ''' - Handles signaling for LEAP components. - ''' - - def signal(self, controller, request, done): - logger.info('Received signal.') - - # Run registered callbacks - if registered_callbacks.has_key(request.signal): - for (_, cbk) in registered_callbacks[request.signal]: - cbk(request) - - # Create response message - response = proto.SignalResponse() - # TODO: change id for something meaningful - response.id = 1 - response.status = proto.SignalResponse.OK - - # Call provided callback with response message - done.run(response) - - -class SignalServiceThread(threading.Thread): - """ - Singleton class for starting a server thread - """ - - # Singleton instance - _instance = None - - def __init__(self, port): - super(SignalServiceThread, self).__init__() - self._service = SignalService() - self._port = port - self._server = SignalRpcServer(self._port) - self._server.registerService(self._service) - self.setDaemon(True) - - @staticmethod - def start_service(port): - """ - Start the singleton instance if not already running - Will not exit until the process ends - """ - if SignalServiceThread._instance == None: - SignalServiceThread._instance = SignalServiceThread(port) - SignalServiceThread._instance.start() - elif port != SignalServiceThread._instance._port: - # TODO: make this exception more self-explanatory - raise Exception() - return SignalServiceThread._instance - - def get_instance(self): - return self._instance - - def run(self): - self._server.run() - - def stop(self): - self._server.stop() diff --git a/src/leap/common/events/signal.proto b/src/leap/common/events/signal.proto deleted file mode 100644 index 336471c..0000000 --- a/src/leap/common/events/signal.proto +++ /dev/null @@ -1,57 +0,0 @@ -// signal.proto -// Copyright (C) 2013 LEA -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package leap.common.events; - -message SignalRequest { - - enum Signal { - CLIENT_SESSION_ID = 1; - CLIENT_UID = 2; - SOLEDAD_CREATING_KEYS = 3; - SOLEDAD_DONE_CREATING_KEYS = 4; - SOLEDAD_UPLOADING_KEYS = 5; - SOLEDAD_DONE_UPLOADING_KEYS = 6; - SOLEDAD_DOWNLOADING_KEYS = 7; - SOLEDAD_DONE_DOWNLOADING_KEYS = 8; - SOLEDAD_NEW_DATA_TO_SYNC = 9; - SOLEDAD_DONE_DATA_SYNC = 10; - } - - required int32 id = 1; - required Signal signal = 2; - required string content = 3; - required string mac_method = 4; - required bytes mac = 5; - optional string enc_method = 6; - optional bool error_occurred = 7; -} - -message SignalResponse { - - enum Status { - OK = 1; - UNAUTH = 2; - ERROR = 3; - } - - required int32 id = 1; - required Status status = 2; -} - -service SignalService { - rpc signal(SignalRequest) returns (SignalResponse); -} diff --git a/src/leap/common/events/signal_pb2.py b/src/leap/common/events/signal_pb2.py deleted file mode 100644 index b21676f..0000000 --- a/src/leap/common/events/signal_pb2.py +++ /dev/null @@ -1,250 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! - -from google.protobuf import descriptor -from google.protobuf import message -from google.protobuf import reflection -from google.protobuf import service -from google.protobuf import service_reflection -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - - -DESCRIPTOR = descriptor.FileDescriptor( - name='signal.proto', - package='leap.common.events', - serialized_pb='\n\x0csignal.proto\x12\x12leap.common.events\"\xd8\x03\n\rSignalRequest\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x38\n\x06signal\x18\x02 \x02(\x0e\x32(.leap.common.events.SignalRequest.Signal\x12\x0f\n\x07\x63ontent\x18\x03 \x02(\t\x12\x12\n\nmac_method\x18\x04 \x02(\t\x12\x0b\n\x03mac\x18\x05 \x02(\x0c\x12\x12\n\nenc_method\x18\x06 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x07 \x01(\x08\"\xa2\x02\n\x06Signal\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\"\x80\x01\n\x0eSignalResponse\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x39\n\x06status\x18\x02 \x02(\x0e\x32).leap.common.events.SignalResponse.Status\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x32`\n\rSignalService\x12O\n\x06signal\x12!.leap.common.events.SignalRequest\x1a\".leap.common.events.SignalResponse') - - - -_SIGNALREQUEST_SIGNAL = descriptor.EnumDescriptor( - name='Signal', - full_name='leap.common.events.SignalRequest.Signal', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='CLIENT_SESSION_ID', index=0, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='CLIENT_UID', index=1, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_CREATING_KEYS', index=2, number=3, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_CREATING_KEYS', index=3, number=4, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_UPLOADING_KEYS', index=4, number=5, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_UPLOADING_KEYS', index=5, number=6, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DOWNLOADING_KEYS', index=6, number=7, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_DOWNLOADING_KEYS', index=7, number=8, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_NEW_DATA_TO_SYNC', index=8, number=9, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_DATA_SYNC', index=9, number=10, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=219, - serialized_end=509, -) - -_SIGNALRESPONSE_STATUS = descriptor.EnumDescriptor( - name='Status', - full_name='leap.common.events.SignalResponse.Status', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='OK', index=0, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UNAUTH', index=1, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='ERROR', index=2, number=3, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=601, - serialized_end=640, -) - - -_SIGNALREQUEST = descriptor.Descriptor( - name='SignalRequest', - full_name='leap.common.events.SignalRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='id', full_name='leap.common.events.SignalRequest.id', index=0, - number=1, type=5, cpp_type=1, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='signal', full_name='leap.common.events.SignalRequest.signal', index=1, - number=2, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='content', full_name='leap.common.events.SignalRequest.content', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.SignalRequest.mac_method', index=3, - number=4, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.SignalRequest.mac', index=4, - number=5, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='enc_method', full_name='leap.common.events.SignalRequest.enc_method', index=5, - number=6, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='error_occurred', full_name='leap.common.events.SignalRequest.error_occurred', index=6, - number=7, type=8, cpp_type=7, label=1, - has_default_value=False, default_value=False, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - _SIGNALREQUEST_SIGNAL, - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=37, - serialized_end=509, -) - - -_SIGNALRESPONSE = descriptor.Descriptor( - name='SignalResponse', - full_name='leap.common.events.SignalResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='id', full_name='leap.common.events.SignalResponse.id', index=0, - number=1, type=5, cpp_type=1, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='status', full_name='leap.common.events.SignalResponse.status', index=1, - number=2, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - _SIGNALRESPONSE_STATUS, - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=512, - serialized_end=640, -) - - -_SIGNALREQUEST.fields_by_name['signal'].enum_type = _SIGNALREQUEST_SIGNAL -_SIGNALREQUEST_SIGNAL.containing_type = _SIGNALREQUEST; -_SIGNALRESPONSE.fields_by_name['status'].enum_type = _SIGNALRESPONSE_STATUS -_SIGNALRESPONSE_STATUS.containing_type = _SIGNALRESPONSE; - -class SignalRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _SIGNALREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.SignalRequest) - -class SignalResponse(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _SIGNALRESPONSE - - # @@protoc_insertion_point(class_scope:leap.common.events.SignalResponse) - - -_SIGNALSERVICE = descriptor.ServiceDescriptor( - name='SignalService', - full_name='leap.common.events.SignalService', - file=DESCRIPTOR, - index=0, - options=None, - serialized_start=642, - serialized_end=738, - methods=[ - descriptor.MethodDescriptor( - name='signal', - full_name='leap.common.events.SignalService.signal', - index=0, - containing_service=None, - input_type=_SIGNALREQUEST, - output_type=_SIGNALRESPONSE, - options=None, - ), -]) - -class SignalService(service.Service): - __metaclass__ = service_reflection.GeneratedServiceType - DESCRIPTOR = _SIGNALSERVICE -class SignalService_Stub(SignalService): - __metaclass__ = service_reflection.GeneratedServiceStubType - DESCRIPTOR = _SIGNALSERVICE - -# @@protoc_insertion_point(module_scope) diff --git a/src/leap/common/events/test_events.py b/src/leap/common/events/test_events.py deleted file mode 100644 index ae55319..0000000 --- a/src/leap/common/events/test_events.py +++ /dev/null @@ -1,88 +0,0 @@ -import unittest -from protobuf.socketrpc import RpcService -from leap.common import events -from leap.common.events import service -from leap.common.events.signal_pb2 import ( - SignalRequest, - SignalService, - SignalService_Stub, -) - - -port = 8090 - -class EventsTestCase(unittest.TestCase): - - def _start_service(self): - return service.SignalServiceThread.start_service(port) - - def setUp(self): - super(EventsTestCase, self).setUp() - self._service = self._start_service() - - def tearDown(self): - events.registered_callbacks = {} - super(EventsTestCase, self).tearDown() - - def test_service_singleton(self): - self.assertTrue(self._service.get_instance() == self._service, - "Can't get singleton class for service.") - - def test_register_signal(self): - key = SignalRequest.SOLEDAD_CREATING_KEYS - self.assertEqual({}, events.registered_callbacks, - 'There should be no registered_callbacks events when ' - 'service has just started.') - events.register(key, lambda x: True) - self.assertEqual(1, len(events.registered_callbacks), - 'Wrong number of registered callbacks.') - self.assertEqual(events.registered_callbacks.keys(), [key], - 'Couldn\'t locate registered signal.') - events.register(key, lambda x: True) - self.assertEqual(1, len(events.registered_callbacks), - 'Wrong number of registered callbacks.') - self.assertEqual(events.registered_callbacks.keys(), [key], - 'Couldn\'t locate registered signal.') - self.assertEqual( - 2, - len(events.registered_callbacks[SignalRequest.SOLEDAD_CREATING_KEYS]), - 'Wrong number of registered callbacks.') - key2 = SignalRequest.CLIENT_UID - events.register(key2, lambda x: True) - self.assertEqual(2, len(events.registered_callbacks), - 'Wrong number of registered callbacks.') - self.assertEqual( - sorted(events.registered_callbacks.keys()), - sorted([key2, key]), - 'Wrong keys in `registered_keys`.') - - def test_register_signal_replace(self): - key = SignalRequest.SOLEDAD_CREATING_KEYS - cbk = lambda x: True - self.assertEqual({}, events.registered_callbacks, - 'There should be no registered_callbacks events when ' - 'service has just started.') - events.register(key, cbk, uid=1) - self.assertRaises(Exception, events.register, key, lambda x: True, uid=1) - self.assertEquals(1, - events.register(key, lambda x: True, uid=1, replace=True), - "Could not replace callback.") - self.assertEqual(1, len(events.registered_callbacks), - 'Wrong number of registered callbacks.') - self.assertEqual(events.registered_callbacks.keys(), [key], - 'Couldn\'t locate registered signal.') - - def test_signal_response_status(self): - sig = SignalRequest.SOLEDAD_CREATING_KEYS - cbk = lambda x: True - events.register(sig, cbk) - request = SignalRequest() - request.id = 1 - request.signal = sig - request.content = 'my signal contents' - request.mac_method = 'nomac' - request.mac = "" - service = RpcService(SignalService_Stub, port, 'localhost') - response = service.signal(request, timeout=1000) - self.assertEqual(response.OK, response.status, - 'Wrong response status.') |