From b7c74e4f293d0e611ea038e04022fbe700a8cb42 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 24 Jul 2013 16:26:52 -0300 Subject: Trying to init events server raises when given port is not free. * Also fix and improve some tests. --- src/leap/common/events/__init__.py | 31 +++++++++++++++ src/leap/common/events/client.py | 38 ++++++++++++++++++ src/leap/common/events/events.proto | 34 ++++++++++++++++ src/leap/common/events/events_pb2.py | 77 +++++++++++++++++++++++++++++------- src/leap/common/events/server.py | 60 +++++++++++++++++++++++++++- 5 files changed, 223 insertions(+), 17 deletions(-) (limited to 'src/leap/common/events') diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index d498340..388ee17 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -123,3 +123,34 @@ def signal(signal, content="", mac_method="", mac="", reqcbk=None, :rtype: leap.common.events.events_pb2.EventsResponse or None """ return client.signal(signal, content, mac_method, mac, reqcbk, timeout) + +def ping_client(port, reqcbk=None, timeout=1000): + """ + Ping a client running in C{port}. + + :param port: the port in which the client should be listening + :type port: int + :param reqcbk: a callback to be called when a response from client is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + """ + return client.ping(port, reqcbk=reqcbk, timeout=timeout) + + +def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000): + """ + Ping the server. + + :param port: the port in which server should be listening + :type port: int + :param reqcbk: a callback to be called when a response from server is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + """ + return server.ping(port, reqcbk=reqcbk, timeout=timeout) diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 17fc326..587b02a 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -233,6 +233,28 @@ def signal(signal, content="", mac_method="", mac="", reqcbk=None, return service.signal(request, callback=reqcbk, timeout=timeout) +def ping(port, reqcbk=None, timeout=1000): + """ + Ping a client running in C{port}. + + :param port: the port in which the client should be listening + :type port: int + :param reqcbk: a callback to be called when a response from client is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + """ + request = proto.PingRequest() + service = RpcService( + proto.EventsClientService_Stub, + port, + 'localhost') + logger.info("Pinging a client in port %d..." % port) + return service.ping(request, callback=reqcbk, timeout=timeout) + + class EventsClientService(proto.EventsClientService): """ Service for receiving signal events in clients. @@ -270,6 +292,22 @@ class EventsClientService(proto.EventsClientService): response.status = proto.EventResponse.OK done.run(response) + def ping(self, controller, request, done): + """ + Reply to a ping request. + + :param controller: used to mediate a single method call + :type controller: protobuf.socketrpc.controller.SocketRpcController + :param request: the request received from the client + :type request: leap.common.events.events_pb2.RegisterRequest + :param done: callback to be called when done + :type done: protobuf.socketrpc.server.Callback + """ + logger.info("Received ping request, sending response.") + response = proto.EventResponse() + response.status = proto.EventResponse.OK + done.run(response) + class EventsClientDaemon(daemon.EventsSingletonDaemon): """ diff --git a/src/leap/common/events/events.proto b/src/leap/common/events/events.proto index 79a5564..b844f42 100644 --- a/src/leap/common/events/events.proto +++ b/src/leap/common/events/events.proto @@ -17,6 +17,9 @@ package leap.common.events; option py_generic_services = true; + +// These are the events that can be signaled using the events mechanism. + enum Event { CLIENT_SESSION_ID = 1; CLIENT_UID = 2; @@ -33,6 +36,10 @@ enum Event { RAISE_WINDOW = 13; } + +// A SignalRequest is the type of the message sent from one component to request +// that a signal be sent to every registered component. + message SignalRequest { required Event event = 1; required string content = 2; @@ -42,6 +49,10 @@ message SignalRequest { optional bool error_occurred = 6; } + +// A RegisterRequest message tells the server that a component wants to +// be signaled whenever a specific event occurs. + message RegisterRequest { required Event event = 1; required int32 port = 2; @@ -49,6 +60,10 @@ message RegisterRequest { required bytes mac = 4; } + +// An UnregisterRequest message tells the server that a component does not +// want to be signaled when a specific event occurs. + message UnregisterRequest { required Event event = 1; required int32 port = 2; @@ -56,6 +71,17 @@ message UnregisterRequest { required bytes mac = 4; } + +// A PingRequest message is used to find out if a server or component is +// alive. + +message PingRequest { +} + + +// The EventResponse is the message sent back by server and components after +// they receive other kinds of requests. + message EventResponse { enum Status { @@ -68,12 +94,20 @@ message EventResponse { optional string result = 2; } + +// The EventsServerService is the service provided by the server. + service EventsServerService { + rpc ping(PingRequest) returns (EventResponse); rpc register(RegisterRequest) returns (EventResponse); rpc unregister(UnregisterRequest) returns (EventResponse); rpc signal(SignalRequest) returns (EventResponse); } + +// EventsComponentService is the service provided by components (clients). + service EventsClientService { + rpc ping(PingRequest) returns (EventResponse); rpc signal(SignalRequest) returns (EventResponse); } diff --git a/src/leap/common/events/events_pb2.py b/src/leap/common/events/events_pb2.py index 3b39950..274514c 100644 --- a/src/leap/common/events/events_pb2.py +++ b/src/leap/common/events/events_pb2.py @@ -14,7 +14,7 @@ from google.protobuf import descriptor_pb2 DESCRIPTOR = _descriptor.FileDescriptor( name='events.proto', package='leap.common.events', - serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"l\n\x11UnregisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\xe7\x02\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x12\x10\n\x0cRAISE_WINDOW\x10\r2\x91\x02\n\x13\x45ventsServerService\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12V\n\nunregister\x12%.leap.common.events.UnregisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2e\n\x13\x45ventsClientService\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponseB\x03\x90\x01\x01') + serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"l\n\x11UnregisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\r\n\x0bPingRequest\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\xe7\x02\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x12\x10\n\x0cRAISE_WINDOW\x10\r2\xdd\x02\n\x13\x45ventsServerService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12V\n\nunregister\x12%.leap.common.events.UnregisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2\xb1\x01\n\x13\x45ventsClientService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponseB\x03\x90\x01\x01') _EVENT = _descriptor.EnumDescriptor( name='Event', @@ -77,8 +77,8 @@ _EVENT = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=542, - serialized_end=901, + serialized_start=557, + serialized_end=916, ) Event = enum_type_wrapper.EnumTypeWrapper(_EVENT) @@ -118,8 +118,8 @@ _EVENTRESPONSE_STATUS = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=500, - serialized_end=539, + serialized_start=515, + serialized_end=554, ) @@ -284,6 +284,27 @@ _UNREGISTERREQUEST = _descriptor.Descriptor( ) +_PINGREQUEST = _descriptor.Descriptor( + name='PingRequest', + full_name='leap.common.events.PingRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + serialized_start=408, + serialized_end=421, +) + + _EVENTRESPONSE = _descriptor.Descriptor( name='EventResponse', full_name='leap.common.events.EventResponse', @@ -315,8 +336,8 @@ _EVENTRESPONSE = _descriptor.Descriptor( options=None, is_extendable=False, extension_ranges=[], - serialized_start=409, - serialized_end=539, + serialized_start=424, + serialized_end=554, ) _SIGNALREQUEST.fields_by_name['event'].enum_type = _EVENT @@ -327,6 +348,7 @@ _EVENTRESPONSE_STATUS.containing_type = _EVENTRESPONSE DESCRIPTOR.message_types_by_name['SignalRequest'] = _SIGNALREQUEST DESCRIPTOR.message_types_by_name['RegisterRequest'] = _REGISTERREQUEST DESCRIPTOR.message_types_by_name['UnregisterRequest'] = _UNREGISTERREQUEST +DESCRIPTOR.message_types_by_name['PingRequest'] = _PINGREQUEST DESCRIPTOR.message_types_by_name['EventResponse'] = _EVENTRESPONSE @@ -351,6 +373,13 @@ class UnregisterRequest(_message.Message): # @@protoc_insertion_point(class_scope:leap.common.events.UnregisterRequest) +class PingRequest(_message.Message): + __metaclass__ = _reflection.GeneratedProtocolMessageType + DESCRIPTOR = _PINGREQUEST + + # @@protoc_insertion_point(class_scope:leap.common.events.PingRequest) + + class EventResponse(_message.Message): __metaclass__ = _reflection.GeneratedProtocolMessageType DESCRIPTOR = _EVENTRESPONSE @@ -368,13 +397,22 @@ _EVENTSSERVERSERVICE = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=0, options=None, - serialized_start=904, - serialized_end=1177, + serialized_start=919, + serialized_end=1268, methods=[ + _descriptor.MethodDescriptor( + name='ping', + full_name='leap.common.events.EventsServerService.ping', + index=0, + containing_service=None, + input_type=_PINGREQUEST, + output_type=_EVENTRESPONSE, + options=None, + ), _descriptor.MethodDescriptor( name='register', full_name='leap.common.events.EventsServerService.register', - index=0, + index=1, containing_service=None, input_type=_REGISTERREQUEST, output_type=_EVENTRESPONSE, @@ -383,7 +421,7 @@ _EVENTSSERVERSERVICE = _descriptor.ServiceDescriptor( _descriptor.MethodDescriptor( name='unregister', full_name='leap.common.events.EventsServerService.unregister', - index=1, + index=2, containing_service=None, input_type=_UNREGISTERREQUEST, output_type=_EVENTRESPONSE, @@ -392,7 +430,7 @@ _EVENTSSERVERSERVICE = _descriptor.ServiceDescriptor( _descriptor.MethodDescriptor( name='signal', full_name='leap.common.events.EventsServerService.signal', - index=2, + index=3, containing_service=None, input_type=_SIGNALREQUEST, output_type=_EVENTRESPONSE, @@ -417,13 +455,22 @@ _EVENTSCLIENTSERVICE = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=1, options=None, - serialized_start=1179, - serialized_end=1280, + serialized_start=1271, + serialized_end=1448, methods=[ + _descriptor.MethodDescriptor( + name='ping', + full_name='leap.common.events.EventsClientService.ping', + index=0, + containing_service=None, + input_type=_PINGREQUEST, + output_type=_EVENTRESPONSE, + options=None, + ), _descriptor.MethodDescriptor( name='signal', full_name='leap.common.events.EventsClientService.signal', - index=0, + index=1, containing_service=None, input_type=_SIGNALREQUEST, output_type=_EVENTRESPONSE, diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index 8a0d4e5..daccc61 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -48,6 +48,13 @@ SERVER_PORT = 8090 registered_clients = {} +class PortAlreadyTaken(Exception): + """ + Raised when trying to open a server in a port that is already taken. + """ + pass + + def ensure_server(port=SERVER_PORT): """ Make sure the server is running on the given port. @@ -60,18 +67,51 @@ def ensure_server(port=SERVER_PORT): :return: the daemon instance or nothing :rtype: EventsServerDaemon or None + + :raise PortAlreadyTaken: Raised if C{port} is already taken by something + that is not an events server. """ try: + # check if port is available s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('localhost', port)) s.close() - logger.info('Server is already running on port %d.', port) - return None + # port is taken, check if there's a server running there + response = ping(port) + if response is not None and response.status == proto.EventResponse.OK: + logger.info('A server is already running on port %d.', port) + return None + # port is taken, and not by an events server + logger.info('Port %d is taken by something not an events server.', port) + raise PortAlreadyTaken(port) except socket.error: + # port is available, run a server logger.info('Launching server on port %d.', port) return EventsServerDaemon.ensure(port) +def ping(port=SERVER_PORT, reqcbk=None, timeout=1000): + """ + Ping the server. + + :param port: the port in which server should be listening + :type port: int + :param reqcbk: a callback to be called when a response from server is + received + :type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + :param timeout: the timeout for synch calls + :type timeout: int + """ + request = proto.PingRequest() + service = RpcService( + proto.EventsServerService_Stub, + port, + 'localhost') + logger.info("Pinging server in port %d..." % port) + return service.ping(request, callback=reqcbk, timeout=timeout) + + class EventsServerService(proto.EventsServerService): """ Service for receiving events in clients. @@ -156,6 +196,22 @@ class EventsServerService(proto.EventsServerService): response.status = proto.EventResponse.OK done.run(response) + def ping(self, controller, request, done): + """ + Reply to a ping request. + + :param controller: used to mediate a single method call + :type controller: protobuf.socketrpc.controller.SocketRpcController + :param request: the request received from the client + :type request: leap.common.events.events_pb2.RegisterRequest + :param done: callback to be called when done + :type done: protobuf.socketrpc.server.Callback + """ + logger.info("Received ping request, sending response.") + response = proto.EventResponse() + response.status = proto.EventResponse.OK + done.run(response) + class EventsServerDaemon(daemon.EventsSingletonDaemon): """ -- cgit v1.2.3